Skip to content

[core][compiled graphs] Support experimental_compile(_default_communicator=comm)#50023

Merged
stephanie-wang merged 33 commits intoray-project:masterfrom
ruisearch42:explicit_comm2
Feb 5, 2025
Merged

[core][compiled graphs] Support experimental_compile(_default_communicator=comm)#50023
stephanie-wang merged 33 commits intoray-project:masterfrom
ruisearch42:explicit_comm2

Conversation

@ruisearch42
Copy link
Copy Markdown
Contributor

@ruisearch42 ruisearch42 commented Jan 23, 2025

Why are these changes needed?

Currently, with_tensor_transport(transport=arg) supports 3 types of arg: "nccl", "auto", or a Communicator. If "nccl" is used, or "auto" is used and resolves to using NCCL, Ray creates a communicator internally and uses it.

In order to better manage the communicator lifecycle, we make the following changes:

  • Add API to support experimental_compile(_default_communicator=default_comm), default_comm is a default communicator user provided
  • _default_communicator in the above API can be "create", which explicitly tells Ray to create a default communicator and use it for all with_tensor_transport() sites where a specific communicator was not provided
  • If _default_communicator is not specified, Ray will not create a default communicator or reuse communicators passed in from other with_tensor_transport() sites, but throw an error when a communicator is needed. Note that this is backward incompatible.
  • For collectives, if a custom communicator is specified at a specific site, we use it; otherwise, if default is provided, we use the default; otherwise if default is "create", we create a communicator per collective op, but reuse the communicator op when actor set is the same. For all passed-in communicator, we check its actor set is the same as that of the communicator.
  • For "create", a single p2p communicator is created for all involved actors without a passed-in communicator. We reuse a previously created collective communicator if its actor set includes all p2p actors.
  • _init_communicator() is called for all communicators (passed in or created) on its involved actors, and _destroy_communicator() is only called for the ones created by Compiled Graph.

By default, experimental_compile(_default_communicator="create") is used, therefore:

  • Any custom communicator passed in (via with_tensor_transport() for p2p, and collective.allreduce.bind() for collective) will take precedence and be used in the corresponding operations. These communicators won't be reused at other sites. (User should explicitly specify if they want.)
  • For each collective operation without a custom communicator, a new communicator is created, and this communicator is reused for all other collective-operation-without-custom-communicator that has the same set of actors.
  • For all p2p communications without custom communicator, a single communicator is created for all involved actors. Creation is skipped if there is a communicator created for collective operation, whose actors include all p2p actors: the collective communicator is reused in p2p communications in this case.

This PR also refactors CompiledDAG._preprocess() so that code is more organized.

Related issue number

Closes #47540

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
@ruisearch42 ruisearch42 changed the title wip [core][compiled graphs] Support Support experimental_compile(_default_communicator=comm) Jan 23, 2025
@ruisearch42 ruisearch42 marked this pull request as ready for review January 23, 2025 22:18
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
@ruisearch42 ruisearch42 added the go add ONLY when ready to merge, run all tests label Jan 24, 2025
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
@ruisearch42 ruisearch42 changed the title [core][compiled graphs] Support Support experimental_compile(_default_communicator=comm) [core][compiled graphs] Support experimental_compile(_default_communicator=comm) Jan 30, 2025
@ruisearch42 ruisearch42 removed the go add ONLY when ready to merge, run all tests label Jan 30, 2025
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
@ruisearch42 ruisearch42 added the go add ONLY when ready to merge, run all tests label Jan 31, 2025
Copy link
Copy Markdown
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall looks good

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Copy link
Copy Markdown
Member

@kevin85421 kevin85421 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave some comments. Overall looks good to me. Because this is an API change, it'd be helpful to get a review from @stephanie-wang.

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
@stephanie-wang stephanie-wang merged commit a0631d7 into ray-project:master Feb 5, 2025
2 checks passed
xsuler pushed a commit to antgroup/ant-ray that referenced this pull request Mar 4, 2025
…cator=comm) (ray-project#50023)

Currently, `with_tensor_transport(transport=arg)` supports 3 types of
arg: "nccl", "auto", or a `Communicator`. If "nccl" is used, or "auto"
is used and resolves to using NCCL, Ray creates a communicator
internally and uses it.

In order to better manage the communicator lifecycle, we make the
following changes:

- Add API to support
`experimental_compile(_default_communicator=default_comm)`, default_comm
is a default communicator user provided
- `_default_communicator` in the above API can be "create", which
explicitly tells Ray to create a default communicator and use it for all
`with_tensor_transport()` sites where a specific communicator was not
provided
- If `_default_communicator` is not specified, Ray will not create a
default communicator or reuse communicators passed in from other
`with_tensor_transport()` sites, but throw an error when a communicator
is needed. Note that this is backward incompatible.
- For collectives, if a custom communicator is specified at a specific
site, we use it; otherwise, if default is provided, we use the default;
otherwise if default is "create", we create a communicator per
collective op, but reuse the communicator op when actor set is the same.
For all passed-in communicator, we check its actor set is the same as
that of the communicator.
- For "create", a single p2p communicator is created for all involved
actors without a passed-in communicator. We reuse a previously created
collective communicator if its actor set includes all p2p actors.
- `_init_communicator()` is called for all communicators (passed in or
created) on its involved actors, and `_destroy_communicator()` is only
called for the ones created by Compiled Graph.

By default, `experimental_compile(_default_communicator="create")` is
used, therefore:
- Any custom communicator passed in (via `with_tensor_transport()` for
p2p, and `collective.allreduce.bind()` for collective) will take
precedence and be used in the corresponding operations. These
communicators won't be reused at other sites. (User should explicitly
specify if they want.)
- For each collective operation without a custom communicator, a new
communicator is created, and this communicator is reused for all other
collective-operation-without-custom-communicator that has the same set
of actors.
- For all p2p communications without custom communicator, a single
communicator is created for all involved actors. Creation is skipped if
there is a communicator created for collective operation, whose actors
include all p2p actors: the collective communicator is reused in p2p
communications in this case.

