
[PG] Implemented support for additional collective primitives: **gather**, **scatter**, and **reduce** in the Mooncake backend and added unit tests in test_mooncake_backend.py and test_mooncake_backend_cpu.py to cover new ops #1469

Merged
UNIDY2002 merged 8 commits into kvcache-ai:main from hhr2449:main on Feb 1, 2026

Conversation

@hhr2449 (Contributor) commented Jan 31, 2026

Description

Part of #1225

This PR implements support for additional collective primitives (gather, scatter, and reduce) in the Mooncake backend; a usage sketch follows the list below.

Key Changes:

  1. Support for New Collective Primitives:

    • Implemented gather, scatter, and reduce APIs in MooncakeBackend.
    • Implemented corresponding task submission and execution logic in MooncakeWorker for both CPU and CUDA backends.
    • Replaced cudaMemcpyHostToDevice/DeviceToHost with cudaMemcpyDeviceToDevice for internal GPU buffer operations.
  2. Unit Tests:

    • Updated tests/test_mooncake_backend.py (CUDA) and tests/test_mooncake_backend_cpu.py (CPU) to include test cases for gather, scatter, and reduce.
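As a usage sketch, here is how the three primitives are driven from PyTorch once the process group is up. This is a minimal illustration, not code from this PR; the backend name "mooncake" and the rendezvous setup are assumptions based on the test harness.

    import torch
    import torch.distributed as dist

    def demo(rank: int, world_size: int):
        # Assumes MASTER_ADDR/MASTER_PORT are set and that the Mooncake
        # backend is registered under the name "mooncake".
        dist.init_process_group("mooncake", rank=rank, world_size=world_size)

        # gather: every rank sends one tensor; only the root receives the list.
        t = torch.tensor([float(rank)])
        out = [torch.zeros(1) for _ in range(world_size)] if rank == 0 else None
        dist.gather(t, gather_list=out, dst=0)

        # scatter: the root supplies one chunk per rank; each rank gets one back.
        recv = torch.zeros(1)
        chunks = [torch.tensor([float(r)]) for r in range(world_size)] if rank == 0 else None
        dist.scatter(recv, scatter_list=chunks, src=0)

        # reduce: elementwise reduction across ranks; only the root sees the result.
        ones = torch.ones(1)
        dist.reduce(ones, dst=0, op=dist.ReduceOp.SUM)  # root ends with world_size

        dist.destroy_process_group()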

Type of Change

  • Types
    • Bug fix
    • New feature
      • Transfer Engine
      • Mooncake Store
      • Mooncake EP
      • Integration
      • P2P Store
      • Python Wheel
    • Breaking change
    • CI/CD
    • Documentation update
    • Other

How Has This Been Tested?

I ran the updated unit tests in a GPU environment to verify the correctness of the new primitives, focusing in particular on their asymmetric root/non-root behaviors.

  • Test Command: python3 -m unittest mooncake-wheel.tests.test_mooncake_backend
  • Environment: Server with CUDA support.
  • Results: All tests passed, including the new test_gather, test_scatter, and test_reduce. Verified that the root correctly aggregates [0, 1, ...] for gather and sums values for reduce (a sketch of the per-rank worker logic follows this list).
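For reviewers who have not opened the test files, here is a rough per-rank worker matching the expected values above. This is a hypothetical reconstruction, not the actual worker in tests/test_mooncake_backend.py; tensor shapes, device placement, and rendezvous details may differ.

    import torch
    import torch.distributed as dist

    def worker(rank, world_size, results, op):
        # Hypothetical sketch; assumes the harness sets MASTER_ADDR/MASTER_PORT
        # and that the backend is registered as "mooncake".
        dist.init_process_group("mooncake", rank=rank, world_size=world_size)
        if op == "gather":
            t = torch.tensor([float(rank)])
            out = [torch.zeros(1) for _ in range(world_size)] if rank == 0 else None
            dist.gather(t, gather_list=out, dst=0)
            results[rank] = [int(x.item()) for x in out] if rank == 0 else None
        elif op == "scatter":
            recv = torch.zeros(1)
            chunks = [torch.tensor([float(r)]) for r in range(world_size)] if rank == 0 else None
            dist.scatter(recv, scatter_list=chunks, src=0)
            results[rank] = int(recv.item())  # rank i expects i
        elif op == "reduce":
            t = torch.ones(1)
            dist.reduce(t, dst=0, op=dist.ReduceOp.SUM)
            results[rank] = int(t.item()) if rank == 0 else None
        dist.destroy_process_group()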

Checklist

  • I have performed a self-review of my own code.
  • I have formatted my own code using ./scripts/code_format.sh before submitting.
  • I have updated the documentation.
  • I have added tests to prove my changes are effective.

@gemini-code-assist (Contributor) commented

Summary of Changes

Hello @hhr2449, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces support for the gather, scatter, and reduce collective communication primitives to the Mooncake backend, enhancing its functionality for distributed tensor operations. It also includes an optimization for CUDA memory transfers within existing collectives and expands the types of reduction operations supported. These changes are thoroughly validated by new unit tests covering both CPU and CUDA execution paths.

Highlights

  • New Collective Primitives: Implemented gather, scatter, and reduce operations within the Mooncake backend, significantly extending its distributed communication capabilities for both CPU and CUDA environments.
  • CUDA Memory Optimization: Replaced cudaMemcpyHostToDevice and cudaMemcpyDeviceToHost with cudaMemcpyDeviceToDevice for internal GPU buffer operations in existing collective primitives, potentially improving performance by keeping data on the device.
  • Expanded Reduction Operations: The reduce kernel now supports MAX and PRODUCT operations in addition to SUM and MIN, offering more flexibility for aggregation (a brief sketch follows this list).
  • Comprehensive Unit Testing: Added new unit tests for gather, scatter, and reduce in both CUDA (test_mooncake_backend.py) and CPU (test_mooncake_backend_cpu.py) environments to ensure correctness and cover asymmetric behaviors.
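The expanded reduction support corresponds to the standard torch.distributed ReduceOp values; the sketch below illustrates the expected per-op results. This is a minimal illustration using the stock PyTorch API, assuming an already-initialized process group; nothing here is new API from this PR.

    import torch
    import torch.distributed as dist

    def reduce_op_demo():
        # Assumes dist.init_process_group(...) has already run, e.g. with
        # the Mooncake backend; rank r contributes tensor([r + 1]).
        rank, size = dist.get_rank(), dist.get_world_size()
        t = torch.tensor([float(rank + 1)])
        dist.reduce(t, dst=0, op=dist.ReduceOp.PRODUCT)
        if rank == 0:
            # PRODUCT over 1..size gives size!; with SUM it would be
            # size * (size + 1) / 2, with MIN 1.0, and with MAX float(size).
            print(t.item())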




@gemini-code-assist (Bot) left a comment

Code Review

This pull request introduces support for gather, scatter, and reduce collective operations in the Mooncake backend for both CPU and CUDA devices, including an optimization for memory copies and unit tests. However, critical logic errors were identified: the broadcastRoot is hardcoded to 0 in all three operations, ignoring the user-specified root rank, which can lead to incorrect data transfers and potential data leakage. Additionally, the reduce operation on the CPU backend utilizes a buggy reduction implementation. There's also an opportunity to improve the new tests by refactoring duplicated code to enhance maintainability. These issues should be addressed to ensure the correctness and security of the collective operations.

Comment thread mooncake-pg/src/mooncake_backend.cpp Outdated
Comment on lines +609 to +623
        c10d::OpType::REDUCE, tensorSize, 0, &meta_,
        [=](void* dst, size_t pos, size_t realSize) {
            memcpy(dst, (char*)tensor.data_ptr() + pos, realSize);
        },
        [=](void* src, size_t pos, size_t realSize) {
            if (isRoot) {
                memset((char*)tensor.data_ptr() + pos, 0, realSize);
                launchReduceCpu(tensor, pos, realSize, src, numRanks,
                                opts.reduceOp, meta_.activeRanks);
            }
        });
} else {
    auto stream = at::cuda::getCurrentCUDAStream(tensor.device().index());
    return worker_.putTaskCuda(
        c10d::OpType::REDUCE, tensorSize, 0, &meta_, stream,
security-high

The reduce implementation hardcodes the broadcastRoot parameter to 0 in both CPU and CUDA task submissions (lines 609 and 623). This ignores the rootRank specified in the collective options, which will cause the operation to fail or produce incorrect results if the designated root is not rank 0. Additionally, the CPU implementation (line 616) calls launchReduceCpu, which relies on a buggy reduceCpu implementation that incorrectly initializes the accumulator and handles offsets for active ranks. The broadcastRoot should be set to the calculated root variable.

UNIDY2002 (Collaborator):
@hhr2449 Please set the broadcastRoot parameter of putTaskCuda to root rather than 0.

Comment thread mooncake-pg/src/mooncake_backend.cpp Outdated
Comment on lines +653 to +670
        c10d::OpType::GATHER, tensorSize, 0, &meta_,
        [=](void* dst, size_t pos, size_t realSize) {
            memcpy(dst, (char*)inputTensor.data_ptr() + pos, realSize);
        },
        [=](void* src, size_t pos, size_t realSize) {
            if (isRoot) {
                auto outputTensors_ = outputTensors.back();
                for (const auto j : c10::irange(outputTensors_.size())) {
                    memcpy((char*)outputTensors_[j].data_ptr() + pos,
                           (char*)src + j * realSize, realSize);
                }
            }
        });
} else {
    auto stream =
        at::cuda::getCurrentCUDAStream(inputTensor.device().index());
    return worker_.putTaskCuda(
        c10d::OpType::GATHER, tensorSize, 0, &meta_, stream,
security-high

The gather implementation hardcodes the broadcastRoot parameter to 0 in both CPU and CUDA task submissions (lines 653 and 670). This ignores the rootRank specified in the collective options, causing all ranks to send data to rank 0 regardless of the intended destination. This leads to data corruption and potential leakage of stale data from rank 0's buffers. The broadcastRoot should be set to the root variable.

UNIDY2002 (Collaborator):
Same here.

Comment thread mooncake-pg/src/mooncake_backend.cpp Outdated
Comment on lines +703 to +721
        c10d::OpType::SCATTER, tensorSize, 0, &meta_,
        [=](void* dst, size_t pos, size_t realSize) {
            if (isRoot) {
                auto inputTensors_ = inputTensors.back();
                for (const auto j : c10::irange(inputTensors_.size())) {
                    memcpy((char*)dst + j * realSize,
                           (char*)inputTensors_[j].data_ptr() + pos,
                           realSize);
                }
            }
        },
        [=](void* src, size_t pos, size_t realSize) {
            memcpy((char*)outputTensor.data_ptr() + pos, src, realSize);
        });
} else {
    auto stream =
        at::cuda::getCurrentCUDAStream(outputTensor.device().index());
    return worker_.putTaskCuda(
        c10d::OpType::SCATTER, tensorSize, 0, &meta_, stream,
security-high

The scatter implementation hardcodes the broadcastRoot parameter to 0 in both CPU and CUDA task submissions (lines 703 and 721). This ignores the rootRank specified in the collective options. As a result, only rank 0 will attempt to perform the scatter transfers, potentially sending its own data instead of the actual root's data. This leads to data corruption and potential leakage of sensitive information from rank 0's buffers. The broadcastRoot should be set to the root variable (a Python-level sketch of the non-zero-root case follows this thread).

UNIDY2002 (Collaborator):
ditto
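To make the hardcoded-root issue concrete, here is a Python-level sketch of a scatter whose designated root is rank 1; with broadcastRoot pinned to 0 inside the backend, the data would come from the wrong rank. The snippet uses the public torch.distributed API, not the internal putTask* calls, and assumes an initialized process group.

    import torch
    import torch.distributed as dist

    def scatter_from_rank_one(rank: int, world_size: int):
        # Rank 1 is the root here. A backend that hardcodes broadcastRoot = 0
        # would treat rank 0 as the data source and misroute (or leak stale)
        # buffers instead of delivering rank 1's chunks.
        recv = torch.zeros(1)
        chunks = (
            [torch.tensor([10.0 * r]) for r in range(world_size)]
            if rank == 1 else None
        )
        dist.scatter(recv, scatter_list=chunks, src=1)
        # Correct behavior: rank r ends up holding 10.0 * r.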

Comment thread tests/test_mooncake_backend.py
Comment on lines +124 to +165
def test_gather(self):
    # Expected gather (Root) = [0, 1, 2, ..., size-1]
    # Expected gather (Others) = None
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, "gather"),
        nprocs=self.world_size,
        join=True,
    )
    self.assertEqual(results[0], list(range(self.world_size)))
    for r in range(1, self.world_size):
        self.assertIsNone(results[r])

def test_scatter(self):
    # Expected scatter (Rank i) = i
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, "scatter"),
        nprocs=self.world_size,
        join=True,
    )
    for r in range(self.world_size):
        self.assertEqual(results[r], r)

def test_reduce(self):
    # Expected reduce (Root) = sum([1, 1, ..., 1]) = size
    # Expected reduce (Others) = None
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, "reduce"),
        nprocs=self.world_size,
        join=True,
    )
    self.assertEqual(results[0], self.world_size)
    for r in range(1, self.world_size):
        self.assertIsNone(results[r])
medium

There is significant code duplication across test_gather, test_scatter, and test_reduce. To improve maintainability, consider creating a new helper method for these asymmetric collectives, similar to how _spawn_and_check is used for symmetric ones.

For example, you could introduce a helper like _spawn_and_check_asymmetric:

def _spawn_and_check_asymmetric(self, collective, expected_fn):
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, collective),
        nprocs=self.world_size,
        join=True,
    )
    expected_results = expected_fn(self.world_size)
    self.assertEqual(results.copy(), expected_results)

def test_gather(self):
    def expected_fn(size):
        res = {0: list(range(size))}
        for r in range(1, size):
            res[r] = None
        return res
    self._spawn_and_check_asymmetric("gather", expected_fn)
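Continuing the reviewer's suggestion, the other two tests would then collapse to the same shape. A sketch, reusing the hypothetical _spawn_and_check_asymmetric helper above:

    def test_scatter(self):
        def expected_fn(size):
            return {r: r for r in range(size)}
        self._spawn_and_check_asymmetric("scatter", expected_fn)

    def test_reduce(self):
        def expected_fn(size):
            res = {0: size}
            for r in range(1, size):
                res[r] = None
            return res
        self._spawn_and_check_asymmetric("reduce", expected_fn)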

Comment thread tests/test_mooncake_backend_cpu.py
Comment on lines +103 to +144
def test_gather(self):
    # Expected gather (Root) = [0, 1, 2, ..., size-1]
    # Expected gather (Others) = None
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, "gather"),
        nprocs=self.world_size,
        join=True
    )
    self.assertEqual(results[0], list(range(self.world_size)))
    for r in range(1, self.world_size):
        self.assertIsNone(results[r])

def test_scatter(self):
    # Expected scatter (Rank i) = i
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, "scatter"),
        nprocs=self.world_size,
        join=True
    )
    for r in range(self.world_size):
        self.assertEqual(results[r], r)

def test_reduce(self):
    # Expected reduce (Root) = sum([1, 1, ..., 1]) = size
    # Expected reduce (Others) = None
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, "reduce"),
        nprocs=self.world_size,
        join=True
    )
    self.assertEqual(results[0], self.world_size)
    for r in range(1, self.world_size):
        self.assertIsNone(results[r])
medium

Similar to the CUDA backend tests, there is code duplication in test_gather, test_scatter, and test_reduce. To improve maintainability, consider refactoring this by introducing a helper method to spawn processes and check results for these asymmetric collectives.

For example, you could introduce the same _spawn_and_check_asymmetric helper shown in the CUDA test comment above.

@codecov-commenter commented

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

✅ All modified and coverable lines are covered by tests.


@UNIDY2002 (Collaborator) left a comment

Mostly well done, except that you have to update the values of the broadcastRoot parameter.

After that, we are ready to merge!


Comment thread mooncake-pg/src/mooncake_backend.cpp Outdated
if (isCpu_) {
    auto numRanks = meta_.size;
    return worker_.putTaskCpu(
        c10d::OpType::REDUCE, tensorSize, 0, &meta_,
UNIDY2002 (Collaborator):
Same here for putTaskCpu's broadcastRoot.


Comment thread mooncake-pg/src/mooncake_backend.cpp Outdated
size_t tensorSize = inputTensor.numel() * inputTensor.element_size();
if (isCpu_) {
    return worker_.putTaskCpu(
        c10d::OpType::GATHER, tensorSize, 0, &meta_,
UNIDY2002 (Collaborator):
ditto

Comment thread mooncake-pg/src/mooncake_backend.cpp Outdated
size_t tensorSize = outputTensor.numel() * outputTensor.element_size();
if (isCpu_) {
    return worker_.putTaskCpu(
        c10d::OpType::SCATTER, tensorSize, 0, &meta_,
UNIDY2002 (Collaborator):
ditto


UNIDY2002 merged commit 4f76f12 into kvcache-ai:main on Feb 1, 2026
16 checks passed
@ShangmingCai (Collaborator) commented Feb 1, 2026

This PR name is really long and bad; we should rename it to a simpler one before merging next time.

JasonZhang517 pushed a commit to JasonZhang517/Mooncake that referenced this pull request Feb 9, 2026
…ther`**, **`scatter`**, and **`reduce`** in the Mooncake backend (kvcache-ai#1469)