(WIP) [core][compiled graphs] Unify code paths for NCCL P2P and collectives scheduling#48649
AndyUB wants to merge 151 commits into ray-project:master from
Conversation
Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
Looks great. Some more TODOs before an initial review as we discussed offline:
cc @dengwxn

@anyscalesam Could you help add a go badge to run more CI tests? Thanks!
This reverts commit 941cb73. Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
After your attempt and on second thought, I think introducing another abstraction like this might not be the best way to separate NCCL and non-NCCL ops.

As we discussed offline, we should remove all of these.
dengwxn
left a comment
First pass. Structure seems right. Will look into details later.
stephanie-wang
left a comment
I think this can be made simpler. Try to think about how you can achieve the following:
- `_NCCLSendNode`/`_NCCLRecvNode` should have the same interface as `_CollectiveOperation`.
- If the above is done properly, I believe we can get rid of most of the parts that need to differentiate between send/recv/collective. I.e., there should be only one `requires_nccl` flag instead of three, and there should be only one kind of DAG op node, a `COMPUTE` node.
…dule_gpu Signed-off-by: Yuhan Ruan <andyubryh@gmail.com>
@stephanie-wang @AndyUB do you want to continue working on this PR?

Yes, we're still working on this.
stephanie-wang
left a comment
Sorry for the delay, I think this is looking close to merge-able.
I'm a bit confused about a few things, though:
- There are several different collective/p2p operation/node types added. Can you explain how each one is used, i.e. how do they reference each other and do we need all of them?
- Is there any change in scheduling behavior compared to before?
- Are there any unit tests that we can add? I.e. tests that don't need to create a full DAG and test the e2e execution.
```python
def __init__(
    self,
    method_args: Tuple[_P2PSendNode],
```
Why not use the same structure as `CollectiveOutputNode`, where we create one actual `_P2PNode` and the send and recv nodes depend on the `_P2PNode` via `other_args_to_resolve`?
```python
# Convert the abstract P2P operation from scheduling to the executable P2P
# send/recv operation.
if self.requires_nccl_read:
    assert self.nccl_ch is not None
    self.nccl_op = _P2PRecvOperation(self.nccl_ch)
elif self.requires_nccl_write:
    assert self.nccl_ch is not None
    self.nccl_ch.ensure_registered_as_writer()
    self.nccl_op = _P2PSendOperation(self.nccl_ch)
```
Why do we only need to do this conversion from abstract to executable operation for P2P operations and not for collective operations?
```python
if input_exc is not None and self.requires_nccl_write:
    input_values = [input_exc]
    input_exc = None
```
This code can be squashed into the following block.
python/ray/dag/compiled_dag_node.py (outdated diff)
```python
method_args=(node,),
other_args_to_resolve={
    PARENT_CLASS_NODE_KEY: send_actor_handle,
    P2P_OPERATION_KEY: _P2POperation(),
```
Where does this get used?
```python
    (3, _DAGNodeOperationType.COMPUTE),
    (3, _DAGNodeOperationType.WRITE),
]
w1_expected_schedule = [0, 1, 2, 5, 3, 4, 7, 6, 8]
```
Please add a comment explaining what the expected schedule is.
Also, I assume there was no behavior change in this test?
```python
@pytest.mark.skipif(not USE_GPU, reason="Skipping GPU Test")
@pytest.mark.parametrize("overlap_gpu_communication", [False, True])
def test_torch_tensor_nccl_overlap_collective(
```
Please add comments explaining what each test does.
```python
@pytest.mark.skipif(not USE_GPU, reason="Skipping GPU Test")
@pytest.mark.parametrize("overlap_gpu_communication", [False, True])
def test_torch_tensor_nccl_overlap_send_future_across_actors(
```
This test seems a bit complicated / unrelated compared to the stated goal? Is there a simpler test that can be run? Or a unit test?
```python
@pytest.mark.skipif(not USE_GPU, reason="Skipping GPU Test")
@pytest.mark.parametrize("overlap_gpu_communication", [False, True])
def test_torch_tensor_nccl_overlap_same_future_multiple_waits(
```
This test seems a bit complicated / unrelated compared to the stated goal? Is there a simpler test that can be run? Or a unit test?
…tions (#53007) Given an input DAG of SPMD training strategies such as DDP, after DAG compilation the first actor generates different execution schedules than the others. This is due to the current scheduling policy: when there are multiple ready operation nodes such as `actor1.compute` (non-NCCL) and `actor4.collective` (NCCL; for actor1-4, there is only one collective operation node that eventually becomes ready), the policy does not know that actor1 has both the non-NCCL `actor1.compute` and the NCCL `actor4.collective`. This leads to actor1 scheduling `actor1.compute` first, and actor1-4 scheduling the `collective` next. We update the policy to push all the collective operation nodes into the candidates when the last of them becomes ready. In the previous example, actor1 will have both `actor1.compute` and `actor1.collective` as candidates. In a DAG of SPMD strategies, all the actors then pop either the `compute` or the `collective` together. We also update the policy to simply prioritize NCCL operation nodes over non-NCCL ones, so that NCCL operations are scheduled as soon as possible. This is safe under the current CUDA stream settings in the system, because each NCCL read/write/collective stream only allows one outstanding NCCL kernel at a time. We add a test `test_collective_dag.py::test_exec_schedules_ddp` to verify that the generated schedules are identical across workers for the DDP strategy. Other tests are updated to reflect the change of prioritizing NCCL operation nodes over non-NCCL ones. ## Related issue number This PR is part of #48649, planned to be merged incrementally. --------- Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
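The NCCL-first prioritization described above can be sketched roughly as follows. This is a minimal illustration of the idea, not the actual Ray scheduler: ready operations are popped NCCL-first, with ties broken by submission order (the function and tuple shape are hypothetical).

```python
import heapq


def pick_schedule(ready_ops):
    """Order ready operations, always preferring NCCL over non-NCCL ops.

    ready_ops: iterable of (requires_nccl, order, name) tuples, where
    `order` is the submission order used to break ties.
    """
    # NCCL ops get priority key 0 so they pop before non-NCCL ops (key 1).
    heap = [
        (0 if requires_nccl else 1, order, name)
        for requires_nccl, order, name in ready_ops
    ]
    heapq.heapify(heap)
    schedule = []
    while heap:
        _, _, name = heapq.heappop(heap)
        schedule.append(name)
    return schedule
```

With both `actor1.compute` (non-NCCL) and `actor1.collective` (NCCL) as candidates, every actor pops the collective first, so the schedules stay identical across workers.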
…3111) This PR unifies the scheduling implementation for the NCCL P2P and collective operation nodes. The logic remains the same: (1) P2P case: when a NCCL send node is selected, its downstream NCCL recv nodes are also selected; (2) collective case: when a NCCL collective node is selected, its corresponding NCCL collective nodes are also selected. Previously, the NCCL P2P case was implemented by selecting the recv nodes whenever a send node was detected, while the NCCL collective case was implemented by maintaining a set of pending collective nodes. We unify the implementation for both cases. Concretely, both maintain sets of (pending) synchronous node indices named `sync_idxs` and `pending_sync_idxs`. The synchronous nodes denote the P2P send/recv nodes or the collective nodes. The NCCL P2P/collective operation is ready when `sync_idxs == pending_sync_idxs`. Test cases are updated to reflect the use of synchronous nodes for both NCCL P2P and collective nodes. This PR is a follow-up of #53007. Both are parts of #48649, planned to be merged incrementally. --------- Signed-off-by: Weixin Deng <weixin@cs.washington.edu>
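A small sketch of the unified readiness tracking described above, assuming a hypothetical `SyncGroup` class (the real logic lives inside the compiled-graph scheduler): a P2P send/recv pair or a collective group forms one set of synchronous node indices, and the whole NCCL operation becomes ready once `sync_idxs == pending_sync_idxs`.

```python
class SyncGroup:
    """One group of synchronous nodes: a P2P send + its recvs, or all the
    collective nodes of one collective operation (illustrative sketch)."""

    def __init__(self, sync_idxs):
        self.sync_idxs = frozenset(sync_idxs)  # all participating node indices
        self.pending_sync_idxs = set()         # indices selected so far

    def select(self, idx):
        # Mark one participating node as selected by the scheduler.
        assert idx in self.sync_idxs
        self.pending_sync_idxs.add(idx)

    def ready(self):
        # The NCCL op is schedulable only when every participant is pending,
        # i.e. sync_idxs == pending_sync_idxs.
        return self.pending_sync_idxs == self.sync_idxs
```

This single structure covers both cases that previously had separate code paths.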
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
This pull request has been automatically closed because there has been no further activity in the last 14 days. Please feel free to reopen or open a new pull request if you'd still like this to be addressed. Again, you can always ask for help on our discussion forum or Ray's public Slack channel. Thanks again for your contribution!
Why are these changes needed?
This PR unifies the code paths for NCCL P2P and collectives. Before, scheduling for NCCL operations was done by splitting each node into three operations: `READ`, `COMPUTE`, and `WRITE`. This PR simplifies the logic by keeping only the compute node. To ensure scheduling still works, NCCL operations are converted into special types of system-created compute nodes. This PR also allows overlapping NCCL collectives with computation.
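As a rough sketch of that conversion (all names here are illustrative, not the actual Ray internals): every scheduled operation becomes a single `COMPUTE` op, and a single `requires_nccl` flag distinguishes the system-created NCCL nodes from user compute nodes.

```python
from enum import Enum


class OpType(Enum):
    # After the refactor, scheduling only deals with COMPUTE ops; the
    # previous READ/WRITE split no longer exists.
    COMPUTE = "COMPUTE"


class NodeKind(Enum):
    # Hypothetical tags for the system-created node types.
    USER_COMPUTE = "user"
    NCCL_SEND = "nccl_send"
    NCCL_RECV = "nccl_recv"
    NCCL_COLLECTIVE = "nccl_collective"


def requires_nccl(kind: NodeKind) -> bool:
    # One flag replaces the separate read/write/collective flags, so the
    # scheduler no longer needs to differentiate send/recv/collective.
    return kind is not NodeKind.USER_COMPUTE
```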
NCCL P2P Refactoring
Before this PR, compiling this DAG will result in a `TorchTensorNcclChannel` from `foo` to `bar`.

This PR adds a `NcclSendNode` after `foo` and a `NcclRecvNode` before `bar`. The `TorchTensorNcclChannel` now connects the two added nodes. Since `foo` and the send node are on the same actor, the channel from `foo` to the send node is an `IntraProcessChannel`. The same applies on the recv side.

Multiple Receivers
In this case, the sender sends to two different receivers.
Only one `NcclSendNode` is created. One `NcclRecvNode` is created per receiver. Like before, there is only one `TorchTensorNcclChannel`.

Multiple Senders
The receiver receives from two senders.
One `NcclSendNode` is created per sender. One `NcclRecvNode` is created per argument of the receiver. There are two different `TorchTensorNcclChannel`s.

Overlap NCCL Collectives
This is done by prioritizing NCCL operations over non-NCCL operations when scheduling: if both NCCL and non-NCCL operations are ready to be added to the actors' execution schedules, the NCCL operations are always added first.
Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.