Description
Due to network issues, overloaded workers or just bad luck, the worker-to-worker communication needed for getting task dependencies from other workers might fail (distributed.worker.get_data_from_worker). This is (in principle) successfully caught by the worker and scheduler and reacted on. During this however a race-condition can be triggered, which brings the cluster in an inconsistent and stuck state.
Minimal Complete Verifiable Example:
The following code will introduce a random failure in distributed.worker.get_data_from_worker with higher probability than a real-world use-case might have, just to demonstrate:
```python
from unittest.mock import patch
from random import randint

import dask.bag as db
from distributed import Client


# Calling this function on the worker will temporarily replace
# the `get_data_from_worker` function with a version which
# fails in 1 out of 4 cases. In the "real world" the
# probability of a failure due to communication problems is
# of course a lot smaller, but it might happen.
def replace_get_data_from_worker_function(dask_worker):
    from distributed.worker import get_data_from_worker

    dask_worker.my_counter = 0

    async def get_data_from_worker_function(*args, **kwargs):
        # "Fail" in 1 out of 4 cases
        if randint(0, 3) == 1:
            dask_worker.my_counter += 1
            raise OSError
        return await get_data_from_worker(*args, **kwargs)

    p = patch("distributed.worker.get_data_from_worker",
              get_data_from_worker_function)
    p.start()


if __name__ == "__main__":
    client = Client()
    client.run(replace_get_data_from_worker_function)

    PARTITIONS = 20

    # Just some arbitrary computation which takes reasonably long
    # and (most importantly) creates tasks with many dependencies,
    # so that a lot of worker-to-worker communication is needed.
    data = (
        db.from_sequence(range(PARTITIONS), npartitions=PARTITIONS)
        .map_partitions(lambda x: range(100_000))
        .groupby(lambda x: x % PARTITIONS)
        .map_partitions(lambda x: len(list(x)))
    )
    data.compute()
```

Running this code will sometimes not finish the computation: one (or multiple) workers get stuck while waiting to fetch the dependencies for a specific task.
Note: since this is a race condition, you might need to run the code multiple times before it gets stuck...
Here are some observations I have made using the worker's and scheduler's properties and the transition log.
Let's say worker A is stuck while processing task T1, which depends on task T2 owned by worker B.
- the scheduler task state shows that `T2` is present on `B`
- `B` indeed has the task in its memory (`dask_worker.data`)
- `A`, however, thinks that no one owns the task (`dask_worker.tasks[T1].who_owns == {}`), so `A` does not even start asking `B` for the data
From the transition log, this is what I think happens (but I am happy if someone with more knowledge of the way the worker works could confirm my observations):
- some time before that, worker `A` wants to process another task, `T3`, which needs a dependency `T4`, also from `B`
- it calls `gather_dep`, which calls `get_data_from_worker`. This fails (either due to a real network issue or due to our patched function above).
- in the meantime/around this time, the scheduler also tells `A` to do `T1`, which depends on `T2` (owned by `B`)
- during the error handling for the `OSError` of `gather_dep`, the local state of worker `A` is changed so that all tasks owned by `B` are marked as not owned by `B` anymore; in our case, that is `T2` and `T4`
- however (and I think this is where the bug is), only the initially requested dependencies are reported as missing to the scheduler, in this case only `T4` (see here)
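The steps above can be sketched as a toy model of the suspected bookkeeping, using plain dicts and sets rather than the real worker code (names `T2`/`T4`/`B` follow the example above):

```python
# Toy model of the suspected bug: on a failed gather from peer B, the
# worker forgets B as owner of *all* of B's tasks, but reports only the
# dependencies it was currently fetching as missing to the scheduler.
def handle_gather_failure(worker_owners, scheduler_missing, peer, requested):
    for owners in worker_owners.values():
        owners.discard(peer)          # forget B for every task: T2 and T4
    for key in requested:
        scheduler_missing.add(key)    # ...but only report T4 as missing

worker_owners = {"T2": {"B"}, "T4": {"B"}}
scheduler_missing = set()
handle_gather_failure(worker_owners, scheduler_missing, "B", requested={"T4"})

# T2 now has no known owner on the worker, yet was never reported missing,
# so the scheduler has no reason to redistribute it: the worker is stuck.
assert worker_owners["T2"] == set()
assert "T2" not in scheduler_missing
```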
The final state is that worker A thinks no one owns the data for T2, while the scheduler will not redistribute the task (as it was never marked as missing).
One last comment: using the setting DASK_DISTRIBUTED__COMM__RETRY__COUNT it is possible to make failures of the get_data_from_worker function less likely. But unfortunately, this only decreases the probability; it does not fix the problem.
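For reference, the same knob can be set through the dask config file instead of the environment variable; the value here is illustrative (the shipped default is 0, i.e. no retries):

```yaml
# e.g. in the dask config file (distributed.yaml)
distributed:
  comm:
    retry:
      count: 5  # retry failed comms up to 5 times before giving up
```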
Environment:
- Dask version: 2022.2.1
- Python version: 3.8
- Operating System: macOS
- Install method (conda, pip, source): pip