[core][compiled graphs] Add CPU-based NCCL communicator for development #48440
stephanie-wang merged 49 commits into ray-project:master from tfsingh:py-ts/cpu-nccl
Conversation
…er of allreduce ops; minor change: use defaultdict on line 63 for simplicity; added line 94 for unsupported collective ops
Could you also add a test for this?
…e op of CPUNcclGroup
…ce self.get_communicator in CPUNcclGroup
Just a heads up that @anyadontfly and I are writing some e2e tests with the compiled dag API, and should have them done by tomorrow.
…rements in e2e run; reverted changes in torch_tensor_type.py; added 1 e2e test for p2p and 7 e2e tests for allreduce
stephanie-wang left a comment
Nice, this looks pretty good! Left some comments for cleanup/clarification but overall the structure looks good.
self.collective_data: Dict[int, List[torch.Tensor]] = defaultdict(list)
# Buffer for the number of actors seen, each entry is one p2p op.
self.num_actors_seen = defaultdict(int)
# Number of actors who have read the result, and are about the exit the function.
Suggested change:
- # Number of actors who have read the result, and are about the exit the function.
+ # Number of actors who have read the result, and are about to exit the function.
self.communicators.add(comm)
...
received_tensor = ray.get(comm.wait_p2p.remote(self.num_ops[comm_key]))
assert (
In this case you can probably just directly return the received_tensor (allocator is needed for cases where a receive buffer needs to be allocated before the recv happens).
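A minimal sketch of the distinction being made here; the function names are illustrative, not the actual channel API:

import torch

def recv_nccl_style(allocator, shape, dtype):
    # NCCL writes into a pre-allocated buffer, so an allocator is required
    # to create the destination tensor before the recv runs.
    buf = allocator(shape, dtype)
    # ... the transport then fills `buf` in place ...
    return buf

def recv_cpu_style(received_tensor: torch.Tensor) -> torch.Tensor:
    # The CPU transport already hands back a complete tensor object, so it
    # can be returned directly without allocating a destination buffer.
    return received_tensor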
assert (
    ray.get_gpu_ids()
), "Actors participating in NCCL group must have at least one GPU assigned"
if not custom_nccl_group or not isinstance(custom_nccl_group, CPUNcclGroup):
Suggested change:
- if not custom_nccl_group or not isinstance(custom_nccl_group, CPUNcclGroup):
+ if not (custom_nccl_group and isinstance(custom_nccl_group, CPUNcclGroup)):
nit, a bit more readable with a single not.
return result
...
class CPUCommunicator:
Can we make this inherit from GPUCommunicator?
One suggestion is to rename the GPUCommunicator into a generic Communicator or DeviceCommunicator, and you can have it return a string of the expected resource type that actors in the group should have.
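A rough sketch of what that rename could look like; the interface shown here is an assumption based on this thread, not the final API:

from abc import ABC, abstractmethod

class Communicator(ABC):
    """Generic communicator interface shared by the NCCL (GPU) and CPU transports."""

    @abstractmethod
    def get_device_type(self) -> str:
        """Return the resource type ("gpu" or "cpu") that actors in the group
        are expected to have."""
        raise NotImplementedError

class CPUCommunicator(Communicator):
    def get_device_type(self) -> str:
        return "cpu"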
)
...
def _do_init_cpu_group(
Can you see if we can reuse the existing _do_init_nccl_group instead?
python/ray/dag/collective_node.py (outdated)
recv_buf = torch.empty_like(send_buf)
nccl_group.allreduce(send_buf, recv_buf, self._op)
ctx = ChannelContext.get_current()
if ctx.nccl_groups:
Same here, I think you can restructure this to just take the "default group", whether it's NCCL or CPU, and then the code inside the if-else branches is the same for both.
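For example, the lookup could be factored out so the call site no longer branches on the transport; the helper name and lookup logic below are assumptions, not the merged code:

def _get_default_group(ctx, group_id):
    # Return whichever communicator registered this group ID, NCCL or CPU;
    # the allreduce call site is then identical for both transports.
    if group_id in ctx.nccl_groups:
        return ctx.nccl_groups[group_id]
    return ctx.cpu_groups[group_id]

# Call site, same for both transports (attribute names assumed):
# group = _get_default_group(ChannelContext.get_current(), group_id)
# group.allreduce(send_buf, recv_buf, op)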
self.nccl_groups: Dict[str, "GPUCommunicator"] = {}
# Used for the torch.Tensor CPU transport.
self.cpu_groups: Dict[str, "CPUCommunicator"] = {}
# group ID -> Communicator
Option 1: self.device_groups: Dict[str, Communicator]
# resource label -> group ID -> Communicator
Option 2: self.device_groups: Dict[str, Dict[str, Communicator]]
Can we keep the name self.nccl_groups?
…icator recv_stream and send_stream raise NotImplementedError
stephanie-wang left a comment
Thanks, this looks great! A couple of minor comments about naming, then we can merge it. Let's have get_device_type return "gpu" instead of "nccl".
@abstractmethod
def get_device_type() -> str:
    """
    Return the type of the communicator (nccl or cpu).
Suggested change:
- Return the type of the communicator (nccl or cpu).
+ Return the type of the communicator (gpu or cpu).
self._comm.destroy()
...
def get_device_type(self) -> str:
    return "nccl"
| return "nccl" | |
| return "gpu" |
"nccl" is the transport name but "gpu" is the device that each actor is expected to have.
You could add a get_transport_name and that can return NCCL instead.
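A small sketch of that split; get_transport_name is a hypothetical addition suggested here, not an existing method, and the class below is just a stand-in for the NCCL-backed communicator in the diff:

class _NcclGroup:
    def get_device_type(self) -> str:
        # Resource that each participating actor is expected to have.
        return "gpu"

    def get_transport_name(self) -> str:
        # Underlying transport used to move tensors between actors.
        return "nccl"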
python/ray/dag/compiled_dag_node.py (outdated)
# This is set to the specified custom nccl group
# if there exists a type hint of `transport=nccl_group`.
- self._custom_nccl_group_p2p: Optional[GPUCommunicator] = None
+ self._custom_nccl_group_p2p: Optional[Communicator] = None
For consistency, it would be good to replace-all nccl_group with communicator.
Got it. Also, we have functions for initializing and destroying CPUCommunicator in torch_tensor_nccl_channel.py. Do we have to change the file name of torch_tensor_nccl_channel.py, or should we put those functions in a separate file?
Signed-off-by: tfsingh <105320310+tfsingh@users.noreply.github.com>
Why are these changes needed?
This allows developers to debug DAGs with collective ops on CPU. Currently we use a Ray actor to perform the allreduce.
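A minimal sketch of the actor-based CPU allreduce described above, under the assumption that all ranks call into a single barrier-style actor; the class and method names are illustrative, not the PR's exact implementation:

import asyncio
from collections import defaultdict
from typing import Dict, List

import ray
import torch

@ray.remote
class CPUCommActor:
    """Barrier-style actor: every rank sends its tensor, all ranks get the sum."""

    def __init__(self, world_size: int):
        self.world_size = world_size
        self.data: Dict[int, List[torch.Tensor]] = defaultdict(list)
        self.results: Dict[int, torch.Tensor] = {}
        self.done: Dict[int, asyncio.Event] = defaultdict(asyncio.Event)

    async def allreduce(self, op_id: int, tensor: torch.Tensor) -> torch.Tensor:
        self.data[op_id].append(tensor)
        if len(self.data[op_id]) == self.world_size:
            # Last rank to arrive computes the reduction and releases the others.
            self.results[op_id] = torch.stack(self.data[op_id]).sum(dim=0)
            self.done[op_id].set()
        await self.done[op_id].wait()
        return self.results[op_id]

# Two "ranks" contribute tensors under the same op ID and both receive the sum.
comm = CPUCommActor.remote(world_size=2)
refs = [
    comm.allreduce.remote(0, torch.ones(3)),
    comm.allreduce.remote(0, torch.full((3,), 2.0)),
]
print(ray.get(refs))  # both entries are tensor([3., 3., 3.])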
Related issue number
Closes #47936