GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data) #39164
kou merged 2 commits into apache:main
Conversation
Force-pushed from a860ad1 to 9a18aec
cpp/src/arrow/ipc/message.cc (outdated)

This is just a cleanup; it is not related to this fix.
cpp/src/arrow/ipc/reader.h (outdated)
I'm not sure this is a good API...
Should we create a new CopyCollectListener or something instead?
cpp/src/arrow/ipc/reader.cc (outdated)
Maybe, we should not do this...
Would ArrayData* be better in this place?
Good catch!
It will reduce reference count related cost.
I'll do it after we reach a consensus that this copy approach is reasonable. (I hope that we have a better approach...)
Force-pushed from 9a18aec to 5f37a6c
@pitrou @felipecrv Do you have any opinion on this approach?
cpp/src/arrow/ipc/reader.h (outdated)
I don't understand why this is needed. Can you elaborate?
Oh, sorry. My comment was outdated: #39164 (comment)
But it doesn't elaborate enough...
StreamDecoder::Consume(const uint8_t* data, int64_t size) (not Consume(Buffer* buffer)) assumes only that the given data is alive while Consume() is running. (The data may be freed as soon as the Consume() call finishes.)
So decoded record batches that are passed to the Listener are only valid while Consume() is running. But CollectListener keeps decoded record batches after Consume() has finished. If we want decoded record batches to stay valid after the Consume() call, we need to copy them.
This only happens with Consume(const uint8_t* data, int64_t size). It does not happen with Consume(Buffer* buffer) because that overload keeps a reference to the given buffer. So I want to avoid unconditionally copying all decoded record batches. This is why I added the copy_record_batch option here.
I hope that this explains why...
I reverted this change and added a new CopyCollectListener() only for tests, because I'm not sure whether this API is good or not.
This lets us defer the API decision, since it doesn't change the public API. We can reconsider the public API later if needed.
cpp/src/arrow/ipc/read_write_test.cc (outdated)
Please mention the parameter name:

