Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Future;

namespace util {
class Codec;
class CodecOptions;
} // namespace util

class Buffer;
Expand Down
24 changes: 19 additions & 5 deletions cpp/src/arrow/util/compression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Result<int> Codec::DefaultCompressionLevel(Compression::type codec_type) {
}

Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
int compression_level) {
const CodecOptions& codec_options) {
if (!IsAvailable(codec_type)) {
if (codec_type == Compression::LZO) {
return Status::NotImplemented("LZO codec not implemented");
Expand All @@ -151,6 +151,7 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
"' not built");
}

auto compression_level = codec_options.compression_level;
if (compression_level != kUseDefaultCompressionLevel &&
!SupportsCompressionLevel(codec_type)) {
return Status::Invalid("Codec '", GetCodecAsString(codec_type),
Expand All @@ -166,16 +167,23 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
codec = internal::MakeSnappyCodec();
#endif
break;
case Compression::GZIP:
case Compression::GZIP: {
#ifdef ARROW_WITH_ZLIB
codec = internal::MakeGZipCodec(compression_level);
auto opt = dynamic_cast<const GZipCodecOptions*>(&codec_options);
codec = internal::MakeGZipCodec(compression_level,
opt ? opt->gzip_format : GZipFormat::GZIP,
opt ? opt->window_bits : std::nullopt);
#endif
break;
case Compression::BROTLI:
}
case Compression::BROTLI: {
#ifdef ARROW_WITH_BROTLI
codec = internal::MakeBrotliCodec(compression_level);
auto opt = dynamic_cast<const BrotliCodecOptions*>(&codec_options);
codec = internal::MakeBrotliCodec(compression_level,
opt ? opt->window_bits : std::nullopt);
#endif
break;
}
case Compression::LZ4:
#ifdef ARROW_WITH_LZ4
codec = internal::MakeLz4RawCodec(compression_level);
Expand Down Expand Up @@ -210,6 +218,12 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
return std::move(codec);
}

// Convenience overload: create a Codec from a bare compression level by wrapping it in CodecOptions
Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
Comment thread
pitrou marked this conversation as resolved.
Outdated
int compression_level) {
return Codec::Create(codec_type, CodecOptions{compression_level});
}

bool Codec::IsAvailable(Compression::type codec_type) {
switch (codec_type) {
case Compression::UNCOMPRESSED:
Expand Down
43 changes: 41 additions & 2 deletions cpp/src/arrow/util/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <string>

#include "arrow/result.h"
Expand Down Expand Up @@ -107,6 +108,40 @@ class ARROW_EXPORT Decompressor {
// XXX add methods for buffer size heuristics?
};

/// \brief Compression codec options
///
/// Base class for the options accepted by Codec::Create. It carries only the
/// compression level; codec-specific option classes (GZipCodecOptions,
/// BrotliCodecOptions) derive from it to add further parameters.
class ARROW_EXPORT CodecOptions {
public:
/// \brief Construct options requesting the given compression level
/// \param[in] compression_level level to request; defaults to
/// kUseDefaultCompressionLevel
explicit CodecOptions(int compression_level = kUseDefaultCompressionLevel)
: compression_level(compression_level) {}

// Virtual destructor makes the type polymorphic; Codec::Create uses
// dynamic_cast on a `const CodecOptions&` to detect codec-specific options.
virtual ~CodecOptions() = default;

/// Requested compression level (kUseDefaultCompressionLevel when not set explicitly).
int compression_level;
};
Comment thread
pitrou marked this conversation as resolved.
Outdated

// ----------------------------------------------------------------------
// GZip codec options implementation

/// \brief Format selector for the GZip codec
///
/// Carried by GZipCodecOptions::gzip_format and forwarded to the GZip codec
/// factory. GZIP is the value used when no GZip-specific options are supplied.
// NOTE(review): the enumerators presumably select zlib-wrapped, raw deflate and
// gzip-wrapped streams respectively -- confirm against the GZip codec implementation.
enum class GZipFormat {
ZLIB,
DEFLATE,
GZIP,
};

/// \brief Options specific to the GZip codec
///
/// Codec::Create forwards these fields to the GZip codec factory when it is
/// handed a GZipCodecOptions for Compression::GZIP. No validation happens in
/// this class itself.
class ARROW_EXPORT GZipCodecOptions : public CodecOptions {
public:
/// Format to use; defaults to GZipFormat::GZIP.
GZipFormat gzip_format = GZipFormat::GZIP;
/// Optional window size in bits; left empty (std::nullopt) unless the caller sets it.
std::optional<int> window_bits;
};

// ----------------------------------------------------------------------
// brotli codec options implementation

/// \brief Options specific to the Brotli codec
///
/// Codec::Create forwards window_bits to the Brotli codec factory when it is
/// handed a BrotliCodecOptions for Compression::BROTLI. No validation happens
/// in this class itself.
class ARROW_EXPORT BrotliCodecOptions : public CodecOptions {
public:
/// Optional window size in bits; when left empty the Brotli codec factory
/// substitutes BROTLI_DEFAULT_WINDOW.
std::optional<int> window_bits;
};

/// \brief Compression codec
class ARROW_EXPORT Codec {
public:
Expand All @@ -122,9 +157,13 @@ class ARROW_EXPORT Codec {
/// \brief Return compression type for name (all lower case)
static Result<Compression::type> GetCompressionType(const std::string& name);

/// \brief Create a codec for the given compression algorithm
/// \brief Create a codec for the given compression algorithm with CodecOptions
static Result<std::unique_ptr<Codec>> Create(
Compression::type codec, int compression_level = kUseDefaultCompressionLevel);
Compression::type codec, const CodecOptions& codec_options = CodecOptions{});

/// \brief Create a codec for the given compression algorithm
static Result<std::unique_ptr<Codec>> Create(Compression::type codec,
int compression_level);

/// \brief Return true if support for indicated codec has been enabled
static bool IsAvailable(Compression::type codec);
Expand Down
36 changes: 26 additions & 10 deletions cpp/src/arrow/util/compression_brotli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ class BrotliDecompressor : public Decompressor {

class BrotliCompressor : public Compressor {
public:
explicit BrotliCompressor(int compression_level)
: compression_level_(compression_level) {}
explicit BrotliCompressor(int compression_level, int window_bits)
: compression_level_(compression_level), window_bits_(window_bits) {}

~BrotliCompressor() override {
if (state_ != nullptr) {
Expand All @@ -109,6 +109,9 @@ class BrotliCompressor : public Compressor {
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_QUALITY, compression_level_)) {
return BrotliError("Brotli set compression level failed");
}
if (!BrotliEncoderSetParameter(state_, BROTLI_PARAM_LGWIN, window_bits_)) {
return BrotliError("Brotli set window size failed");
}
return Status::OK();
}

Expand Down Expand Up @@ -166,17 +169,19 @@ class BrotliCompressor : public Compressor {

private:
const int compression_level_;
const int window_bits_;
};

// ----------------------------------------------------------------------
// Brotli codec implementation

class BrotliCodec : public Codec {
public:
explicit BrotliCodec(int compression_level)
explicit BrotliCodec(int compression_level, int window_bits)
: compression_level_(compression_level == kUseDefaultCompressionLevel
? kBrotliDefaultCompressionLevel
: compression_level) {}
: compression_level),
window_bits_(window_bits) {}

Result<int64_t> Decompress(int64_t input_len, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output_buffer) override {
Expand All @@ -201,16 +206,16 @@ class BrotliCodec : public Codec {
DCHECK_GE(input_len, 0);
DCHECK_GE(output_buffer_len, 0);
std::size_t output_size = static_cast<size_t>(output_buffer_len);
if (BrotliEncoderCompress(compression_level_, BROTLI_DEFAULT_WINDOW,
BROTLI_DEFAULT_MODE, static_cast<size_t>(input_len), input,
&output_size, output_buffer) == BROTLI_FALSE) {
if (BrotliEncoderCompress(compression_level_, window_bits_, BROTLI_DEFAULT_MODE,
static_cast<size_t>(input_len), input, &output_size,
output_buffer) == BROTLI_FALSE) {
return Status::IOError("Brotli compression failure.");
}
return output_size;
}

Result<std::shared_ptr<Compressor>> MakeCompressor() override {
auto ptr = std::make_shared<BrotliCompressor>(compression_level_);
auto ptr = std::make_shared<BrotliCompressor>(compression_level_, window_bits_);
RETURN_NOT_OK(ptr->Init());
return ptr;
}
Expand All @@ -221,6 +226,14 @@ class BrotliCodec : public Codec {
return ptr;
}

/// \brief Validate the configured Brotli window size
/// \return Status::OK when window_bits_ lies within
/// [BROTLI_MIN_WINDOW_BITS, BROTLI_MAX_WINDOW_BITS], Status::Invalid otherwise
Status Init() override {
  const bool window_in_range =
      window_bits_ >= BROTLI_MIN_WINDOW_BITS && window_bits_ <= BROTLI_MAX_WINDOW_BITS;
  if (window_in_range) {
    return Status::OK();
  }
  // Report the accepted bounds so the caller can correct the option.
  return Status::Invalid("Brotli window_bits should be between ",
                         BROTLI_MIN_WINDOW_BITS, " and ", BROTLI_MAX_WINDOW_BITS);
}

Compression::type compression_type() const override { return Compression::BROTLI; }

int compression_level() const override { return compression_level_; }
Expand All @@ -232,12 +245,15 @@ class BrotliCodec : public Codec {

private:
const int compression_level_;
const int window_bits_;
};

} // namespace

std::unique_ptr<Codec> MakeBrotliCodec(int compression_level) {
return std::make_unique<BrotliCodec>(compression_level);
std::unique_ptr<Codec> MakeBrotliCodec(int compression_level,
std::optional<int> window_bits) {
return std::make_unique<BrotliCodec>(compression_level,
window_bits.value_or(BROTLI_DEFAULT_WINDOW));
}

} // namespace internal
Expand Down
15 changes: 5 additions & 10 deletions cpp/src/arrow/util/compression_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,20 @@ constexpr int kBrotliDefaultCompressionLevel = 8;

// Brotli codec.
std::unique_ptr<Codec> MakeBrotliCodec(
int compression_level = kBrotliDefaultCompressionLevel);
int compression_level = kBrotliDefaultCompressionLevel,
std::optional<int> window_bits = std::nullopt);

// BZ2 codec.
constexpr int kBZ2DefaultCompressionLevel = 9;

std::unique_ptr<Codec> MakeBZ2Codec(int compression_level = kBZ2DefaultCompressionLevel);

// GZip
constexpr int kGZipDefaultCompressionLevel = 9;

struct GZipFormat {
enum type {
ZLIB,
DEFLATE,
GZIP,
};
};

std::unique_ptr<Codec> MakeGZipCodec(int compression_level = kGZipDefaultCompressionLevel,
GZipFormat::type format = GZipFormat::GZIP);
GZipFormat format = GZipFormat::GZIP,
std::optional<int> window_bits = std::nullopt);

// Snappy
std::unique_ptr<Codec> MakeSnappyCodec();
Expand Down
76 changes: 74 additions & 2 deletions cpp/src/arrow/util/compression_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,81 @@ TEST(TestCodecMisc, SpecifyCompressionLevel) {
continue;
}
const auto level = combination.level;
const auto codec_options = arrow::util::CodecOptions(level);
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, level);
auto result2 = Codec::Create(compression, level);
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
CheckCodecRoundtrip(*result1, *result2, data);
}
}
}

TEST(TestCodecMisc, SpecifyCodecOptionsGZip) {
  // Only GZip and Brotli currently have dedicated options classes, because they
  // expose codec-specific parameters. Other codecs take the plain CodecOptions;
  // more specific options classes can be added if needed.
  struct GZipCase {
    int level;
    GZipFormat format;
    int window_bits;
    bool expect_success;
  };
  // Each row: compression level, format, window bits, whether Create must succeed.
  constexpr GZipCase kCases[] = {
      {2, GZipFormat::ZLIB, 12, true},
      {9, GZipFormat::GZIP, 9, true},
      {9, GZipFormat::GZIP, 20, false},
      {5, GZipFormat::DEFLATE, -12, false},
      {-992, GZipFormat::GZIP, 15, false},
  };

  std::vector<uint8_t> data = MakeRandomData(2000);
  for (const auto& test_case : kCases) {
    const auto gzip = Compression::GZIP;
    if (!Codec::IsAvailable(gzip)) {
      // Support for this codec hasn't been built
      continue;
    }

    arrow::util::GZipCodecOptions gzip_options;
    gzip_options.compression_level = test_case.level;
    gzip_options.gzip_format = test_case.format;
    gzip_options.window_bits = test_case.window_bits;

    // Two independent codec instances are created so the roundtrip check can
    // be handed both of them.
    auto first = Codec::Create(gzip, gzip_options);
    auto second = Codec::Create(gzip, gzip_options);
    const bool should_succeed = test_case.expect_success;
    ASSERT_EQ(should_succeed, first.ok());
    ASSERT_EQ(should_succeed, second.ok());
    if (should_succeed) {
      CheckCodecRoundtrip(*first, *second, data);
    }
  }
}

TEST(TestCodecMisc, SpecifyCodecOptionsBrotli) {
// For now only GZip and Brotli have dedicated codec options, since they expose
// codec-specific parameters to customize. Other codecs can use the plain
// CodecOptions directly; more specific options classes can be added if needed.
struct CombinationOption {
int level;
int window_bits;
bool expect_success;
};
constexpr CombinationOption combinations[] = {
{8, 22, true}, {11, 10, true}, {1, 24, true}, {5, -12, false}, {-992, 25, false}};

std::vector<uint8_t> data = MakeRandomData(2000);
for (const auto& combination : combinations) {
const auto compression = Compression::BROTLI;
if (!Codec::IsAvailable(compression)) {
// Support for this codec hasn't been built
continue;
}
auto codec_options = arrow::util::BrotliCodecOptions();
codec_options.compression_level = combination.level;
codec_options.window_bits = combination.window_bits;
const auto expect_success = combination.expect_success;
auto result1 = Codec::Create(compression, codec_options);
auto result2 = Codec::Create(compression, codec_options);
ASSERT_EQ(expect_success, result1.ok());
ASSERT_EQ(expect_success, result2.ok());
if (expect_success) {
Expand Down
Loading