@@ -422,7 +422,7 @@ def test_allgather_ops(self):
 
         def allgather(output_ts, input_ts):
             work = pg.allgather(output_ts, input_ts)
-            work.wait()
+            return work.wait()
 
         tensors = [torch.empty(2, 2).fill_(2).cuda(device=i) for i in local_device_ids]
         output_tensors = []
@@ -435,7 +435,7 @@ def allgather(output_ts, input_ts):
             output_tensors.append([t.cuda(device=gpu) for t in output_per_gpu])
             expected_output.append([t.cuda(device=gpu) for t in expected_per_gpu])
 
-        allgather(output_tensors, tensors)
+        result = allgather(output_tensors, tensors)
 
         # Verification
         self.assertEqual(output_tensors, expected_output)
@@ -495,6 +495,147 @@ def allgather_base(output_t, input_t):
             # fails the check because the dtype is different
             allgather_base(output_t, tensor)
 
+    @requires_nccl()
+    @sandcastle_skip_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
+    def test_gather_ops(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        pg = self._create_process_group_nccl(store, self.opts())
+        local_device_ids = self.rank_to_GPU[self.rank]
+        num_gpus = len(local_device_ids)
+
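+        # Non-root ranks pass an empty output list: gather only produces
+        # output on the root rank.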
+        def gather(output_t, input_t, rootRank):
+            opts = c10d.GatherOptions()
+            opts.rootRank = rootRank
+            if rootRank == self.rank:
+                work = pg.gather(output_t, input_t, opts)
+            else:
+                work = pg.gather([], input_t, opts)
+            work.wait()
+
+        # init input: one tensor per local GPU, holding this rank's id
+        tensors = []
+        for device_id in local_device_ids:
+            tensors.append(torch.tensor([self.rank]).cuda(device_id))
+
+        # init output: one list per local GPU, each with world_size placeholders
+        output_ts = []
+        for idx in range(num_gpus):
+            gpu_idx = local_device_ids[idx]
+            output_ts.append([])
+            for rank in range(self.world_size):
+                output_ts[idx].append(torch.tensor([-1]).cuda(gpu_idx))
+
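+        # Rotate the root through every rank; only the current root can
+        # verify, since non-root ranks receive no output.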
+        expected = [[torch.tensor([rank]) for rank in range(self.world_size)]]
+        for rank in range(self.world_size):
+            gather(output_ts, tensors, rank)
+            if rank == self.rank:
+                self.assertEqual(expected, output_ts)
+
+    @requires_nccl()
+    @sandcastle_skip_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
+    def test_gather_stress(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        pg = self._create_process_group_nccl(store, self.opts())
+        local_device_ids = self.rank_to_GPU[self.rank]
+        num_gpus = len(local_device_ids)
+
+        def gather(output_t, input_t, rootRank):
+            opts = c10d.GatherOptions()
+            opts.rootRank = rootRank
+            if rootRank == self.rank:
+                work = pg.gather(output_t, input_t, opts)
+            else:
+                work = pg.gather([], input_t, opts)
+            work.wait()
+
+        stress_length = 1000
+
+        # init input
+        tensors = []
+        for i in range(stress_length):
+            tensors.append([])
+            for device_id in local_device_ids:
+                tensors[i].append(torch.tensor([self.rank]).cuda(device_id))
+
+        # init output
+        output_ts = []
+        for i in range(stress_length):
+            output_ts.append([[] for _ in range(num_gpus)])
+            for idx, ls in enumerate(output_ts[i]):
+                gpu_idx = local_device_ids[idx]
+                for _ in range(self.world_size):
+                    ls.append(torch.tensor([-1]).cuda(gpu_idx))
+
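+        # Run stress_length rounds of gather, rotating the root each round.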
+        expected = [[torch.tensor([rank]) for rank in range(self.world_size)]]
+        for i in range(stress_length):
+            for rank in range(self.world_size):
+                gather(output_ts[i], tensors[i], rank)
+                # Verification
+                if rank == self.rank:
+                    self.assertEqual(output_ts[i], expected)
+
+    @requires_nccl()
+    @sandcastle_skip_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
+    def test_gather_checks(self):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        pg = self._create_process_group_nccl(store, self.opts())
+        local_device_ids = self.rank_to_GPU[self.rank]
+        num_gpus = len(local_device_ids)
+
+        # init input
+        tensors = []
+        for device_id in local_device_ids:
+            tensors.append(torch.tensor([self.rank]).cuda(device_id))
+
+        # init output
+        output_ts = []
+        for idx in range(num_gpus):
+            gpu_idx = local_device_ids[idx]
+            output_ts.append([])
+            for rank in range(self.world_size):
+                output_ts[idx].append(torch.tensor([-1]).cuda(gpu_idx))
+
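+        # Expected failures: out-of-range root rank, wrong argument type,
+        # empty input list, and multiple tensors on one device.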
+        with self.assertRaisesRegex(RuntimeError, "invalid root rank"):
+            opts = c10d.GatherOptions()
+            opts.rootRank = -1
+            pg.gather(output_ts, tensors, opts)
+
+        with self.assertRaisesRegex(TypeError, "incompatible function arguments"):
+            pg.gather(output_ts, tensors, 0)
+
+        with self.assertRaisesRegex(RuntimeError, "invalid root rank"):
+            opts = c10d.GatherOptions()
+            opts.rootRank = self.world_size
+            pg.gather(output_ts, tensors, opts)
+
+        with self.assertRaisesRegex(
+            RuntimeError, "Tensor list must be nonempty"
+        ):
+            opts = c10d.GatherOptions()
+            opts.rootRank = 0
+            pg.gather(output_ts, [], opts)
+
+        with self.assertRaisesRegex(
+            RuntimeError, "Tensors must be on distinct GPU devices"
+        ):
+            # init input: two tensors on the same device must fail the check
+            tensors2 = []
+            for device_id in local_device_ids:
+                tensors2.append(torch.tensor([self.rank]).cuda(device_id))
+                tensors2.append(torch.tensor([self.rank]).cuda(device_id))
+
+            opts = c10d.GatherOptions()
+            opts.rootRank = 0
+            pg.gather(output_ts, tensors2, opts)
+
+
     @requires_nccl()
     @sandcastle_skip_if(torch.cuda.device_count() < 2, "NCCL test requires 2+ GPUs")
     def test_reduce_scatter_base_basics(self):