Skip to content

Commit aae2557

Browse files
mapleFUpitrou
andauthored
GH-39377: [C++] IO: Reuse same buffer in CompressedInputStream (#39807)
### Rationale for this change This patch reuses the same buffer in `CompressedInputStream`. It includes the `decompress_` and `compress_` buffer ### What changes are included in this PR? 1. For `compress_`, allocate and reuse same buffer with `kChunkSize` (64KB), and reusing it 2. For `decompress_`, reusing a same buffer (mostly 1MB) without continues `Reallocate` In the worst case, `decompress_` might hold a large buffer. ### Are these changes tested? Already ### Are there any user-facing changes? `CompressedInputStream` might has larger buffer * Closes: #39377 Lead-authored-by: mwish <maplewish117@gmail.com> Co-authored-by: Antoine Pitrou <antoine@python.org> Co-authored-by: mwish <anmmscs_maple@qq.com> Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent 24feab0 commit aae2557

4 files changed

Lines changed: 253 additions & 18 deletions

File tree

cpp/src/arrow/io/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,7 @@ if(NOT (${ARROW_SIMD_LEVEL} STREQUAL "NONE") AND NOT (${ARROW_SIMD_LEVEL} STREQU
4343
add_arrow_benchmark(memory_benchmark PREFIX "arrow-io")
4444
endif()
4545

46+
add_arrow_benchmark(compressed_benchmark PREFIX "arrow-io")
47+
4648
# Headers: top level
4749
arrow_install_all_headers("arrow/io")

cpp/src/arrow/io/compressed.cc

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ Result<std::shared_ptr<CompressedOutputStream>> CompressedOutputStream::Make(
201201
util::Codec* codec, const std::shared_ptr<OutputStream>& raw, MemoryPool* pool) {
202202
// CAUTION: codec is not owned
203203
std::shared_ptr<CompressedOutputStream> res(new CompressedOutputStream);
204-
res->impl_.reset(new Impl(pool, std::move(raw)));
204+
res->impl_.reset(new Impl(pool, raw));
205205
RETURN_NOT_OK(res->impl_->Init(codec));
206206
return res;
207207
}
@@ -233,8 +233,10 @@ class CompressedInputStream::Impl {
233233
: pool_(pool),
234234
raw_(raw),
235235
is_open_(true),
236+
supports_zero_copy_from_raw_(raw_->supports_zero_copy()),
236237
compressed_pos_(0),
237238
decompressed_pos_(0),
239+
fresh_decompressor_(false),
238240
total_pos_(0) {}
239241

240242
Status Init(Codec* codec) {
@@ -261,16 +263,35 @@ class CompressedInputStream::Impl {
261263
}
262264
}
263265

264-
bool closed() { return !is_open_; }
266+
bool closed() const { return !is_open_; }
265267

266268
Result<int64_t> Tell() const { return total_pos_; }
267269

268270
// Read compressed data if necessary
269271
Status EnsureCompressedData() {
270272
int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
271273
if (compressed_avail == 0) {
272-
// No compressed data available, read a full chunk
273-
ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
274+
// Ensure compressed_ buffer is allocated with kChunkSize.
275+
if (!supports_zero_copy_from_raw_) {
276+
if (compressed_for_non_zero_copy_ == nullptr) {
277+
ARROW_ASSIGN_OR_RAISE(compressed_for_non_zero_copy_,
278+
AllocateResizableBuffer(kChunkSize, pool_));
279+
} else if (compressed_for_non_zero_copy_->size() != kChunkSize) {
280+
RETURN_NOT_OK(
281+
compressed_for_non_zero_copy_->Resize(kChunkSize, /*shrink_to_fit=*/false));
282+
}
283+
ARROW_ASSIGN_OR_RAISE(
284+
int64_t read_size,
285+
raw_->Read(kChunkSize,
286+
compressed_for_non_zero_copy_->mutable_data_as<void>()));
287+
if (read_size != compressed_for_non_zero_copy_->size()) {
288+
RETURN_NOT_OK(
289+
compressed_for_non_zero_copy_->Resize(read_size, /*shrink_to_fit=*/false));
290+
}
291+
compressed_ = compressed_for_non_zero_copy_;
292+
} else {
293+
ARROW_ASSIGN_OR_RAISE(compressed_, raw_->Read(kChunkSize));
294+
}
274295
compressed_pos_ = 0;
275296
}
276297
return Status::OK();
@@ -284,8 +305,13 @@ class CompressedInputStream::Impl {
284305
int64_t decompress_size = kDecompressSize;
285306

286307
while (true) {
287-
ARROW_ASSIGN_OR_RAISE(decompressed_,
288-
AllocateResizableBuffer(decompress_size, pool_));
308+
if (decompressed_ == nullptr) {
309+
ARROW_ASSIGN_OR_RAISE(decompressed_,
310+
AllocateResizableBuffer(decompress_size, pool_));
311+
} else {
312+
// Shrinking the buffer if it's already large enough
313+
RETURN_NOT_OK(decompressed_->Resize(decompress_size, /*shrink_to_fit=*/true));
314+
}
289315
decompressed_pos_ = 0;
290316

291317
int64_t input_len = compressed_->size() - compressed_pos_;
@@ -300,7 +326,9 @@ class CompressedInputStream::Impl {
300326
fresh_decompressor_ = false;
301327
}
302328
if (result.bytes_written > 0 || !result.need_more_output || input_len == 0) {
303-
RETURN_NOT_OK(decompressed_->Resize(result.bytes_written));
329+
// Not calling shrink_to_fit here because we're likely to reusing the buffer.
330+
RETURN_NOT_OK(
331+
decompressed_->Resize(result.bytes_written, /*shrink_to_fit=*/false));
304332
break;
305333
}
306334
DCHECK_EQ(result.bytes_written, 0);
@@ -310,19 +338,14 @@ class CompressedInputStream::Impl {
310338
return Status::OK();
311339
}
312340

313-
// Read a given number of bytes from the decompressed_ buffer.
341+
// Copying a given number of bytes from the decompressed_ buffer.
314342
int64_t ReadFromDecompressed(int64_t nbytes, uint8_t* out) {
315343
int64_t readable = decompressed_ ? (decompressed_->size() - decompressed_pos_) : 0;
316344
int64_t read_bytes = std::min(readable, nbytes);
317345

318346
if (read_bytes > 0) {
319347
memcpy(out, decompressed_->data() + decompressed_pos_, read_bytes);
320348
decompressed_pos_ += read_bytes;
321-
322-
if (decompressed_pos_ == decompressed_->size()) {
323-
// Decompressed data is exhausted, release buffer
324-
decompressed_.reset();
325-
}
326349
}
327350

328351
return read_bytes;
@@ -357,7 +380,7 @@ class CompressedInputStream::Impl {
357380
}
358381

359382
Result<int64_t> Read(int64_t nbytes, void* out) {
360-
auto out_data = reinterpret_cast<uint8_t*>(out);
383+
auto* out_data = reinterpret_cast<uint8_t*>(out);
361384

362385
int64_t total_read = 0;
363386
bool decompressor_has_data = true;
@@ -382,10 +405,10 @@ class CompressedInputStream::Impl {
382405
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
383406
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data()));
384407
RETURN_NOT_OK(buf->Resize(bytes_read));
385-
return std::move(buf);
408+
return buf;
386409
}
387410

388-
std::shared_ptr<InputStream> raw() const { return raw_; }
411+
const std::shared_ptr<InputStream>& raw() const { return raw_; }
389412

390413
private:
391414
// Read 64 KB compressed data at a time
@@ -396,7 +419,12 @@ class CompressedInputStream::Impl {
396419
MemoryPool* pool_;
397420
std::shared_ptr<InputStream> raw_;
398421
bool is_open_;
422+
const bool supports_zero_copy_from_raw_;
399423
std::shared_ptr<Decompressor> decompressor_;
424+
// If `raw_->supports_zero_copy()`, this buffer would not allocate memory.
425+
// Otherwise, this buffer would allocate `kChunkSize` memory and read data from
426+
// `raw_`.
427+
std::shared_ptr<ResizableBuffer> compressed_for_non_zero_copy_;
400428
std::shared_ptr<Buffer> compressed_;
401429
// Position in compressed buffer
402430
int64_t compressed_pos_;
@@ -413,10 +441,9 @@ Result<std::shared_ptr<CompressedInputStream>> CompressedInputStream::Make(
413441
Codec* codec, const std::shared_ptr<InputStream>& raw, MemoryPool* pool) {
414442
// CAUTION: codec is not owned
415443
std::shared_ptr<CompressedInputStream> res(new CompressedInputStream);
416-
res->impl_.reset(new Impl(pool, std::move(raw)));
444+
res->impl_.reset(new Impl(pool, raw));
417445
RETURN_NOT_OK(res->impl_->Init(codec));
418446
return res;
419-
return Status::OK();
420447
}
421448

422449
CompressedInputStream::~CompressedInputStream() { internal::CloseFromDestructor(this); }

cpp/src/arrow/io/compressed.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ class ARROW_EXPORT CompressedOutputStream : public OutputStream {
4444
~CompressedOutputStream() override;
4545

4646
/// \brief Create a compressed output stream wrapping the given output stream.
47+
///
48+
/// The codec must be capable of streaming compression. Some codecs,
49+
/// like Snappy, are not able to do so.
4750
static Result<std::shared_ptr<CompressedOutputStream>> Make(
4851
util::Codec* codec, const std::shared_ptr<OutputStream>& raw,
4952
MemoryPool* pool = default_memory_pool());
@@ -82,6 +85,9 @@ class ARROW_EXPORT CompressedInputStream
8285
~CompressedInputStream() override;
8386

8487
/// \brief Create a compressed input stream wrapping the given input stream.
88+
///
89+
/// The codec must be capable of streaming decompression. Some codecs,
90+
/// like Snappy, are not able to do so.
8591
static Result<std::shared_ptr<CompressedInputStream>> Make(
8692
util::Codec* codec, const std::shared_ptr<InputStream>& raw,
8793
MemoryPool* pool = default_memory_pool());
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "benchmark/benchmark.h"
19+
20+
#include <algorithm>
21+
#include <cstdint>
22+
#include <cstring>
23+
#include <memory>
24+
#include <random>
25+
#include <string>
26+
#include <vector>
27+
28+
#include "arrow/buffer.h"
29+
#include "arrow/io/compressed.h"
30+
#include "arrow/io/memory.h"
31+
#include "arrow/result.h"
32+
#include "arrow/testing/gtest_util.h"
33+
#include "arrow/util/compression.h"
34+
#include "arrow/util/config.h"
35+
#include "arrow/util/logging.h"
36+
#include "arrow/util/macros.h"
37+
38+
namespace arrow::io {
39+
40+
using ::arrow::Compression;
41+
42+
std::vector<uint8_t> MakeCompressibleData(int data_size) {
43+
// XXX This isn't a real-world corpus so doesn't really represent the
44+
// comparative qualities of the algorithms
45+
46+
// First make highly compressible data
47+
std::string base_data =
48+
"Apache Arrow is a cross-language development platform for in-memory data";
49+
int nrepeats = static_cast<int>(1 + data_size / base_data.size());
50+
51+
std::vector<uint8_t> data(base_data.size() * nrepeats);
52+
for (int i = 0; i < nrepeats; ++i) {
53+
std::memcpy(data.data() + i * base_data.size(), base_data.data(), base_data.size());
54+
}
55+
data.resize(data_size);
56+
57+
// Then randomly mutate some bytes so as to make things harder
58+
std::mt19937 engine(42);
59+
std::exponential_distribution<> offsets(0.05);
60+
std::uniform_int_distribution<> values(0, 255);
61+
62+
int64_t pos = 0;
63+
while (pos < data_size) {
64+
data[pos] = static_cast<uint8_t>(values(engine));
65+
pos += static_cast<int64_t>(offsets(engine));
66+
}
67+
68+
return data;
69+
}
70+
71+
// Using a non-zero copy buffer reader to benchmark the non-zero copy path.
72+
class NonZeroCopyBufferReader final : public InputStream {
73+
public:
74+
NonZeroCopyBufferReader(std::shared_ptr<Buffer> buffer) : reader_(std::move(buffer)) {}
75+
76+
bool supports_zero_copy() const override { return false; }
77+
78+
Result<int64_t> Read(int64_t nbytes, void* out) override {
79+
return reader_.Read(nbytes, out);
80+
}
81+
82+
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
83+
// Testing the non-zero copy path like reading from local file or Object store,
84+
// so we need to allocate a buffer and copy the data.
85+
ARROW_ASSIGN_OR_RAISE(auto buf, ::arrow::AllocateResizableBuffer(nbytes));
86+
ARROW_ASSIGN_OR_RAISE(int64_t size, Read(nbytes, buf->mutable_data()));
87+
ARROW_RETURN_NOT_OK(buf->Resize(size));
88+
return buf;
89+
}
90+
Status Close() override { return reader_.Close(); }
91+
Result<int64_t> Tell() const override { return reader_.Tell(); }
92+
bool closed() const override { return reader_.closed(); }
93+
94+
private:
95+
::arrow::io::BufferReader reader_;
96+
};
97+
98+
enum class BufferReadMode { ProvidedByCaller, ReturnedByCallee };
99+
100+
template <typename BufReader, BufferReadMode Mode>
101+
static void CompressedInputStreamBenchmark(::benchmark::State& state,
102+
Compression::type compression) {
103+
const int64_t input_size = state.range(0);
104+
const int64_t batch_size = state.range(1);
105+
106+
const std::vector<uint8_t> data = MakeCompressibleData(static_cast<int>(input_size));
107+
auto codec = ::arrow::util::Codec::Create(compression).ValueOrDie();
108+
int64_t max_compress_len =
109+
codec->MaxCompressedLen(static_cast<int64_t>(data.size()), data.data());
110+
std::shared_ptr<::arrow::ResizableBuffer> buf =
111+
::arrow::AllocateResizableBuffer(max_compress_len).ValueOrDie();
112+
const int64_t compressed_length =
113+
codec
114+
->Compress(static_cast<int64_t>(data.size()), data.data(), max_compress_len,
115+
buf->mutable_data())
116+
.ValueOrDie();
117+
ABORT_NOT_OK(buf->Resize(compressed_length));
118+
for (auto _ : state) {
119+
state.PauseTiming();
120+
auto reader = std::make_shared<BufReader>(buf);
121+
[[maybe_unused]] std::unique_ptr<Buffer> read_buffer;
122+
if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
123+
read_buffer = ::arrow::AllocateBuffer(batch_size).ValueOrDie();
124+
}
125+
state.ResumeTiming();
126+
// Put `CompressedInputStream::Make` in timing.
127+
auto input_stream =
128+
::arrow::io::CompressedInputStream::Make(codec.get(), reader).ValueOrDie();
129+
auto remaining_size = input_size;
130+
while (remaining_size > 0) {
131+
if constexpr (Mode == BufferReadMode::ProvidedByCaller) {
132+
auto value = input_stream->Read(batch_size, read_buffer->mutable_data());
133+
ABORT_NOT_OK(value);
134+
remaining_size -= value.ValueOrDie();
135+
} else {
136+
auto value = input_stream->Read(batch_size);
137+
ABORT_NOT_OK(value);
138+
remaining_size -= value.ValueOrDie()->size();
139+
}
140+
}
141+
}
142+
state.SetBytesProcessed(input_size * state.iterations());
143+
}
144+
145+
template <Compression::type kCompression>
146+
static void CompressedInputStreamZeroCopyBufferProvidedByCaller(
147+
::benchmark::State& state) {
148+
CompressedInputStreamBenchmark<::arrow::io::BufferReader,
149+
BufferReadMode::ProvidedByCaller>(state, kCompression);
150+
}
151+
152+
template <Compression::type kCompression>
153+
static void CompressedInputStreamNonZeroCopyBufferProvidedByCaller(
154+
::benchmark::State& state) {
155+
CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
156+
BufferReadMode::ProvidedByCaller>(state, kCompression);
157+
}
158+
159+
template <Compression::type kCompression>
160+
static void CompressedInputStreamZeroCopyBufferReturnedByCallee(
161+
::benchmark::State& state) {
162+
CompressedInputStreamBenchmark<::arrow::io::BufferReader,
163+
BufferReadMode::ReturnedByCallee>(state, kCompression);
164+
}
165+
166+
template <Compression::type kCompression>
167+
static void CompressedInputStreamNonZeroCopyBufferReturnedByCallee(
168+
::benchmark::State& state) {
169+
CompressedInputStreamBenchmark<NonZeroCopyBufferReader,
170+
BufferReadMode::ReturnedByCallee>(state, kCompression);
171+
}
172+
173+
static void CompressedInputArguments(::benchmark::internal::Benchmark* b) {
174+
b->ArgNames({"num_bytes", "batch_size"})
175+
->Args({8 * 1024, 8 * 1024})
176+
->Args({64 * 1024, 8 * 1024})
177+
->Args({64 * 1024, 64 * 1024})
178+
->Args({1024 * 1024, 8 * 1024})
179+
->Args({1024 * 1024, 64 * 1024})
180+
->Args({1024 * 1024, 1024 * 1024});
181+
}
182+
183+
#ifdef ARROW_WITH_LZ4
184+
// Benchmark LZ4 because it's lightweight, which makes benchmarking focused on the
185+
// overhead of the compression input stream.
186+
BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferProvidedByCaller,
187+
Compression::LZ4_FRAME)
188+
->Apply(CompressedInputArguments);
189+
BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferProvidedByCaller,
190+
Compression::LZ4_FRAME)
191+
->Apply(CompressedInputArguments);
192+
BENCHMARK_TEMPLATE(CompressedInputStreamZeroCopyBufferReturnedByCallee,
193+
Compression::LZ4_FRAME)
194+
->Apply(CompressedInputArguments);
195+
BENCHMARK_TEMPLATE(CompressedInputStreamNonZeroCopyBufferReturnedByCallee,
196+
Compression::LZ4_FRAME)
197+
->Apply(CompressedInputArguments);
198+
#endif
199+
200+
} // namespace arrow::io

0 commit comments

Comments
 (0)