
Root-ish tasks all schedule onto one worker #6573

@gjoseph92

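import dask
import distributed

if __name__ == "__main__":  # required when run as a script: workers start as subprocesses
    # Local cluster with 4 single-threaded workers
    client = distributed.Client(n_workers=4, threads_per_worker=1)

    # One root task that produces a 1 MiB string...
    root = dask.delayed(lambda n: "x" * n)(dask.utils.parse_bytes("1MiB"), dask_key_name="root")
    # ...and 10,000 trivial tasks that each depend on it
    results = [dask.delayed(lambda *args: None)(root, i) for i in range(10000)]
    dask.compute(results)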

Initially a few of the results tasks run on other workers, but after about 0.5 s, all tasks run on a single worker and the other three sit idle.
[Screenshot, 2022-06-13 8:08:48 PM]

I would have expected these tasks to be assigned evenly across all workers up front.
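To make the expectation concrete: an even up-front split of this reproducer means 10,000 / 4 = 2,500 tasks per worker. The sketch below deals task keys to workers round-robin. It is a hypothetical illustration of that expected outcome, not how distributed's scheduler actually places tasks; the worker names and task keys are made up.

```python
from collections import Counter


def assign_up_front(keys, workers):
    """Hypothetical even assignment: deal keys to workers round-robin."""
    return {key: workers[i % len(workers)] for i, key in enumerate(keys)}


workers = ["worker-0", "worker-1", "worker-2", "worker-3"]  # hypothetical names
keys = [f"task-{i}" for i in range(10000)]  # stand-ins for the results tasks
plan = assign_up_front(keys, workers)

# Count how many tasks each worker would receive
per_worker = Counter(plan.values())
print(per_worker)
```

With 10,000 tasks and 4 workers, every worker ends up with exactly 2,500 tasks, whereas the observed behavior leaves three workers idle.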

Some variables to play with:

  • If the root task's output is smaller, tasks are also assigned to the other workers
  • If you remove dask_key_name="root", then all tasks (including the root) run on the same worker. I assume this is because their keys share the same prefix (lambda) and therefore the same task group, and some scheduling heuristics are based on key names rather than on graph structure
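For context on the second point: without dask_key_name, dask.delayed names each task after its function plus a token, so a lambda yields keys beginning with lambda, and distributed derives task-group names from such key prefixes (dask.utils.key_split does this splitting). The helper below is a simplified, hypothetical stand-in for key_split: it keeps the leading alphabetic words of a hyphenated key and stops at the first non-alphabetic word. The hex tokens are made up for illustration, and the helper ignores cases the real function handles (tuple keys, uuid-style tokens, and so on).

```python
from collections import Counter


def key_prefix(key: str) -> str:
    """Simplified, hypothetical stand-in for dask.utils.key_split.

    Keeps the leading alphabetic words of a hyphenated key and stops at
    the first word that is not purely alphabetic (e.g. a hex token).
    """
    words = key.split("-")
    prefix = words[0]
    for word in words[1:]:
        if word.isalpha():
            prefix += "-" + word
        else:
            break
    return prefix


# Made-up keys: root unnamed (lambda-<token>) vs. named via dask_key_name="root"
without_name = ["lambda-8f14e45fceea167a5a36dedd4bea2543"] + [
    f"lambda-{i:032x}" for i in range(3)
]
with_name = ["root"] + [f"lambda-{i:032x}" for i in range(3)]

print(Counter(key_prefix(k) for k in without_name))  # root shares the "lambda" group
print(Counter(key_prefix(k) for k in with_name))     # root gets its own "root" group
```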

Distributed version: 2022.6.0
