[PG] Implemented support for additional collective primitives: **gather**, **scatter**, and **reduce** in the Mooncake backend, and added unit tests in test_mooncake_backend.py and test_mooncake_backend_cpu.py to cover the new ops #1469
Conversation
Code Review
This pull request introduces support for gather, scatter, and reduce collective operations in the Mooncake backend for both CPU and CUDA devices, including an optimization for memory copies and unit tests. However, critical logic errors were identified: the broadcastRoot is hardcoded to 0 in all three operations, ignoring the user-specified root rank, which can lead to incorrect data transfers and potential data leakage. Additionally, the reduce operation on the CPU backend utilizes a buggy reduction implementation. There's also an opportunity to improve the new tests by refactoring duplicated code to enhance maintainability. These issues should be addressed to ensure the correctness and security of the collective operations.
```cpp
          c10d::OpType::REDUCE, tensorSize, 0, &meta_,
          [=](void* dst, size_t pos, size_t realSize) {
            memcpy(dst, (char*)tensor.data_ptr() + pos, realSize);
          },
          [=](void* src, size_t pos, size_t realSize) {
            if (isRoot) {
              memset((char*)tensor.data_ptr() + pos, 0, realSize);
              launchReduceCpu(tensor, pos, realSize, src, numRanks,
                              opts.reduceOp, meta_.activeRanks);
            }
          });
    } else {
      auto stream = at::cuda::getCurrentCUDAStream(tensor.device().index());
      return worker_.putTaskCuda(
          c10d::OpType::REDUCE, tensorSize, 0, &meta_, stream,
```
The reduce implementation hardcodes the broadcastRoot parameter to 0 in both CPU and CUDA task submissions (lines 609 and 623). This ignores the rootRank specified in the collective options, which will cause the operation to fail or produce incorrect results if the designated root is not rank 0. Additionally, the CPU implementation (line 616) calls launchReduceCpu, which relies on a buggy reduceCpu implementation that incorrectly initializes the accumulator and handles offsets for active ranks. The broadcastRoot should be set to the calculated root variable.
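For reference, a self-contained sketch of what a correct CPU sum-reduction over per-rank chunks can look like. This is illustrative only and not the actual `reduceCpu`/`launchReduceCpu` signature; it assumes `src` holds one contiguous `realSize`-byte chunk per contributing rank (mirroring the per-rank layout used in the gather/scatter lambdas) and that the data is `float`.

```cpp
#include <cstddef>
#include <cstring>

// Illustrative only: sum `numChunks` float chunks of `realSize` bytes each,
// laid out back to back in `src`, into `dst`. The accumulator is seeded from
// the first chunk rather than being left uninitialized or losing a contribution.
void sumReduceFloatChunks(float* dst, const void* src, std::size_t realSize, int numChunks) {
  const std::size_t n = realSize / sizeof(float);
  const char* base = static_cast<const char*>(src);
  std::memcpy(dst, base, realSize);  // seed the accumulator with chunk 0
  for (int r = 1; r < numChunks; ++r) {
    const float* chunk =
        reinterpret_cast<const float*>(base + static_cast<std::size_t>(r) * realSize);
    for (std::size_t i = 0; i < n; ++i) {
      dst[i] += chunk[i];
    }
  }
}
```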
@hhr2449 Please set the broadcastRoot parameter of putTaskCuda to root rather than 0.
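A minimal sketch of the requested change, assuming (as in the quoted calls) that the third positional argument of `putTaskCpu`/`putTaskCuda` is `broadcastRoot` and that `root` has already been derived from `opts.rootRank`; `readLambda`/`writeLambda` are placeholders for the unchanged lambdas quoted above. The same one-argument change applies to the gather and scatter call sites below.

```cpp
// Sketch only: pass the user-specified root instead of a hardcoded 0.
// readLambda / writeLambda stand in for the lambdas quoted above.
if (isCpu_) {
  return worker_.putTaskCpu(
      c10d::OpType::REDUCE, tensorSize, root, &meta_,
      readLambda, writeLambda);
} else {
  auto stream = at::cuda::getCurrentCUDAStream(tensor.device().index());
  return worker_.putTaskCuda(
      c10d::OpType::REDUCE, tensorSize, root, &meta_, stream,
      readLambda, writeLambda);
}
```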
```cpp
          c10d::OpType::GATHER, tensorSize, 0, &meta_,
          [=](void* dst, size_t pos, size_t realSize) {
            memcpy(dst, (char*)inputTensor.data_ptr() + pos, realSize);
          },
          [=](void* src, size_t pos, size_t realSize) {
            if (isRoot) {
              auto outputTensors_ = outputTensors.back();
              for (const auto j : c10::irange(outputTensors_.size())) {
                memcpy((char*)outputTensors_[j].data_ptr() + pos,
                       (char*)src + j * realSize, realSize);
              }
            }
          });
    } else {
      auto stream =
          at::cuda::getCurrentCUDAStream(inputTensor.device().index());
      return worker_.putTaskCuda(
          c10d::OpType::GATHER, tensorSize, 0, &meta_, stream,
```
The gather implementation hardcodes the broadcastRoot parameter to 0 in both CPU and CUDA task submissions (lines 653 and 670). This ignores the rootRank specified in the collective options, causing all ranks to send data to rank 0 regardless of the intended destination. This leads to data corruption and potential leakage of stale data from rank 0's buffers. The broadcastRoot should be set to the root variable.
```cpp
          c10d::OpType::SCATTER, tensorSize, 0, &meta_,
          [=](void* dst, size_t pos, size_t realSize) {
            if (isRoot) {
              auto inputTensors_ = inputTensors.back();
              for (const auto j : c10::irange(inputTensors_.size())) {
                memcpy((char*)dst + j * realSize,
                       (char*)inputTensors_[j].data_ptr() + pos,
                       realSize);
              }
            }
          },
          [=](void* src, size_t pos, size_t realSize) {
            memcpy((char*)outputTensor.data_ptr() + pos, src, realSize);
          });
    } else {
      auto stream =
          at::cuda::getCurrentCUDAStream(outputTensor.device().index());
      return worker_.putTaskCuda(
          c10d::OpType::SCATTER, tensorSize, 0, &meta_, stream,
```
The scatter implementation hardcodes the broadcastRoot parameter to 0 in both CPU and CUDA task submissions (lines 703 and 721). This ignores the rootRank specified in the collective options. As a result, only rank 0 will attempt to perform the scatter transfers, potentially sending its own data instead of the actual root's data. This leads to data corruption and potential leakage of sensitive information from rank 0's buffers. The broadcastRoot should be set to the root variable.
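For reference, a hedged sketch of the conventional way `root` and `isRoot` are derived in c10d backends, which is the value that should then flow through as `broadcastRoot` for all three ops; `rank_` is an assumed member name holding this process's rank.

```cpp
// Sketch: derive the root once from the collective options and reuse it.
const int64_t root = opts.rootRank;   // ScatterOptions/GatherOptions/ReduceOptions all carry rootRank
const bool isRoot = (rank_ == root);  // rank_: this process's rank (assumed member)
// ...then pass `root` (not 0) as broadcastRoot to putTaskCpu / putTaskCuda.
```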
```python
    def test_gather(self):
        # Expected gather (Root) = [0, 1, 2, ..., size-1]
        # Expected gather (Others) = None
        mp_manager = mp.Manager()
        results = mp_manager.dict()
        mp.spawn(
            worker,
            args=(self.world_size, results, "gather"),
            nprocs=self.world_size,
            join=True,
        )
        self.assertEqual(results[0], list(range(self.world_size)))
        for r in range(1, self.world_size):
            self.assertIsNone(results[r])

    def test_scatter(self):
        # Expected scatter (Rank i) = i
        mp_manager = mp.Manager()
        results = mp_manager.dict()
        mp.spawn(
            worker,
            args=(self.world_size, results, "scatter"),
            nprocs=self.world_size,
            join=True,
        )
        for r in range(self.world_size):
            self.assertEqual(results[r], r)

    def test_reduce(self):
        # Expected reduce (Root) = sum([1, 1, ..., 1]) = size
        # Expected reduce (Others) = None
        mp_manager = mp.Manager()
        results = mp_manager.dict()
        mp.spawn(
            worker,
            args=(self.world_size, results, "reduce"),
            nprocs=self.world_size,
            join=True,
        )
        self.assertEqual(results[0], self.world_size)
        for r in range(1, self.world_size):
            self.assertIsNone(results[r])
```
There is significant code duplication across test_gather, test_scatter, and test_reduce. To improve maintainability, consider creating a new helper method for these asymmetric collectives, similar to how _spawn_and_check is used for symmetric ones.
For example, you could introduce a helper like _spawn_and_check_asymmetric:
```python
def _spawn_and_check_asymmetric(self, collective, expected_fn):
    mp_manager = mp.Manager()
    results = mp_manager.dict()
    mp.spawn(
        worker,
        args=(self.world_size, results, collective),
        nprocs=self.world_size,
        join=True,
    )
    expected_results = expected_fn(self.world_size)
    self.assertEqual(results.copy(), expected_results)

def test_gather(self):
    def expected_fn(size):
        res = {0: list(range(size))}
        for r in range(1, size):
            res[r] = None
        return res

    self._spawn_and_check_asymmetric("gather", expected_fn)
```
```python
    def test_gather(self):
        # Expected gather (Root) = [0, 1, 2, ..., size-1]
        # Expected gather (Others) = None
        mp_manager = mp.Manager()
        results = mp_manager.dict()
        mp.spawn(
            worker,
            args=(self.world_size, results, "gather"),
            nprocs=self.world_size,
            join=True
        )
        self.assertEqual(results[0], list(range(self.world_size)))
        for r in range(1, self.world_size):
            self.assertIsNone(results[r])

    def test_scatter(self):
        # Expected scatter (Rank i) = i
        mp_manager = mp.Manager()
        results = mp_manager.dict()
        mp.spawn(
            worker,
            args=(self.world_size, results, "scatter"),
            nprocs=self.world_size,
            join=True
        )
        for r in range(self.world_size):
            self.assertEqual(results[r], r)

    def test_reduce(self):
        # Expected reduce (Root) = sum([1, 1, ..., 1]) = size
        # Expected reduce (Others) = None
        mp_manager = mp.Manager()
        results = mp_manager.dict()
        mp.spawn(
            worker,
            args=(self.world_size, results, "reduce"),
            nprocs=self.world_size,
            join=True
        )
        self.assertEqual(results[0], self.world_size)
        for r in range(1, self.world_size):
            self.assertIsNone(results[r])
```
Similar to the CUDA backend tests, there is code duplication in test_gather, test_scatter, and test_reduce. To improve maintainability, consider refactoring this by introducing a helper method to spawn processes and check results for these asymmetric collectives.
For example, you could introduce the same `_spawn_and_check_asymmetric` helper shown above and reuse it for these CPU tests as well.
Codecov Report: ✅ All modified and coverable lines are covered by tests.
UNIDY2002 left a comment:
Mostly well done, except that you have to update the values of the `broadcastRoot` param. After that, we are ready to merge!
```cpp
    if (isCpu_) {
      auto numRanks = meta_.size;
      return worker_.putTaskCpu(
          c10d::OpType::REDUCE, tensorSize, 0, &meta_,
```
Same here for putTaskCpu's broadcastRoot.
```cpp
    size_t tensorSize = inputTensor.numel() * inputTensor.element_size();
    if (isCpu_) {
      return worker_.putTaskCpu(
          c10d::OpType::GATHER, tensorSize, 0, &meta_,
```

```cpp
    size_t tensorSize = outputTensor.numel() * outputTensor.element_size();
    if (isCpu_) {
      return worker_.putTaskCpu(
          c10d::OpType::SCATTER, tensorSize, 0, &meta_,
```
This PR name is really long; we should rename it to a simpler one before merging next time.
…ther`**, **`scatter`**, and **`reduce`** in the Mooncake backend (kvcache-ai#1469)
Description
Part of #1225
This PR implements support for additional collective primitives: `gather`, `scatter`, and `reduce` in the Mooncake backend.

Key Changes:

Support for New Collective Primitives:
- `gather`, `scatter`, and `reduce` APIs in `MooncakeBackend`.
- Corresponding task handling in `MooncakeWorker` for both CPU and CUDA backends.
- Replaced `cudaMemcpyHostToDevice`/`DeviceToHost` with `cudaMemcpyDeviceToDevice` for internal GPU buffer operations (see the illustrative sketch below).

Unit Tests:
- Updated `tests/test_mooncake_backend.py` (CUDA) and `tests/test_mooncake_backend_cpu.py` (CPU) to include test cases for `gather`, `scatter`, and `reduce`.
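As an illustration of the memory-copy change above (not the actual Mooncake worker code): when both the tensor and the backend's staging buffer already live on the GPU, the copy can stay on-device instead of bouncing through the host. A minimal sketch with hypothetical buffer names:

```cpp
#include <cuda_runtime.h>

// Illustrative only: copy `bytes` from a device-resident tensor buffer into a
// device-resident staging buffer on the given stream, without a host round trip.
void stageOnDevice(void* stagingBuf, const void* tensorBuf, size_t bytes, cudaStream_t stream) {
  // Before: cudaMemcpyDeviceToHost followed by cudaMemcpyHostToDevice.
  // After: a single device-to-device copy on the same stream.
  cudaMemcpyAsync(stagingBuf, tensorBuf, bytes, cudaMemcpyDeviceToDevice, stream);
}
```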
Type of Change

How Has This Been Tested?
I ran the updated unit tests in a GPU environment to verify the correctness of the new primitives, specifically focusing on asymmetric behaviors.
Ran `python3 -m unittest mooncake-wheel.tests.test_mooncake_backend`, including `test_gather`, `test_scatter`, and `test_reduce`. Verified that the root correctly aggregates `[0, 1, ...]` for gather and sums values for reduce.

Checklist
Ran `./scripts/code_format.sh` before submitting.