-
-
Notifications
You must be signed in to change notification settings - Fork 757
Closed
Description
Related to #2142, but the solution doesn't apply in my case. I have a use case for workers running in separate processes, but as non-daemons because the worker processes need to use multiprocessing. Here's an example:
import torch
import torch.distributed as dist
import torchvision
import os
from distributed import Client, LocalCluster
def worker_fn(rank, world_size):
    """Per-rank worker: join the process group, load MNIST, and iterate it.

    Initializes torch.distributed (NCCL backend) for this rank, lets rank 0
    download the dataset while the others wait at a barrier, then drains a
    DataLoader that itself spawns ``num_workers`` child processes — the step
    that fails when this function runs inside a daemonic dask worker.
    """
    print('worker', rank)

    # Rendezvous address for torch.distributed; all ranks must agree.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8989'
    dist.init_process_group(
        backend=dist.Backend.NCCL,
        rank=rank,
        world_size=world_size,
    )
    print('initialized distributed', rank)

    # Only rank 0 downloads; everyone else waits at the barrier and then
    # loads the already-downloaded copy from disk.
    is_downloader = rank == 0
    if is_downloader:
        mnist = torchvision.datasets.MNIST(
            '../data/',
            train=True,
            download=True,
        )
    dist.barrier()
    if not is_downloader:
        mnist = torchvision.datasets.MNIST(
            '../data/',
            train=True,
            download=False,
        )

    # load data, uses multiprocessing: num_workers=2 makes the DataLoader
    # fork child processes, which daemonic parents are not allowed to do.
    data_loader = torch.utils.data.DataLoader(
        mnist,
        sampler=torch.utils.data.distributed.DistributedSampler(
            mnist,
            rank=rank,
            num_replicas=world_size,
        ),
        num_workers=2,
    )
    print('constructed data loader', rank)

    # if cuda is available, initializes it as well
    assert torch.cuda.is_available()

    # do distributed training, but in this case it suffices to iterate
    for inputs, targets in data_loader:
        pass
def main():
    """Launch a two-worker LocalCluster and run worker_fn on each rank.

    The GPUS resource tag (1 per worker, 1 per task) pins exactly one task
    to each worker process, so the two ranks run in distinct processes.
    """
    world_size = 2
    cluster = LocalCluster(
        n_workers=world_size,
        processes=True,
        resources={
            'GPUS': 1,  # don't allow two tasks to run on the same worker
        },
    )
    client = Client(cluster)

    # Submit one task per rank, then block until every rank finishes
    # (or re-raise the first worker-side exception).
    futures = [
        client.submit(
            worker_fn,
            rank,
            world_size,
            resources={'GPUS': 1},
        )
        for rank in range(world_size)
    ]
    for fut in futures:
        fut.result()
if __name__ == '__main__':
    main()

If processes=True, then we get an error about daemonic processes not being allowed to have children:
worker 0
worker 1
initialized distributed 1
initialized distributed 0
constructed data loader 0
constructed data loader 1
distributed.worker - WARNING - Compute Failed
Function: worker_fn
args: (0, 2)
kwargs: {}
Exception: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
File "scratch.py", line 152, in <module>
main()
File "scratch.py", line 148, in main
f.result()
File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/distributed/client.py", line 227, in result
six.reraise(*result)
File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/six.py", line 692, in reraise
raise value.with_traceback(tb)
File "scratch.py", line 123, in worker_fn
for x, y in loader:
File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 193, in __iter__
return _DataLoaderIter(self)
File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 469, in __init__
w.start()
File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
distributed.worker - WARNING - Compute Failed
Function: worker_fn
args: (1, 2)
kwargs: {}
Exception: AssertionError('daemonic processes are not allowed to have children',)
If processes=False, we get stuck at distributed initialization.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels