Skip to content

Non-daemonic workers #2718

@calebho

Description

@calebho

Related to #2142, but the solution doesn't apply in my case. I have a use case for workers running in separate processes, but as non-daemons because the worker processes need to use multiprocessing. Here's an example:

import torch
import torch.distributed as dist
import torchvision
import os
from distributed import Client, LocalCluster


def worker_fn(rank, world_size):
    """Per-worker task: join a torch.distributed process group, load MNIST,
    and iterate it through a multiprocessing DataLoader.

    Args:
        rank: this worker's rank in the process group (0-based).
        world_size: total number of workers in the group.

    NOTE(review): this function is submitted to Dask workers; the DataLoader's
    ``num_workers=2`` spawns child processes, which fails if the Dask worker
    process is daemonic (the point of the issue this snippet reproduces).
    """
    print('worker', rank)

    # Rendezvous endpoint for torch.distributed — every rank must set the
    # same address/port before init_process_group is called.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '8989'
    dist.init_process_group(
        backend=dist.Backend.NCCL,
        rank=rank,
        world_size=world_size,
    )
    print('initialized distributed', rank)

    # Only rank 0 downloads the dataset; the barrier ensures the files exist
    # on disk before the other ranks try to read them (download=False).
    # Assumes all ranks share a filesystem — TODO confirm for multi-node runs.
    if rank == 0:
        dataset = torchvision.datasets.MNIST(
            '../data/',
            train=True,
            download=True,
        )
    dist.barrier()
    if rank != 0:
        dataset = torchvision.datasets.MNIST(
            '../data/',
            train=True,
            download=False,
        )
    # load data, uses multiprocessing (num_workers=2 forks/spawns children,
    # which is what triggers the "daemonic processes" AssertionError)
    loader = torch.utils.data.DataLoader(
        dataset,
        sampler=torch.utils.data.distributed.DistributedSampler(
            dataset,
            rank=rank,
            num_replicas=world_size,
        ),
        num_workers=2,
    )
    print('constructed data loader', rank)

    # if cuda is available, initializes it as well
    assert torch.cuda.is_available()
    # do distributed training, but in this case it suffices to iterate
    for x, y in loader:
        pass


def main():
    """Launch a 2-worker LocalCluster and run ``worker_fn`` on each worker.

    Each worker is given a ``GPUS`` resource of 1 and each task requires
    ``GPUS: 1``, so no two tasks land on the same worker process.

    Raises:
        Whatever exception a ``worker_fn`` task raised, re-raised by
        ``Future.result()``.
    """
    world_size = 2
    cluster = LocalCluster(
        n_workers=world_size,
        processes=True,
        resources={
            'GPUS': 1,  # don't allow two tasks to run on the same worker
        },
    )
    cl = Client(cluster)
    try:
        # One task per rank; resources= pins each task to a distinct worker.
        futs = [
            cl.submit(
                worker_fn,
                rank,
                world_size,
                resources={'GPUS': 1},
            )
            for rank in range(world_size)
        ]
        # Block until every rank finishes (or re-raise the first failure).
        for f in futs:
            f.result()
    finally:
        # Fix: the original leaked the client and cluster; close them so the
        # worker processes are shut down even when a task fails.
        cl.close()
        cluster.close()


if __name__ == '__main__':
    main()

If processes=True, then we get an error about daemonic processes not being allowed to have children:

worker 0
worker 1
initialized distributed 1
initialized distributed 0
constructed data loader 0
constructed data loader 1
distributed.worker - WARNING -  Compute Failed
Function:  worker_fn
args:      (0, 2)
kwargs:    {}
Exception: AssertionError('daemonic processes are not allowed to have children',)

Traceback (most recent call last):
  File "scratch.py", line 152, in <module>
    main()
  File "scratch.py", line 148, in main
    f.result()
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/distributed/client.py", line 227, in result
    six.reraise(*result)
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "scratch.py", line 123, in worker_fn
    for x, y in loader:
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 193, in __iter__
    return _DataLoaderIter(self)
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/site-packages/torch/utils/data/dataloader.py", line 469, in __init__
    w.start()
  File "/private/home/calebh/miniconda3/envs/fairtask2/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
distributed.worker - WARNING -  Compute Failed
Function:  worker_fn
args:      (1, 2)
kwargs:    {}
Exception: AssertionError('daemonic processes are not allowed to have children',)

If processes=False, we get stuck at distributed initialization.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions