[Compiled Graph] Enhance Compile Graph with Multi-Device Support #51032
stephanie-wang merged 1 commit into ray-project:master
Conversation
@ruisearch42 Good day. Could you please review this PR? Thanks.
If the method returns None, is it still necessary to mark it as an abstract method?
Keep this class abstract.
Hi @hipudding, is this ready for review? BTW, aDAG was renamed to Compiled Graph: https://docs.ray.io/en/latest/ray-core/compiled-graph/ray-compiled-graph.html
Yes, I think the main functionality of this PR is ready; I may make some minor adjustments later. Thanks for your review.
Thanks. I will review tomorrow!
ruisearch42
left a comment
Trying to better understand the PR
with self._send_stream:
    return self._write()
Why don't we want to use context manager?
This stream context manager is provided by cupy and only supports CUDA devices, so I removed the context manager and specified the stream in read and write instead.
But I think I can implement a stream context manager in Ray, so cupy is not needed. Which is better?
Done. Used torch StreamContext instead to avoid too many changes.
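A minimal sketch of this idea (names are illustrative, and a stand-in namespace replaces torch.cuda / torch.npu so the snippet is self-contained): resolving StreamContext from the device's torch module keeps the read/write paths backend-agnostic.

```python
import types

def stream_context(torch_module, stream):
    # Both torch.cuda and torch.npu expose a StreamContext class with the
    # same shape, so looking it up on the module works for any backend
    # that follows torch's device-module convention.
    return torch_module.StreamContext(stream)

# Stand-in for a torch device module, used here so the sketch is runnable.
class _FakeStreamContext:
    def __init__(self, stream):
        self._stream = stream
    def __enter__(self):
        return self._stream
    def __exit__(self, *exc):
        return False

fake_device = types.SimpleNamespace(StreamContext=_FakeStreamContext)

with stream_context(fake_device, "send_stream") as s:
    assert s == "send_stream"
```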
python/ray/dag/compiled_dag_node.py
Outdated
self._intermediate_future = future

- def reset_and_wait_intermediate_future(self) -> Any:
+ def reset_and_wait_intermediate_future(self, stream: Any = None) -> Any:
Done. Reverted this part.
python/ray/dag/compiled_dag_node.py
Outdated
self,
val: Any,
wrap_in_gpu_future: bool,
stream: Any = None,
Done. Reverted this part.
class _DriverGroupHolder(Communicator):
    """
    Communicator place holder for Driver, Since driver may has no
    Accelerators, TorchDeviceManager cannot be used.
    """
Can you elaborate a bit why this is needed?
I replaced all torch.cuda.xxx calls with TorchDeviceManager, including creating a _NcclGroup. But the driver keeps a communicator instance itself: it will not join the group, but needs to record some information such as the world size and the actors.
However, the driver may have no GPU (or other accelerators), so it would choose the wrong TorchDeviceManager (CPUTorchDeviceManager), and get_communicator would return None.
Since the driver will not join the group, it uses the Communicator only to store some information, so I created a hardware-independent class for the driver.
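A rough sketch of such a placeholder (method names are illustrative, not the PR's exact API): it records group metadata for the driver but refuses to participate in data transfers.

```python
from typing import Any, List, Optional

class _DriverGroupHolderSketch:
    """Hardware-independent placeholder: the driver never joins the
    collective group; it only records metadata such as the world size
    and the participating actor handles."""

    def __init__(self, world_size: int, actor_handles: List[Any]):
        self._world_size = world_size
        self._actor_handles = actor_handles

    def get_world_size(self) -> int:
        return self._world_size

    def get_actor_handles(self) -> List[Any]:
        return self._actor_handles

    def get_rank(self, actor: Any) -> Optional[int]:
        # The driver knows each actor's position in the group.
        return self._actor_handles.index(actor)

    def send(self, buf: Any, peer_rank: int) -> None:
        # The driver holds no device, so data-plane calls are invalid.
        raise NotImplementedError("driver does not participate in transfers")

holder = _DriverGroupHolderSketch(2, ["actor_a", "actor_b"])
assert holder.get_world_size() == 2
assert holder.get_rank("actor_b") == 1
```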
comm_id: int,
rank: Optional[int],
actor_handles: List["ray.actor.ActorHandle"],
acl_stream: Optional[Any],
What is acl_stream? Please document the args.
acl_stream is the raw stream on the Ascend NPU, just as cuda_stream is for the GPU.
What about naming this stream torch_stream, since acl_stream and cuda_stream are both torch streams now?
Done. I changed it to npu_stream for better understanding.
def get_world_size(self) -> int:
    return self._world_size

def send(self, buf: "torch.Tensor", peer_rank: int) -> None:
Many methods in this class look like code duplicated from _NcclGroup with minor changes. Should they be unified?
Yes, we should indeed reduce duplicate code. I will think about how to design it.
Indeed, these two files share logical similarities and contain a lot of duplicate code. However, there are several differences in the details. For example, torch.cuda.xxx needs to be replaced with torch.npu.xxx, hccl_group requires an additional StreamContext, and the set-device operation must be performed before get_unique_id and before creating the HCCL group. Therefore, I haven't found a good way to merge these two files. Do you have any suggestions?
Hi @ruisearch42, could you please review this PR again? BTW, I fixed the test-case failure in doc_test_cgraph_nccl; it was blocking this PR.
Got it. Big enough.
@ruisearch42 @stephanie-wang This PR is ready for review now. Could you please review it again? Thanks.
Thanks for the efforts! Will do!
stephanie-wang
left a comment
Thanks for the contribution! I like the new APIs. Let's check if we can reduce the driver API; eventually it would be good to use this as a single-controller API for Ray GPU objects too.
What is "it" in this comment?
Rewrite the class comment.
Type hints and comments.
Why use _set_context instead of just passing these args to cls() constructor?
There are two steps here:
1. Set class attributes, including _torch_module_name and _communicator_cls. These attributes can be configured via register_accelerator_context; if not set, the system will automatically detect whether to use GPU or CPU.
2. Instantiate the AcceleratorContext object based on these class attributes.
If we were to use the constructor here, we would need to introduce global variables to store _torch_module_name and _communicator_cls.
Which approach do you think is better?
Hmm, the current code is brittle, because if you call AcceleratorContext._set_context, it will also modify a previously created singleton returned by AcceleratorContext.get(). It would be better if calling AcceleratorContext.get() returned an instance whose attributes are not tied to the class attributes. This makes it a bit nicer for testing, or if we eventually want to support multiple contexts in the same process.
I don't have a strong preference on how you should do that, although I agree it would be nice to not need global variables for the torch_module_name and communicator_cls. You could follow the DataContext pattern, which stores the defaults on the class but uses dataclass and a global default context for the current instance.
- def get_communicator(self, *args, **kwargs):
+ def create_communicator(self, *args, **kwargs):

- Retrieves the communication group for collective operations.
+ Creates a communication group for collective operations.
Rename to do_register_accelerator_context?
A bit confusing that this is calling _do_check_has_communicator but the variable is called has_accelerators. Can't the actors have accelerators but not a communicator created yet?
Also I think this check is not exactly equivalent to the previous check - is it right that each actor could now have an AcceleratorContext but they could be for different devices? Do we have a unit test for this?
You're right. I will fix this.
This has been changed to check whether all actors use the same accelerator, to ensure consistency across all actors. A test case was added.
Just checking - why was it that this code could be removed?
This function returns the logical IDs of the CUDA devices visible to the current worker. Currently, in CG, the device at index 0 is used as the default device, and the logical ID of the device at index 0 is also 0, so I believe calling get_cuda_devices is unnecessary. I think this part needs to be double-checked by @ruisearch42.
- # set custom communicator on all actors
+ # Set custom communicator on all actors.
Is there a way we can take in the _communicator_cls and _torch_module_name as arguments to this function? That makes it explicit what the dependencies of this function are, instead of passing the dependencies through the accelerator context.
> Is there a way we can take in the _communicator_cls and _torch_module_name as arguments to this function? That makes it explicit what the dependencies of this function are, instead of passing the dependencies through the accelerator context.

Are you suggesting that _communicator_cls and _torch_module_name should be passed as parameters to the _init_communicator function?
Sorry, I didn't quite understand what you meant. Could you please explain this sentence again?
- # Not registrey yet.
+ # Not registered yet.
This part of the code has been rewritten.
Ah, this was just the comment about removing NotImplemented methods for the CommunicatorHandle class. Eventually we would like to use this API to program collective operations in general Ray programs, not just in compiled graphs (see #51173).
I think it currently looks great. For now, the assumption is that all devices that support this should also support torch streams, right? Would it be possible to at least list the APIs needed to support all these features?
Yes. Because CG currently only supports out-of-band transmission of torch.Tensor, the accelerator must support the PyTorch interface: not just the torch stream, but also the device- and event-related interfaces. I can add the backend capability requirements in the comments of accelerator_context.py.
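As a hedged illustration of such a capability list (the exact set of required attributes is an assumption, not taken from the PR), a backend's torch device module could be probed like this:

```python
import types

# Assumed minimum surface a torch device module would need for CG's
# stream/event/device handling; the real list may differ.
REQUIRED_TORCH_APIS = (
    "Stream", "StreamContext", "Event",
    "set_device", "current_stream", "synchronize",
)

def supports_compiled_graph(torch_module) -> bool:
    # A backend qualifies only if every required attribute is present.
    return all(hasattr(torch_module, api) for api in REQUIRED_TORCH_APIS)

complete = types.SimpleNamespace(**{name: object() for name in REQUIRED_TORCH_APIS})
partial = types.SimpleNamespace(Stream=object())

assert supports_compiled_graph(complete)
assert not supports_compiled_graph(partial)
```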
stephanie-wang
left a comment
Thanks, this looks good! Please address the last few comments and then we can merge it.
class AcceleratorContext:
    """
    Provides a unified interface for managing different accelerator backends
    This eincludes stram management, event creation, device context control,

- This eincludes stram management, event creation, device context control,
+ This includes stream management, event creation, device context control,

    and communicator support for distributed communication.
    """

    def __init__(self, torch_module_name: str, commumcator_cls: Type[Communicator]):

- def __init__(self, torch_module_name: str, commumcator_cls: Type[Communicator]):
+ def __init__(self, torch_module_name: str, communicator_cls: Type[Communicator]):
    Args:
        torch_module_name: Name of the torch device module (e.g., "cuda", "cpu").
        commumcator_cls: Class used to handle communication.

- commumcator_cls: Class used to handle communication.
+ communicator_cls: Class used to handle communication.
@staticmethod
def set(accelerator_context: "AcceleratorContext") -> None:
    """
    Sets the accelerator context to the default context.

- Sets the accelerator context to the default context.
+ Overwrites the default accelerator context.
    """
    return self._communicator_cls(*args, **kwargs)

def get_module_name(self) -> str:
Could you change these getter methods to use @property?
    torch_module_name: The name of the device module under torch.
    communicator: The communicator class associated with the device.
    """
accleerator_context = AcceleratorContext(torch_module_name, communicator_cls)

- accleerator_context = AcceleratorContext(torch_module_name, communicator_cls)
+ accelerator_context = AcceleratorContext(torch_module_name, communicator_cls)
if self._is_registered:
    return self._torch_module_name
return None
I found this a bit confusing: I think it is odd that you can call register multiple times and end up with multiple AcceleratorContexts that have _is_registered=True.
Instead, how about removing the _is_registered flag and just having get_module_name. You can create a _global_custom_context along with the _default_accelerator_context, and AcceleratorContext.get() will return the custom context if set else it returns the default context. This makes it more explicit which context is registered or not and whether the context is custom or not.
I fully agree with your suggestion. I have implemented _global_custom_context following your design and provided a method is_accelerator_context_registered to check whether a context is registered. The code now looks much clearer. Thank you again for your valuable feedback!
class CommunicatorHandle:
    """
    A lightweight communicator handle used to store actors in the communicator.

- A lightweight communicator handle used to store actors in the communicator.
+ A lightweight communicator handle used by the driver to store handles to the actors in the communicator.
Sorry for all the typos; I should have enabled local spell checking. Thank you for going through so many rounds of review and providing suggestions for improvement. I'll fix these issues as soon as possible.
Looks like there are just some lint errors to fix.
This commit introduces multi-device support in Compiled Graph,
extending beyond CUDA's NCCL to accommodate various accelerators.
The key changes include:
1. Removed the dependency on `cupy.cuda.ExternalStream`, replacing it with
   `torch.{device}.StreamContext` to support diverse hardware.
2. Replaced hardcoded `torch.cuda.xxx` calls with `AcceleratorRuntime`,
   enabling automatic detection and invocation of device-specific
   functions.
Co-authored-by: hipudding <huafengchun@gmail.com>
Signed-off-by: noemotiovon <757486878@qq.com>
@stephanie-wang @ruisearch42 Hi, could you please merge this PR? Thanks.
@hipudding Hello, this is a great piece of work that I can learn from. However, I have a few questions; would you mind helping me clarify them? Thank you for your time.
Yes, this part has been removed, leaving a register method to add your own backend. Currently, the HCCL backend can't be added to the Ray project because the Ray community has no Ascend servers and can't test it. I keep a backup of hccl and hccl_group; see 0482edc for an example.
Yes, you're right. Use this register method to enable your backend.
Why are these changes needed?
This PR improves multi-device support in Compiled Graph, which significantly reduces tensor transmission latency by utilizing out-of-band communication. Currently, this feature only supports CUDA's NCCL. Since Ray already supports multiple accelerators, it is necessary to extend Compiled Graph to support multiple devices as well.
This PR mainly introduces two key changes:
1. Removed the dependency on cupy.cuda.ExternalStream. Since this library only supports CUDA devices, we replaced it with a more general stream context manager to accommodate various accelerators. The new implementation uses torch.{device}.StreamContext.
2. Replaced hardcoded torch.cuda.xxx calls with AcceleratorRuntime. This allows automatic detection of the accelerator type and invokes the appropriate device-specific functions.
How do you add a new backend for CG? Here's an example for Ascend NPU:
Related issue number
This PR is the main part of Task 2 in #51574
It would be better to make the function name more general, for example changing requires_nccl to require_communicator. This is implemented in #51061.
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.