Skip to content

Commit 2132cb3

Browse files
authored
GH-39270: [C++] Avoid creating memory manager instance for every buffer view/copy (#39271)
### Rationale for this change We can use `arrow::default_cpu_memory_manager()` for `default_memory_pool()`. ### What changes are included in this PR? Check the given `pool` and use `arrow::default_cpu_memory_manager()` if it's `arrow::default_memory_pool()`. This also caches `arrow::CPUDevice::memory_manager()` result to avoid calling it multiple times. Note that we can avoid creating needless memory manager instance without this. This just avoid calling it multiple times. ### Are these changes tested? Yes. ### Are there any user-facing changes? No. * Closes: #39270 Authored-by: Sutou Kouhei <kou@clear-code.com> Signed-off-by: Sutou Kouhei <kou@clear-code.com>
1 parent 30c4e15 commit 2132cb3

2 files changed

Lines changed: 14 additions & 12 deletions

File tree

cpp/src/arrow/device.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,11 @@ bool CPUDevice::Equals(const Device& other) const {
241241
}
242242

243243
std::shared_ptr<MemoryManager> CPUDevice::memory_manager(MemoryPool* pool) {
244-
return CPUMemoryManager::Make(Instance(), pool);
244+
if (pool == default_memory_pool()) {
245+
return default_cpu_memory_manager();
246+
} else {
247+
return CPUMemoryManager::Make(Instance(), pool);
248+
}
245249
}
246250

247251
std::shared_ptr<MemoryManager> CPUDevice::default_memory_manager() {

cpp/src/arrow/ipc/message.cc

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
607607
MemoryPool* pool, bool skip_body)
608608
: listener_(std::move(listener)),
609609
pool_(pool),
610+
memory_manager_(CPUDevice::memory_manager(pool_)),
610611
state_(initial_state),
611612
next_required_size_(initial_next_required_size),
612613
chunks_(),
@@ -822,8 +823,7 @@ class MessageDecoder::MessageDecoderImpl {
822823
if (buffer->is_cpu()) {
823824
metadata_ = buffer;
824825
} else {
825-
ARROW_ASSIGN_OR_RAISE(metadata_,
826-
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
826+
ARROW_ASSIGN_OR_RAISE(metadata_, Buffer::ViewOrCopy(buffer, memory_manager_));
827827
}
828828
return ConsumeMetadata();
829829
}
@@ -834,16 +834,15 @@ class MessageDecoder::MessageDecoderImpl {
834834
if (chunks_[0]->is_cpu()) {
835835
metadata_ = std::move(chunks_[0]);
836836
} else {
837-
ARROW_ASSIGN_OR_RAISE(
838-
metadata_,
839-
Buffer::ViewOrCopy(chunks_[0], CPUDevice::memory_manager(pool_)));
837+
ARROW_ASSIGN_OR_RAISE(metadata_,
838+
Buffer::ViewOrCopy(chunks_[0], memory_manager_));
840839
}
841840
chunks_.erase(chunks_.begin());
842841
} else {
843842
metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
844843
if (!chunks_[0]->is_cpu()) {
845-
ARROW_ASSIGN_OR_RAISE(
846-
metadata_, Buffer::ViewOrCopy(metadata_, CPUDevice::memory_manager(pool_)));
844+
ARROW_ASSIGN_OR_RAISE(metadata_,
845+
Buffer::ViewOrCopy(metadata_, memory_manager_));
847846
}
848847
chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
849848
}
@@ -911,8 +910,7 @@ class MessageDecoder::MessageDecoderImpl {
911910
if (buffer->is_cpu()) {
912911
return util::SafeLoadAs<int32_t>(buffer->data());
913912
} else {
914-
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
915-
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
913+
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer, Buffer::ViewOrCopy(buffer, memory_manager_));
916914
return util::SafeLoadAs<int32_t>(cpu_buffer->data());
917915
}
918916
}
@@ -924,8 +922,7 @@ class MessageDecoder::MessageDecoderImpl {
924922
std::shared_ptr<Buffer> last_chunk;
925923
for (auto& chunk : chunks_) {
926924
if (!chunk->is_cpu()) {
927-
ARROW_ASSIGN_OR_RAISE(
928-
chunk, Buffer::ViewOrCopy(chunk, CPUDevice::memory_manager(pool_)));
925+
ARROW_ASSIGN_OR_RAISE(chunk, Buffer::ViewOrCopy(chunk, memory_manager_));
929926
}
930927
auto data = chunk->data();
931928
auto data_size = chunk->size();
@@ -951,6 +948,7 @@ class MessageDecoder::MessageDecoderImpl {
951948

952949
std::shared_ptr<MessageDecoderListener> listener_;
953950
MemoryPool* pool_;
951+
std::shared_ptr<MemoryManager> memory_manager_;
954952
State state_;
955953
int64_t next_required_size_;
956954
std::vector<std::shared_ptr<Buffer>> chunks_;

0 commit comments

Comments
 (0)