[core] Add single-controller API for ray.util.collective and torch gloo backend (#53319)
Conversation
Signed-off-by: Stephanie wang <smwang@cs.washington.edu>
    """
    Return all actor handles in this communicator.
    """
    return self._actors[:]
is there any reason to do shallow copy?
Lets the caller modify the list.
This can easily become a gotcha; prefer a more explicit pattern (caller gets the list, modifies it, calls a setter), or else make the copy semantics clear in the docstring.
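To make the tradeoff discussed above concrete, here is a minimal sketch (a hypothetical `Communicator` class with plain strings standing in for actor handles, not the actual implementation) of what the shallow copy does and does not protect against:

```python
class Communicator:
    """Hypothetical stand-in for the real communicator class."""

    def __init__(self, actors):
        self._actors = list(actors)

    @property
    def actors(self):
        # Shallow copy: the caller gets its own list object, so
        # list-level mutations (append/remove/sort) do not alter the
        # communicator's membership. The elements themselves are
        # still shared references.
        return self._actors[:]


comm = Communicator(["actor_a", "actor_b"])
view = comm.actors
view.append("actor_c")  # mutates only the caller's copy
print(comm.actors)      # ['actor_a', 'actor_b']
```

The gotcha the review points at: a caller may append to `view` expecting to change group membership, and silently won't.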
    @staticmethod
    def get() -> "RemoteCommunicatorManager":
        global _remote_communicator_manager
        with _remote_communicator_manager_lock:
Why do we need a lock here? If we want to avoid race conditions, should we also add a lock to add_remote_communicator and remove_remote_communicator?
yes it seems so. unclear if this interface is intended to be thread safe or not though
also, if it's always meant to be a global singleton, defining the interface as a set of functions is more natural
The lock prevents a race between checking whether _remote_communicator_manager is None and setting it. The other methods are already thread-safe because they go through Python's builtin dictionary operations.
also, if it's always meant to be a global singleton, defining the interface as a set of functions is more natural
The user-facing APIs are a set of functions. This singleton class is an implementation detail.
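As a sketch of the pattern being discussed (names simplified; this is not the actual implementation), the lock closes the window between the None check and the assignment:

```python
import threading

_manager = None
_manager_lock = threading.Lock()


class RemoteCommunicatorManager:
    """Simplified sketch of a lock-guarded lazy singleton."""

    @staticmethod
    def get() -> "RemoteCommunicatorManager":
        global _manager
        # Without the lock, two threads could both observe _manager is
        # None and each construct an instance; holding the lock makes
        # the check-then-assign sequence atomic.
        with _manager_lock:
            if _manager is None:
                _manager = RemoteCommunicatorManager()
            return _manager
```

Every caller then observes the same instance, which is the property the singleton relies on.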
        # Find all collective groups that the given actors are a subset
        # of, with the matching backend if provided.
        for collective in self._remote_communicators.values():
            if actors.issubset(set(collective.actors)):
Avoid converting the list to a set on every loop iteration.
We're not expecting to have many collectives to iterate through right now and it can be optimized later if it becomes a scalability issue.
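If this lookup ever does become hot, one low-effort fix is to cache the set form on the communicator so the loop performs no per-iteration conversion. A hypothetical sketch (plain strings standing in for actor handles; names are illustrative, not the real classes):

```python
class RemoteCommunicator:
    """Hypothetical stand-in; caches the set form of its actors."""

    def __init__(self, actors):
        self.actors = list(actors)
        # Built once at construction instead of once per lookup.
        self.actor_set = frozenset(actors)


def get_collective_groups(actors, communicators):
    # Convert the query to a set once, then test against cached sets.
    query = set(actors)
    return [c for c in communicators if query.issubset(c.actor_set)]


groups = [RemoteCommunicator(["a", "b", "c"]), RemoteCommunicator(["a", "b"])]
print([g.actors for g in get_collective_groups(["a", "b"], groups)])
# [['a', 'b', 'c'], ['a', 'b']]
```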
        popped = None
        return popped

    def get_collective_groups(
should we have an API:

    def get_collective_group_by_name(self, name: str):
        ...
We can add this later if we need it.
        self._backend = Backend(backend)

    def get_rank(self, actor: ray.actor.ActorHandle):
        for i, a in enumerate(self._actors):
Should we maintain an actor-to-index Dict[ActorHandle, int] instead of scanning the list?
We can add it later.
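For reference, the suggested lookup table is a small change: build the dict once alongside the list and fall back to -1 (or raise) for unknown actors. A hypothetical sketch with hashable placeholders standing in for actor handles:

```python
class Communicator:
    """Hypothetical sketch of the actor-to-rank dict suggestion."""

    def __init__(self, actors):
        self._actors = list(actors)
        # O(1) rank lookup built once; assumes handles are hashable.
        self._rank = {a: i for i, a in enumerate(self._actors)}

    def get_rank(self, actor) -> int:
        # Returns -1 for actors outside the group; raising KeyError
        # would be the stricter alternative.
        return self._rank.get(actor, -1)
```

This trades a little memory and construction time for constant-time `get_rank`, versus the linear scan in the current code.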
import ray


def find_free_port():
This may cause issues when users are using a Kubernetes service mesh, which requires knowing the communication ports in advance. It’s probably fine for now—we can wait to update it until users report problems.
Most users don't use service mesh.
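For context, the usual implementation of such a helper binds to port 0 and lets the OS pick an ephemeral port; the caveat (beyond the service-mesh point above) is that the port is only guaranteed free at bind time, so another process can grab it before the real server binds. A sketch of the common pattern, not necessarily the exact code in this PR:

```python
import socket


def find_free_port() -> int:
    # Port 0 asks the kernel for any free ephemeral port; we read the
    # assigned port back and release the socket immediately. A race is
    # possible between this call and the port's actual use.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]
```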
edoakes left a comment
The API/usage makes sense to me. Had many of the same questions as Kai-Hsun on the implementation.
python/ray/util/collective/collective_group/torch_gloo_collective_group.py
        dist.init_process_group(
            backend="gloo", init_method="env://", world_size=world_size, rank=rank
        )
Will torch.distributed give a useful error message if a user tries to instantiate two groups in the same process? (I'm assuming this would be an error because the process group is a global singleton)
It should, but we also prevent this from happening right now, as long as you use the top-level collective APIs.
    """
    Create a collective group on the given list of actors. If this function

Suggested change:

    """Create a collective group on the given list of actors.

    If this function
to follow the Google style guide (summary line on the opening quotes, blank line before the body). I think we have a linter that's supposed to tell you to do this now 🤔
Co-authored-by: Kai-Hsun Chen <kaihsun@apache.org> Signed-off-by: Stephanie Wang <smwang@cs.washington.edu>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com> Signed-off-by: Stephanie Wang <smwang@cs.washington.edu>
Thanks for the review, this is ready for another round. Not sure why my linter's not working...
[core] Add single-controller API for ray.util.collective and torch gloo backend (ray-project#53319)

Adds single-controller APIs (APIs that can be called from the driver) for creating collectives on a group of actors using `ray.util.collective`. These APIs are currently under `ray.experimental.collective` as they are experimental and to avoid potential conflicts with `ray.util.collective`. See test_experimental_collective::test_api_basic for API usage.

- create_collective_group
- destroy_collective_group
- get_collective_groups

Also adds a ray.util.collective backend based on torch.distributed gloo, for convenient testing on CPUs. While ray.util.collective has a pygloo backend, this backend requires pygloo to be installed, and pygloo doesn't seem to be supported on latest versions of Python.

Signed-off-by: Stephanie Wang <smwang@cs.washington.edu>
Co-authored-by: Kai-Hsun Chen <kaihsun@apache.org>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…GPU objects (#53720)

Adds integration between the single-controller collective APIs introduced in #53319 and the GPU objects feature prototyped in #52938. Actor collectives created through `ray.experimental.collective.create_collective_group` will now be automatically used if a task declares a tensor transport other than the default OBJECT_STORE. This also adds support for allocating the torch tensors on the correct device (GPU for NCCL and CPU for GLOO). See updates in test_gpu_objects.py for examples.

Signed-off-by: Stephanie Wang <smwang@cs.washington.edu>
Co-authored-by: Kai-Hsun Chen <kaihsun@apache.org>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Why are these changes needed?
Adds single-controller APIs (APIs that can be called from the driver) for creating collectives on a group of actors using `ray.util.collective`. These APIs are currently under `ray.experimental.collective` as they are experimental and to avoid potential conflicts with `ray.util.collective`. See test_experimental_collective::test_api_basic for API usage.

Also adds a ray.util.collective backend based on torch.distributed gloo, for convenient testing on CPUs. While ray.util.collective has a pygloo backend, this backend requires pygloo to be installed, and pygloo doesn't seem to be supported on latest versions of Python.
Checks

- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.