- auto listener = std::make_shared<CollectListener>(true);
+ auto listener = std::make_shared<CollectListener>(/*xxx=*/true);
Also, why do we set it to true here? Can you add a comment?
cpp/src/arrow/ipc/message.cc (outdated)
Wouldn't this be solved more elegantly by changing ConsumeMetadataBuffer?

  Status ConsumeMetadataBuffer(const std::shared_ptr<Buffer>& buffer) {
-   if (buffer->is_cpu()) {
-     metadata_ = buffer;
-   } else {
      ARROW_ASSIGN_OR_RAISE(metadata_,
                            Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
-   }
    return ConsumeMetadata();
  }
It works, but it causes a needless copy with the Consume(buffer) API.
If we use Buffer::ViewOrCopy() for copying a buffer, we should do this in Consume(data, size):
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..351fa6c6db 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -632,9 +632,10 @@ class MessageDecoder::MessageDecoderImpl {
// the current ConsumeData() call is still valid in the
// next ConsumeData() call. So we need to copy metadata
// here.
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
- AllocateBuffer(next_required_size_, pool_));
- memcpy(buffer->mutable_data(), data, next_required_size_);
+ ARROW_ASSIGN_OR_RAISE(
+ auto buffer,
+ Buffer::ViewOrCopy(std::make_shared<Buffer>(data, next_required_size_),
+ CPUDevice::memory_manager(pool_)));
RETURN_NOT_OK(ConsumeMetadataBuffer(buffer));
} break;
      case State::BODY: {

BTW, we should use arrow::default_cpu_memory_manager() for arrow::default_memory_pool()... I'll open an issue for it later.
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..a0005a0e59 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
MemoryPool* pool, bool skip_body)
: listener_(std::move(listener)),
pool_(pool),
+ memory_manager_(pool_ == default_memory_pool() ? default_cpu_memory_manager() : CPUDevice::memory_manager(pool_)),
state_(initial_state),
next_required_size_(initial_next_required_size),
chunks_(),
@@ -823,7 +824,7 @@ class MessageDecoder::MessageDecoderImpl {
metadata_ = buffer;
} else {
ARROW_ASSIGN_OR_RAISE(metadata_,
- Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
+ Buffer::ViewOrCopy(buffer, memory_manager_));
}
return ConsumeMetadata();
}
@@ -836,14 +837,14 @@ class MessageDecoder::MessageDecoderImpl {
} else {
ARROW_ASSIGN_OR_RAISE(
metadata_,
- Buffer::ViewOrCopy(chunks_[0], CPUDevice::memory_manager(pool_)));
+ Buffer::ViewOrCopy(chunks_[0], memory_manager_));
}
chunks_.erase(chunks_.begin());
} else {
metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
if (!chunks_[0]->is_cpu()) {
ARROW_ASSIGN_OR_RAISE(
- metadata_, Buffer::ViewOrCopy(metadata_, CPUDevice::memory_manager(pool_)));
+ metadata_, Buffer::ViewOrCopy(metadata_, memory_manager_));
}
chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
}
@@ -912,7 +913,7 @@ class MessageDecoder::MessageDecoderImpl {
return util::SafeLoadAs<int32_t>(buffer->data());
} else {
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
- Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
+ Buffer::ViewOrCopy(buffer, memory_manager_));
return util::SafeLoadAs<int32_t>(cpu_buffer->data());
}
}
@@ -925,7 +926,7 @@ class MessageDecoder::MessageDecoderImpl {
for (auto& chunk : chunks_) {
if (!chunk->is_cpu()) {
ARROW_ASSIGN_OR_RAISE(
- chunk, Buffer::ViewOrCopy(chunk, CPUDevice::memory_manager(pool_)));
+ chunk, Buffer::ViewOrCopy(chunk, memory_manager_));
}
auto data = chunk->data();
auto data_size = chunk->size();
@@ -951,6 +952,7 @@ class MessageDecoder::MessageDecoderImpl {
std::shared_ptr<MessageDecoderListener> listener_;
MemoryPool* pool_;
+ std::shared_ptr<MemoryManager> memory_manager_;
State state_;
int64_t next_required_size_;
   std::vector<std::shared_ptr<Buffer>> chunks_;
> It works, but it causes a needless copy with the Consume(buffer) API.
Wouldn't it be the same number of copies?
If we apply the change, all buffers are copied because we're using CPUDevice::memory_manager(pool_), not arrow::default_cpu_memory_manager().
If we don't apply the change, buffers aren't copied.
If we apply the change together with #39270, buffers aren't copied.
Would it work if ConsumeMetadataBuffer took a memory manager parameter?
(Keep in mind that I'm not familiar with this code and don't understand how it's used)
> If we apply the change, all buffers are copied because we're using CPUDevice::memory_manager(pool_), not arrow::default_cpu_memory_manager().
This is not supposed to trigger a copy.
Sorry, I was wrong.
But we can't use Buffer::ViewOrCopy() for this, because Buffer::ViewOrCopy() doesn't copy in this case; it returns a view. So we need to copy the given data explicitly, either with AllocateBuffer() + memcpy() or with std::make_shared<Buffer>() + Buffer::Copy(). I think the former is better because the latter needs to create two Buffers while the former creates only one.
@kou It would be very nice to have this merged in time for the 15.0.0 release if possible (code freeze planned 8 January)
Sorry, I forgot to reply to this. I'll do it today.
…(data) We need to copy data for the metadata message because it may be used in subsequent `Consume(data)` calls. We can't assume that the given `data` is still valid in subsequent `Consume(data)` calls. We also need to copy buffered `data` because it's used in subsequent `Consume(data)` calls.
Add CopyCollectListener() only for tests because I'm not sure whether the API is good or not.
I've replied:
I've reverted
I'll merge this for 15.0.0.
After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 6ab7a18. There was 1 benchmark result with an error:
There was 1 benchmark result indicating a performance regression:
The full Conbench report has more details. It also includes information about 10 possible false positives for unstable benchmarks that are known to sometimes produce them.
…(data) (apache#39164)

### Rationale for this change

We need to copy data for the metadata message because it may be used in subsequent `Consume(data)` calls. We can't assume that the given `data` is still valid in subsequent `Consume(data)` calls. We also need to copy buffered `data` because it's used in subsequent `Consume(data)` calls.

### What changes are included in this PR?

* Add missing copies.
* Clean up existing buffer copy code.
* Change tests to use ephemeral `data` to detect this case.
* Add a `copy_record_batch` option to `CollectListener` to copy decoded record batches.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.

* Closes: apache#39163

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
Rationale for this change

We need to copy data for the metadata message because it may be used in subsequent Consume(data) calls. We can't assume that the given data is still valid in subsequent Consume(data) calls. We also need to copy buffered data because it's used in subsequent Consume(data) calls.

What changes are included in this PR?

* Add missing copies.
* Clean up existing buffer copy code.
* Change tests to use ephemeral data to detect this case.
* Add a copy_record_batch option to CollectListener to copy decoded record batches.

Are these changes tested?

Yes.

Are there any user-facing changes?

Yes.

* arrow::ipc::StreamDecoder may return a broken RecordBatch with the Consume(const uint8_t* data, int64_t size) API #39163