[core][compiled graphs] Support experimental_compile(_default_communicator=comm) #50023
Merged
stephanie-wang merged 33 commits into ray-project:master on Feb 5, 2025
kevin85421
approved these changes
Feb 3, 2025
Member
kevin85421
left a comment
Left some comments. Overall looks good to me. Because this is an API change, it'd be helpful to get a review from @stephanie-wang.
xsuler
pushed a commit
to antgroup/ant-ray
that referenced
this pull request
Mar 4, 2025
…cator=comm) (ray-project#50023)

Currently, `with_tensor_transport(transport=arg)` supports 3 types of arg: "nccl", "auto", or a `Communicator`. If "nccl" is used, or "auto" is used and resolves to using NCCL, Ray creates a communicator internally and uses it.

In order to better manage the communicator lifecycle, we make the following changes:

- Add API to support `experimental_compile(_default_communicator=default_comm)`, default_comm is a default communicator user provided
- `_default_communicator` in the above API can be "create", which explicitly tells Ray to create a default communicator and use it for all `with_tensor_transport()` sites where a specific communicator was not provided
- If `_default_communicator` is not specified, Ray will not create a default communicator or reuse communicators passed in from other `with_tensor_transport()` sites, but throw an error when a communicator is needed. Note that this is backward incompatible.
- For collectives, if a custom communicator is specified at a specific site, we use it; otherwise, if default is provided, we use the default; otherwise if default is "create", we create a communicator per collective op, but reuse the communicator op when actor set is the same. For all passed-in communicator, we check its actor set is the same as that of the communicator.
- For "create", a single p2p communicator is created for all involved actors without a passed-in communicator. We reuse a previously created collective communicator if its actor set includes all p2p actors.
- `_init_communicator()` is called for all communicators (passed in or created) on its involved actors, and `_destroy_communicator()` is only called for the ones created by Compiled Graph.
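As a rough illustration of the collective rules above, the precedence can be modeled in plain Python. This is a sketch, not Ray's implementation: `SketchCommunicator`, `resolve_collective_communicator`, and the `created` cache are hypothetical names, and checking a passed-in communicator against the operation's actor set is an assumed reading of the last sentence of the collectives bullet.

```python
from typing import Dict, FrozenSet, Iterable, Optional, Union


class SketchCommunicator:
    """Hypothetical stand-in for a communicator; it only records its actor set."""

    def __init__(self, actors: Iterable[str], created_by_graph: bool = False):
        self.actors: FrozenSet[str] = frozenset(actors)
        self.created_by_graph = created_by_graph


def resolve_collective_communicator(
    op_actors: Iterable[str],
    site_comm: Optional[SketchCommunicator],
    default: Union[SketchCommunicator, str, None],
    created: Dict[FrozenSet[str], SketchCommunicator],
) -> SketchCommunicator:
    """Pick a communicator for one collective op (illustrative model only)."""
    actors = frozenset(op_actors)

    if site_comm is not None:
        # 1. A communicator given at this site takes precedence.
        chosen = site_comm
    elif isinstance(default, SketchCommunicator):
        # 2. Otherwise fall back to the user-provided default.
        chosen = default
    elif default == "create":
        # 3. Otherwise create one per distinct actor set and reuse it.
        if actors not in created:
            created[actors] = SketchCommunicator(actors, created_by_graph=True)
        return created[actors]
    else:
        # 4. No default at all: a communicator is needed but none is available.
        raise ValueError("a communicator is needed but none was provided")

    # Assumption: a passed-in communicator must cover exactly the op's actors.
    if chosen.actors != actors:
        raise ValueError("communicator actor set does not match the operation")
    return chosen
```

Two collective ops over the same actors share one created communicator in this model, while a site-level communicator is returned unchanged and never enters the `created` cache.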
By default, `experimental_compile(_default_communicator="create")` is used, therefore:

- Any custom communicator passed in (via `with_tensor_transport()` for p2p, and `collective.allreduce.bind()` for collective) will take precedence and be used in the corresponding operations. These communicators won't be reused at other sites. (User should explicitly specify if they want.)
- For each collective operation without a custom communicator, a new communicator is created, and this communicator is reused for all other collective-operation-without-custom-communicator that has the same set of actors.
- For all p2p communications without custom communicator, a single communicator is created for all involved actors. Creation is skipped if there is a communicator created for collective operation, whose actors include all p2p actors: the collective communicator is reused in p2p communications in this case.

This PR also refactors `CompiledDAG._preprocess()` so that code is more organized.

Closes ray-project#47540

---------

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
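The p2p reuse rule in the last bullet comes down to a superset test on actor sets. The following is a minimal sketch under assumptions: communicators are reduced to frozensets of actor names, `select_p2p_actor_set` is a hypothetical helper, and picking the first qualifying collective communicator is an arbitrary choice, since the text does not say which one wins when several qualify.

```python
from typing import FrozenSet, Iterable, Sequence, Tuple


def select_p2p_actor_set(
    p2p_actors: Iterable[str],
    collective_actor_sets: Sequence[FrozenSet[str]],
) -> Tuple[FrozenSet[str], bool]:
    """Return (actor set of the communicator to use, whether it was reused).

    p2p_actors: actors in p2p transfers that have no custom communicator.
    collective_actor_sets: actor sets of communicators already created for
        collective ops (hypothetical representation).
    """
    needed = frozenset(p2p_actors)
    for actor_set in collective_actor_sets:
        # Reuse a collective communicator if it covers every p2p actor.
        if needed <= actor_set:
            return actor_set, True
    # Otherwise a single new p2p communicator spans all involved actors.
    return needed, False
```

For example, p2p traffic between actors `a` and `b` can ride on a collective communicator created for `{a, b, c}`, while p2p traffic involving an actor outside every collective set leads to a new communicator.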
park12sj
pushed a commit
to park12sj/ray
that referenced
this pull request
Mar 18, 2025
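Why are these changes needed?
Currently, `with_tensor_transport(transport=arg)` supports 3 types of arg: "nccl", "auto", or a `Communicator`. If "nccl" is used, or "auto" is used and resolves to using NCCL, Ray creates a communicator internally and uses it.

In order to better manage the communicator lifecycle, we make the following changes:
- Add an API to support `experimental_compile(_default_communicator=default_comm)`, where `default_comm` is a default communicator provided by the user.
- `_default_communicator` in the above API can be "create", which explicitly tells Ray to create a default communicator and use it for all `with_tensor_transport()` sites where a specific communicator was not provided.
- If `_default_communicator` is not specified, Ray will not create a default communicator or reuse communicators passed in from other `with_tensor_transport()` sites, but will throw an error when a communicator is needed. Note that this is backward incompatible.
- For collectives, if a custom communicator is specified at a specific site, we use it; otherwise, if a default is provided, we use the default; otherwise, if the default is "create", we create a communicator per collective op, but reuse the communicator when the actor set is the same. For every passed-in communicator, we check that its actor set is the same as that of the communicator.
- For "create", a single p2p communicator is created for all involved actors without a passed-in communicator. We reuse a previously created collective communicator if its actor set includes all p2p actors.
- `_init_communicator()` is called for all communicators (passed in or created) on their involved actors, and `_destroy_communicator()` is only called for the ones created by Compiled Graph.

By default, `experimental_compile(_default_communicator="create")` is used, therefore:
- Any custom communicator passed in (via `with_tensor_transport()` for p2p, and `collective.allreduce.bind()` for collectives) will take precedence and be used in the corresponding operations. These communicators won't be reused at other sites. (Users should explicitly specify them there if they want that.)
- For each collective operation without a custom communicator, a new communicator is created, and this communicator is reused for all other collective operations without a custom communicator that have the same set of actors.
- For all p2p communications without a custom communicator, a single communicator is created for all involved actors. Creation is skipped if there is a communicator created for a collective operation whose actors include all p2p actors: the collective communicator is reused for p2p communications in this case.

This PR also refactors `CompiledDAG._preprocess()` so that the code is more organized.

Related issue number
Closes #47540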
Checks
- `git commit -s`) in this PR.
- `scripts/format.sh` to lint the changes in this PR.
- method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.