GH-39163: [C++] Add missing data copy in StreamDecoder::Consume(data)#39164

Merged
kou merged 2 commits into apache:main from kou:cpp-decoder-data on Jan 6, 2024

Member

kou commented Dec 10, 2023

Rationale for this change

We need to copy the data for a metadata message because it may be used in subsequent Consume(data) calls. We can't assume that the given data is still valid in a subsequent Consume(data) call.

We also need to copy buffered data because it is used in subsequent Consume(data) calls.
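
The lifetime rule above can be sketched in plain C++. This is a toy model under stated assumptions, not Arrow's implementation: ToyDecoder, its fixed message size, and DecodeWithEphemeralChunks are hypothetical names invented for this illustration. It shows that any bytes carried over to a later Consume() call are copied into storage the decoder owns, so the caller may overwrite or free its input as soon as the call returns.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical toy decoder (not arrow::ipc::StreamDecoder): it assembles
// fixed-size messages from chunks that arrive through raw pointers.
class ToyDecoder {
 public:
  explicit ToyDecoder(std::size_t message_size) : message_size_(message_size) {
    assert(message_size_ > 0);
  }

  // The caller only guarantees that `data` is valid during this call,
  // so every byte kept for a later call is copied into `pending_`.
  void Consume(const std::uint8_t* data, std::size_t size) {
    pending_.insert(pending_.end(), data, data + size);
    const auto step = static_cast<std::ptrdiff_t>(message_size_);
    while (pending_.size() >= message_size_) {
      messages_.emplace_back(pending_.begin(), pending_.begin() + step);
      pending_.erase(pending_.begin(), pending_.begin() + step);
    }
  }

  const std::vector<std::string>& messages() const { return messages_; }

 private:
  std::size_t message_size_;
  std::vector<std::uint8_t> pending_;  // owned copy of incomplete input
  std::vector<std::string> messages_;  // completed messages (owned)
};

// Feeds "hello world!" in three chunks and clobbers each chunk right
// after Consume() returns, the way ephemeral test data would behave.
inline std::vector<std::string> DecodeWithEphemeralChunks() {
  ToyDecoder decoder(6);
  const std::vector<std::string> chunks = {"hell", "o wor", "ld!"};
  for (const std::string& chunk : chunks) {
    std::vector<std::uint8_t> scratch(chunk.begin(), chunk.end());
    decoder.Consume(scratch.data(), scratch.size());
    // Mimic the caller reusing or freeing its memory after the call.
    std::fill(scratch.begin(), scratch.end(), std::uint8_t{0});
  }
  return decoder.messages();
}
```

If Consume() stored the raw pointer instead of copying, the second and third calls would read clobbered memory, which is the kind of bug the ephemeral test data is meant to expose.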

What changes are included in this PR?

  • Add missing copies.
  • Clean up the existing buffer copy code.
  • Change tests to use ephemeral data to detect this case.
  • Add copy_record_batch option to CollectListener to copy decoded record batches.

Are these changes tested?

Yes.

Are there any user-facing changes?

Yes.

Member Author

This is just a clean up. This is not related to this fix.

Member Author

This is just a clean up. This is not related to this fix.

Member Author

This is just a clean up. This is not related to this fix.

Member Author

I'm not sure this is a good API...
Should we create a new CopyCollectListener or something instead?

Member Author

Maybe, we should not do this...

Member

Would ArrayData* be better in this place?

Member Author

Good catch!
It will reduce the reference-counting cost.
I'll do it once we reach a consensus that this copy approach is reasonable. (I hope that we can find a better approach...)

github-actions bot added the "awaiting changes" label and removed the "awaiting committer review" label on Dec 10, 2023
kou force-pushed the cpp-decoder-data branch from 9a18aec to 5f37a6c on December 11, 2023 01:39
github-actions bot added the "awaiting change review" and "awaiting changes" labels and removed the "awaiting changes" and "awaiting change review" labels on Dec 11, 2023
Member Author

kou commented Dec 14, 2023

@pitrou @felipecrv Do you have any opinion on this approach?

Member

I don't understand why this is needed. Can you elaborate?

Member Author

Oh, sorry. My comment was outdated: #39164 (comment)

But it doesn't elaborate enough...

StreamDecoder::Consume(const uint8_t* data, int64_t size) (not Consume(Buffer* buffer)) assumes that the given data is alive only while Consume() is running. (The data may be freed after the Consume() call has finished.)
So decoded record batches that are passed to the Listener are valid only while Consume() is running. But CollectListener keeps decoded record batches after Consume() has finished. If we want decoded record batches to stay valid after the Consume() call, we need to copy them.

This only happens with Consume(const uint8_t* data, int64_t size). It does not happen with Consume(Buffer* buffer) because it refers to the given buffer. So I want to avoid copying all decoded record batches. This is why I added the copy_record_batch option here.

I hope that this explains why...
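
The situation described above can be sketched with standard C++ only. This is a hedged illustration, not Arrow code: BorrowedBatch, CopyingCollector, and CollectThenFreeInput are hypothetical names, and a "batch" here is just a byte range that borrows the caller's memory. The sketch shows a listener that wants to keep results after the call returns, so it deep-copies each batch it receives.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a decoded record batch that only borrows
// the memory passed to Consume(data, size). Not an Arrow type.
struct BorrowedBatch {
  const std::uint8_t* data;
  std::size_t size;
};

// Hypothetical listener that keeps results after the callback returns,
// so it stores a deep copy of every batch instead of the borrowed view.
class CopyingCollector {
 public:
  void OnBatchDecoded(const BorrowedBatch& batch) {
    assert(batch.data != nullptr || batch.size == 0);
    batches_.emplace_back(batch.data, batch.data + batch.size);
  }

  const std::vector<std::vector<std::uint8_t>>& batches() const {
    return batches_;
  }

 private:
  std::vector<std::vector<std::uint8_t>> batches_;
};

// Delivers two batches from an input buffer that is destroyed before
// the collected results are read.
inline std::vector<std::vector<std::uint8_t>> CollectThenFreeInput() {
  CopyingCollector collector;
  {
    std::vector<std::uint8_t> input = {1, 2, 3, 4};
    collector.OnBatchDecoded(BorrowedBatch{input.data(), 2});
    collector.OnBatchDecoded(BorrowedBatch{input.data() + 2, 2});
  }  // `input` is freed here; only the copies remain valid.
  return collector.batches();
}
```

A collector that stored BorrowedBatch values directly would hold dangling pointers once the input is freed. When the input is a reference-counted buffer that the batches keep alive, the copy is unnecessary, which is why copying was meant to be optional.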

Member Author

@pitrou Does this elaborate enough?

Member Author

I reverted this change and added a new CopyCollectListener only for tests because I'm not sure whether this API is good or not.
This lets us defer the API change decision because it doesn't change the public API. We can reconsider the public API later if needed.

Member

Please mention the parameter name

Suggested change
auto listener = std::make_shared<CollectListener>(true);
auto listener = std::make_shared<CollectListener>(/*xxx=*/ true);

Member

Also, why do we set it to true here? Can you add a comment?

Contributor

Wouldn't this be solved more elegantly by changing ConsumeMetadataBuffer?

   Status ConsumeMetadataBuffer(const std::shared_ptr<Buffer>& buffer) {
-    if (buffer->is_cpu()) {
-      metadata_ = buffer;
-    } else {
       ARROW_ASSIGN_OR_RAISE(metadata_,
                             Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
-    }
     return ConsumeMetadata();
   }

Member Author

It works but it causes needless copy with Consume(buffer) API.

If we use Buffer::ViewOrCopy() for copying a buffer, we should do this in Consume(data, size):

diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..351fa6c6db 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -632,9 +632,10 @@ class MessageDecoder::MessageDecoderImpl {
             // the current ConsumeData() call is still valid in the
             // next ConsumeData() call. So we need to copy metadata
             // here.
-            ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
-                                  AllocateBuffer(next_required_size_, pool_));
-            memcpy(buffer->mutable_data(), data, next_required_size_);
+            ARROW_ASSIGN_OR_RAISE(
+                auto buffer,
+                Buffer::ViewOrCopy(std::make_shared<Buffer>(data, next_required_size_),
+                                   CPUDevice::memory_manager(pool_)));
             RETURN_NOT_OK(ConsumeMetadataBuffer(buffer));
           } break;
           case State::BODY: {

BTW, we should use arrow::default_cpu_memory_manager() for arrow::default_memory_pool()... I'll open an issue for it later.

diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..a0005a0e59 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
                               MemoryPool* pool, bool skip_body)
       : listener_(std::move(listener)),
         pool_(pool),
+        memory_manager_(pool_ == default_memory_pool() ? default_cpu_memory_manager() : CPUDevice::memory_manager(pool_)),
         state_(initial_state),
         next_required_size_(initial_next_required_size),
         chunks_(),
@@ -823,7 +824,7 @@ class MessageDecoder::MessageDecoderImpl {
       metadata_ = buffer;
     } else {
       ARROW_ASSIGN_OR_RAISE(metadata_,
-                            Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
+                            Buffer::ViewOrCopy(buffer, memory_manager_));
     }
     return ConsumeMetadata();
   }
@@ -836,14 +837,14 @@ class MessageDecoder::MessageDecoderImpl {
         } else {
           ARROW_ASSIGN_OR_RAISE(
               metadata_,
-              Buffer::ViewOrCopy(chunks_[0], CPUDevice::memory_manager(pool_)));
+              Buffer::ViewOrCopy(chunks_[0], memory_manager_));
         }
         chunks_.erase(chunks_.begin());
       } else {
         metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
         if (!chunks_[0]->is_cpu()) {
           ARROW_ASSIGN_OR_RAISE(
-              metadata_, Buffer::ViewOrCopy(metadata_, CPUDevice::memory_manager(pool_)));
+            metadata_, Buffer::ViewOrCopy(metadata_, memory_manager_));
         }
         chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
       }
@@ -912,7 +913,7 @@ class MessageDecoder::MessageDecoderImpl {
       return util::SafeLoadAs<int32_t>(buffer->data());
     } else {
       ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
-                            Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
+                            Buffer::ViewOrCopy(buffer, memory_manager_));
       return util::SafeLoadAs<int32_t>(cpu_buffer->data());
     }
   }
@@ -925,7 +926,7 @@ class MessageDecoder::MessageDecoderImpl {
     for (auto& chunk : chunks_) {
       if (!chunk->is_cpu()) {
         ARROW_ASSIGN_OR_RAISE(
-            chunk, Buffer::ViewOrCopy(chunk, CPUDevice::memory_manager(pool_)));
+            chunk, Buffer::ViewOrCopy(chunk, memory_manager_));
       }
       auto data = chunk->data();
       auto data_size = chunk->size();
@@ -951,6 +952,7 @@ class MessageDecoder::MessageDecoderImpl {
 
   std::shared_ptr<MessageDecoderListener> listener_;
   MemoryPool* pool_;
+  std::shared_ptr<MemoryManager> memory_manager_;
   State state_;
   int64_t next_required_size_;
   std::vector<std::shared_ptr<Buffer>> chunks_;

Member Author

#39270 for memory manager.

Contributor

It works but it causes needless copy with Consume(buffer) API.

Wouldn't it be the same number of copies?

Member Author

If we use the change, all buffers are copied because we're using CPUDevice::memory_manager(pool_) not arrow::default_cpu_memory_manager().
If we don't use the change, all buffers aren't copied.

If we use the change with #39270, all buffers aren't copied.

Contributor

Would it work if ConsumeMetadataBuffer took a memory manager parameter?

(Keep in mind that I'm not familiar with this code and don't understand how it's used)

Member

If we use the change, all buffers are copied because we're using CPUDevice::memory_manager(pool_) not arrow::default_cpu_memory_manager().

This is not supposed to trigger a copy.

Member Author

Sorry. I was wrong.
But we can't use Buffer::ViewOrCopy() here because it doesn't copy in this case; it returns a view. So we need to copy the given data explicitly, with either AllocateBuffer()+memcpy() or std::make_shared<Buffer>()+Buffer::Copy(). I think the former is better because it creates only 1 Buffer, whereas the latter creates 2 Buffers.
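
The difference between a view and an explicit copy can be shown with a small model in standard C++. This is only a sketch under stated assumptions: ToyBuffer, MakeView, MakeCopy, and FirstByteIs are hypothetical and do not reproduce Arrow's Buffer API. MakeView stands in for the "returns a view" behavior mentioned above, and MakeCopy stands in for the allocate-then-memcpy() option.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

// Hypothetical minimal buffer (not arrow::Buffer). `storage` is null
// for a non-owning view and holds the bytes for an owning copy.
struct ToyBuffer {
  const std::uint8_t* data;
  std::size_t size;
  std::shared_ptr<std::vector<std::uint8_t>> storage;
};

// Wraps the caller's memory without copying. The result is only valid
// for as long as the caller keeps `data` alive and unchanged.
inline ToyBuffer MakeView(const std::uint8_t* data, std::size_t size) {
  return ToyBuffer{data, size, nullptr};
}

// Allocates once and memcpy()s the bytes, so the result owns its data.
inline ToyBuffer MakeCopy(const std::uint8_t* data, std::size_t size) {
  auto storage = std::make_shared<std::vector<std::uint8_t>>(size);
  if (size > 0) {
    std::memcpy(storage->data(), data, size);
  }
  return ToyBuffer{storage->data(), size, storage};
}

// Returns true when `buffer` is non-empty and starts with `expected`.
inline bool FirstByteIs(const ToyBuffer& buffer, std::uint8_t expected) {
  return buffer.size > 0 && buffer.data[0] == expected;
}
```

In this model, overwriting the source after both buffers are created changes what the view reads but not what the copy reads. That is why a view is not enough for metadata that must survive until a later Consume(data, size) call.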

@github-actions

⚠️ GitHub issue #39163 has been automatically assigned in GitHub to PR creator.

Member

ianmcook commented Jan 2, 2024

@kou It would be very nice to have this merged in time for the 15.0.0 release if possible (code freeze planned 8 January)

@felipecrv
Contributor

@kou It would be very nice to have this merged in time for the 15.0.0 release if possible (code freeze planned 8 January)

To be clear: I don't oppose the merge at all. I just don't understand what's going on in this class to judge the solution. cc @ianmcook @pitrou

Member Author

kou commented Jan 4, 2024

Sorry. I forgot to reply to this. I'll do it today.

…(data)

We need to copy data for metadata message. Because it may be used in
subsequent `Consume(data)` calls. We can't assume that the given
`data` is still valid in subsequent `Consume(data)`.

We also need to copy buffered `data` because it's used in subsequent
`Consume(data)` calls.
kou force-pushed the cpp-decoder-data branch from 5f37a6c to 8947d76 on January 5, 2024 07:40
github-actions bot added the "awaiting change review" label and removed the "awaiting changes" label on Jan 5, 2024
Add CopyCollectListener() only for test because I'm not sure whether
the API is good or not.
kou force-pushed the cpp-decoder-data branch from ea46f4f to 719855b on January 5, 2024 08:44
github-actions bot added the "awaiting changes" label and removed the "awaiting change review" label on Jan 5, 2024
Member Author

kou commented Jan 5, 2024

I've replied:

I've reverted the CollectListener API change, so we don't need to think about a public API change in this PR.
If std::make_shared<Buffer>()+Buffer::Copy() is better, please change it directly and merge this for 15.0.0. I don't have a strong opinion on the approach.

Member Author

kou commented Jan 6, 2024

I'll merge this for 15.0.0.
If there is a problem in this change, I'll work on it in a follow-up PR.

kou merged commit 6ab7a18 into apache:main on Jan 6, 2024
kou deleted the cpp-decoder-data branch on January 6, 2024 07:57
kou removed the "awaiting changes" label on Jan 6, 2024
@conbench-apache-arrow

After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit 6ab7a18.

There was 1 benchmark result with an error:

There was 1 benchmark result indicating a performance regression:

The full Conbench report has more details. It also includes information about 10 possible false positives for unstable benchmarks that are known to sometimes produce them.

dgreiss pushed a commit to dgreiss/arrow that referenced this pull request Feb 19, 2024
…(data) (apache#39164)

* Closes apache#39163 
* Closes: apache#39163

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>

Successfully merging this pull request may close these issues.

[C++] arrow::ipc::StreamDecoder may return a broken RecordBatch with Consume(const uint8_t* data, int64_t size) API

5 participants