This PR also refactors `CompiledDAG._preprocess()` so that code is more
organized.

Closes ray-project#47540


---------

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
xsuler pushed a commit to antgroup/ant-ray that referenced this pull request Mar 4, 2025
…cator=comm) (ray-project#50023)

Currently, `with_tensor_transport(transport=arg)` supports 3 types of
arg: "nccl", "auto", or a `Communicator`. If "nccl" is used, or "auto"
is used and resolves to using NCCL, Ray creates a communicator
internally and uses it.

In order to better manage the communicator lifecycle, we make the
following changes:

- Add API to support
`experimental_compile(_default_communicator=default_comm)`, default_comm
is a default communicator user provided
- `_default_communicator` in the above API can be "create", which
explicitly tells Ray to create a default communicator and use it for all
`with_tensor_transport()` sites where a specific communicator was not
provided
- If `_default_communicator` is not specified, Ray will not create a
default communicator or reuse communicators passed in from other
`with_tensor_transport()` sites, but throw an error when a communicator
is needed. Note that this is backward incompatible.
- For collectives, if a custom communicator is specified at a specific
site, we use it; otherwise, if default is provided, we use the default;
otherwise if default is "create", we create a communicator per
collective op, but reuse the communicator op when actor set is the same.
For all passed-in communicator, we check its actor set is the same as
that of the communicator.
- For "create", a single p2p communicator is created for all involved
actors without a passed-in communicator. We reuse a previously created
collective communicator if its actor set includes all p2p actors.
- `_init_communicator()` is called for all communicators (passed in or
created) on its involved actors, and `_destroy_communicator()` is only
called for the ones created by Compiled Graph.

By default, `experimental_compile(_default_communicator="create")` is
used, therefore:
- Any custom communicator passed in (via `with_tensor_transport()` for
p2p, and `collective.allreduce.bind()` for collective) will take
precedence and be used in the corresponding operations. These
communicators won't be reused at other sites. (User should explicitly
specify if they want.)
- For each collective operation without a custom communicator, a new
communicator is created, and this communicator is reused for all other
collective-operation-without-custom-communicator that has the same set
of actors.
- For all p2p communications without custom communicator, a single
communicator is created for all involved actors. Creation is skipped if
there is a communicator created for collective operation, whose actors
include all p2p actors: the collective communicator is reused in p2p
communications in this case.

This PR also refactors `CompiledDAG._preprocess()` so that code is more
organized.

Closes ray-project#47540


---------

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
park12sj pushed a commit to park12sj/ray that referenced this pull request Mar 18, 2025
…cator=comm) (ray-project#50023)

Currently, `with_tensor_transport(transport=arg)` supports 3 types of
arg: "nccl", "auto", or a `Communicator`. If "nccl" is used, or "auto"
is used and resolves to using NCCL, Ray creates a communicator
internally and uses it.

In order to better manage the communicator lifecycle, we make the
following changes:

- Add API to support
`experimental_compile(_default_communicator=default_comm)`, default_comm
is a default communicator user provided
- `_default_communicator` in the above API can be "create", which
explicitly tells Ray to create a default communicator and use it for all
`with_tensor_transport()` sites where a specific communicator was not
provided
- If `_default_communicator` is not specified, Ray will not create a
default communicator or reuse communicators passed in from other
`with_tensor_transport()` sites, but throw an error when a communicator
is needed. Note that this is backward incompatible.
- For collectives, if a custom communicator is specified at a specific
site, we use it; otherwise, if default is provided, we use the default;
otherwise if default is "create", we create a communicator per
collective op, but reuse the communicator op when actor set is the same.
For all passed-in communicator, we check its actor set is the same as
that of the communicator.
- For "create", a single p2p communicator is created for all involved
actors without a passed-in communicator. We reuse a previously created
collective communicator if its actor set includes all p2p actors.
- `_init_communicator()` is called for all communicators (passed in or
created) on its involved actors, and `_destroy_communicator()` is only
called for the ones created by Compiled Graph.

By default, `experimental_compile(_default_communicator="create")` is
used, therefore:
- Any custom communicator passed in (via `with_tensor_transport()` for
p2p, and `collective.allreduce.bind()` for collective) will take
precedence and be used in the corresponding operations. These
communicators won't be reused at other sites. (User should explicitly
specify if they want.)
- For each collective operation without a custom communicator, a new
communicator is created, and this communicator is reused for all other
collective-operation-without-custom-communicator that has the same set
of actors.
- For all p2p communications without custom communicator, a single
communicator is created for all involved actors. Creation is skipped if
there is a communicator created for collective operation, whose actors
include all p2p actors: the collective communicator is reused in p2p
communications in this case.

This PR also refactors `CompiledDAG._preprocess()` so that code is more
organized.

Closes ray-project#47540


---------

Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-backlog go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[aDAG] Support experimental_compile(_custom_nccl_group= nccl_group) API

4 participants