@@ -981,6 +981,8 @@ ProcessGroupNCCL::ProcessGroupNCCL(
981981 TORCH_WARN_ONCE (
982982 " TORCH_NCCL_AVOID_RECORD_STREAMS is the default now, this environment variable is thus deprecated." );
983983 }
984+ showSerializationWarning_ =
985+ getCvarBool (TORCH_NCCL_SHOW_EAGER_INIT_P2P_SERIALIZATION_WARNING, true );
984986
985987 if (blockingWait_) {
986988 LOG (INFO)
@@ -1075,6 +1077,7 @@ void ProcessGroupNCCL::eagerConnectSingleDevice(at::Device device) {
10751077 LOG (INFO) << logPrefix () << " Eagerly connecting nccl backend with device "
10761078 << device;
10771079 initNCCLComm (key, device, OpType::ALLREDUCE);
1080+ eagerInit_ = true ;
10781081}
10791082
10801083bool ProcessGroupNCCL::useNonblocking () {
@@ -3957,23 +3960,72 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
39573960 at::cuda::OptionalCUDAGuard gpuGuard (device);
39583961
39593962 std::string key;
3960- int p2pRank = 0 , p2pTargetRank = 0 ;
3961- bool isSendRecvSelf = false ;
3963+ int p2pRank = - 1 , p2pTargetRank = - 1 ;
3964+ bool isSendRecvSelf = rank_ == peer ;
39623965 // For batch_isend_irecv, ncclGroupStart() would be called upfront
39633966 bool batchP2P = ncclActiveGroupCounter_ > 0 ;
3964- if (batchP2P) {
3965- // For batch P2P, we need to treat it like a collective when selecting
3966- // communicator, because other ranks can call into this batch other than my
3967- // rank and my peer
3967+
3968+ std::shared_ptr<NCCLComm> ncclComm = nullptr ;
3969+ if (this ->eagerInit_ ) {
3970+ /* In eagerInit mode, reuse the parent comm. Do not lazily create
3971+ * p2p communicators. */
3972+ if (!batchP2P && showSerializationWarning_) {
3973+ TORCH_WARN_ONCE (c10::str (
3974+ logPrefix (),
3975+ " An unbatched P2P op (send/recv) was called on this ProcessGroup with size " ,
3976+ groupRanks ().size (),
3977+ " . In eager initialization mode, unbatched P2P ops are treated as " ,
3978+ " independent collective ops, and are thus serialized with " ,
3979+ " all other ops on this ProcessGroup, including other P2P " ,
3980+ " ops. To avoid serialization, either create additional " ,
3981+ " independent ProcessGroups for the P2P ops or use batched " ,
3982+ " P2P ops. You can squash this warning by setting the environment variable " ,
3983+ " TORCH_NCCL_SHOW_EAGER_INIT_P2P_SERIALIZATION_WARNING to false." ));
3984+ }
3985+
39683986 key = getKeyFromDevice (device);
39693987 p2pRank = rank_;
39703988 p2pTargetRank = peer;
3989+ ncclComm = getNCCLComm (key);
3990+
3991+ TORCH_INTERNAL_ASSERT (
3992+ ncclComm != nullptr ,
3993+ " Parent communicator missing in eager initialization mode." );
3994+
3995+ if (!coalescing_state_) {
3996+ // Bump P2P sequence number. Don't do so if it's a batch P2P, it will be
3997+ // bumped in `startCoalescing`.
3998+ seqP2P_++;
3999+ }
4000+ } else if (batchP2P) {
4001+ // TODO(whc) - unclear why we special-case batchP2P to avoid this path, but
4002+ // I preserved this existing special case.
4003+ key = getKeyFromDevice (device);
4004+ p2pRank = rank_;
4005+ p2pTargetRank = peer;
4006+ ncclComm = getNCCLComm (key);
39714007 } else {
3972- // For single P2P, preserve the old two-rank behavior (to avoid perf diff)
4008+ // We create special 2-rank communicators for each pair of
4009+ // send/recv ranks. This limitation exists for two reasons: (1)
4010+ // we use a single stream per communicator, so if multiple
4011+ // unbatched p2p operations are issued on the same communicator,
4012+ // they would map to the same stream and thus would be serialized;
4013+ // and (2) Nvidia NCCL does not allow multiple p2p operations to
4014+ // be issued on the same communicator over different streams.
4015+
4016+ TORCH_WARN_ONCE (
4017+ " An unbatched P2P op (send/recv) was called on this " ,
4018+ " ProcessGroup with size " ,
4019+ groupRanks ().size (),
4020+ " . In lazy initialization mode, this will result in a new 2-rank" ,
4021+ " NCCL communicator to be created." );
4022+
39734023 key = getKeySendRecv (rank_, peer);
4024+ /* if we are creating a new comm, reset the p2pRank and
4025+ * p2pTargetRank to correspond to this new 2-process communicator */
39744026 p2pRank = rank_ <= peer ? 0 : 1 ;
3975- isSendRecvSelf = rank_ == peer;
39764027 p2pTargetRank = isSendRecvSelf ? 0 : 1 - p2pRank;
4028+ ncclComm = getNCCLComm (key);
39774029
39784030 if (!coalescing_state_) {
39794031 // Bump P2P sequence number.
@@ -3985,9 +4037,13 @@ c10::intrusive_ptr<Work> ProcessGroupNCCL::pointToPoint(
39854037 // coalesced or individual
39864038 op_id_++;
39874039
3988- std::shared_ptr<NCCLComm> ncclComm = getNCCLComm (key);
39894040 if (ncclComm == nullptr ) {
3990- ncclComm = initNCCLComm (key, device, opType, p2pRank, isSendRecvSelf);
4041+ // ncclComm should never be a nullptr in eager init mode.
4042+ // For lazy init mode, isSendRecvSelf is only valid for non-batch
4043+ // point-to-point operations. For batch operations, force the
4044+ // argument to be false.
4045+ ncclComm =
4046+ initNCCLComm (key, device, opType, p2pRank, isSendRecvSelf && !batchP2P);
39914047 }
39924048
39934049 if (coalescing_state_ & CoalActive) {
0 commit comments