
Worker <-> Worker Communication Failures bring Cluster in inconsistent State #5951

@nils-braun

Description

Due to network issues, overloaded workers, or just bad luck, the worker-to-worker communication needed to fetch task dependencies from other workers (distributed.worker.get_data_from_worker) can fail. In principle this is caught and handled by the worker and the scheduler. During the handling, however, a race condition can be triggered that brings the cluster into an inconsistent and stuck state.

Minimal Complete Verifiable Example:

The following code introduces a random failure in distributed.worker.get_data_from_worker with a higher probability than a real-world use case would have, just to demonstrate the issue:

from unittest.mock import patch
from random import randint

import dask.bag as db
from distributed import Client

# Calling this function on the worker will temporarily replace
# the `get_data_from_worker` function with a version, which
# fails in 1 out of 4 cases. In the "real world" the 
# probability of a failure due to communication problems is
# of course a lot smaller, but it might happen.
def replace_get_data_from_worker_function(dask_worker):
    from distributed.worker import get_data_from_worker
    
    dask_worker.my_counter = 0

    async def get_data_from_worker_function(*args, **kwargs):
        # "Fail" in 1 out of 4 cases
        if randint(0, 3) == 1:
            dask_worker.my_counter += 1
            raise OSError

        return await get_data_from_worker(*args, **kwargs)
            
    p = patch("distributed.worker.get_data_from_worker", 
              get_data_from_worker_function)
    p.start()

if __name__ == "__main__":
    client = Client()
    client.run(replace_get_data_from_worker_function)

    PARTITIONS = 20

    # Just some arbitrary computation, which takes
    # reasonably long and (most importantly) creates tasks
    # with many dependencies, so that we need a lot of communication
    data = db.from_sequence(range(PARTITIONS), npartitions=PARTITIONS)\
             .map_partitions(lambda x: range(100_000))\
             .groupby(lambda x: x % PARTITIONS)\
             .map_partitions(lambda x: len(list(x)))
    data.compute()

Running this code will sometimes not finish the computation: one (or multiple) workers get stuck waiting to fetch the dependencies of a specific task.
Note: it is a race condition, so you might need to run the code multiple times before it gets stuck...

Here are some observations I have made using the worker's and scheduler's properties and the transition log.
Let's say worker A is stuck while processing task T1, which depends on task T2 owned by worker B.

  • the scheduler task state shows that T2 is present on B
  • B has the task in its memory (dask_worker.data)
  • A, however, thinks that no one owns the task (dask_worker.tasks[T1].who_owns == {}), so A does not even start asking B for the data.
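A small helper for taking such a snapshot on a live cluster (a sketch; the attribute names tasks, data, and who_owns follow the issue text above and may differ between distributed versions, and the task key is a placeholder):

```python
def snapshot_task(dask_worker, key):
    """Return what this worker believes about one task: whether its
    data is in local memory, and who the worker thinks owns it."""
    ts = dask_worker.tasks.get(key)
    return {
        "in_memory": key in dask_worker.data,
        "who_owns": set(ts.who_owns) if ts is not None else None,
    }

# On a live cluster this could be run on every worker via
# (hypothetical task key "T2"):
#   client.run(snapshot_task, key="T2")
```

In the stuck state described above, worker B would report in_memory=True for the key while worker A reports an empty ownership set for the same data, even though the scheduler still records it as present on B.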

From the transition log, this is what I think happens (I would be happy if someone with more knowledge of the worker internals could confirm my observations):

  • some time earlier, worker A wants to process another task, T3, which needs a dependency T4, also from B
  • it calls gather_dep, which calls get_data_from_worker. This fails (either due to a real network issue or due to our patched function above).
  • around the same time, the scheduler also tells A to compute T1, which depends on T2 (owned by B)
  • during the OSError handling in gather_dep, worker A's local state is changed such that all tasks owned by B are marked as no longer owned by B. In our case, that is T2 and T4.
  • however (and I think this is where the bug is), only the dependencies of the original failed request are reported as missing to the scheduler, in this case only T4
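The last two steps can be boiled down to a pure-Python sketch (no cluster involved; the function and the ownership/missing names are illustrative stand-ins for the actual logic in distributed.worker, not its real API):

```python
def handle_gather_dep_failure(tasks, failed_worker, requested_deps):
    """Sketch of worker A's reaction to an OSError in gather_dep:
    every task thought to live on `failed_worker` loses that entry
    from its ownership set, but only the dependencies of the failed
    request are reported missing to the scheduler."""
    for who_owns in tasks.values():
        who_owns.discard(failed_worker)   # forget B for *every* task ...
    return set(requested_deps)            # ... but report only the requested ones

# Worker A believes both T2 and T4 live on worker B;
# the failed request was only for T4.
tasks = {"T2": {"B"}, "T4": {"B"}}
reported_missing = handle_gather_dep_failure(tasks, "B", requested_deps={"T4"})

assert tasks["T2"] == set()          # A no longer knows any owner for T2 ...
assert "T2" not in reported_missing  # ... yet T2 is never reported missing
```

T2 now satisfies neither recovery path: A will not fetch it (empty ownership set), and the scheduler will not reschedule it (never reported missing), which matches the stuck state observed above.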

The final state is that worker A thinks no one owns the data for T2, while the scheduler never redistributes the task (as it was never marked as missing).

One last comment: using the setting DASK_DISTRIBUTED__COMM__RETRY__COUNT it is possible to make failures of the get_data_from_worker function less likely. Unfortunately, this only decreases the probability; it does not fix the problem.
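For reference, that environment variable can also be exported from Python (a sketch; the value 5 is an arbitrary example, and Dask's standard env-var mapping translates the name to the distributed.comm.retry.count config option):

```python
import os

# Must be set before the first `import dask` / `import distributed`
# in the process, since the environment is read into dask.config
# at import time.
os.environ["DASK_DISTRIBUTED__COMM__RETRY__COUNT"] = "5"
```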

Environment:

  • Dask version: 2022.2.1
  • Python version: 3.8
  • Operating System: macOS
  • Install method (conda, pip, source): pip

Labels: bug (Something is broken), deadlock (The cluster appears to not make any progress), stability (Issue or feature related to cluster stability, e.g. deadlock)
