
Deadlock: subsequent computations deadlock after a task has errored #5497

@gjoseph92

Description


Minimal Complete Verifiable Example:

from dask.datasets import timeseries
from dask.distributed import Client, LocalCluster


def raise_exc(part):
    raise RuntimeError


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    with Client(cluster) as client:
        ts = timeseries()
        ts = ts.map_partitions(raise_exc, meta={})
        print("starting first compute on cluster")
        try:
            ts.compute()
        except Exception:  # the RuntimeError raised by raise_exc is expected
            pass
        print("first compute done")

        print("starting second compute on cluster")
        X = client.submit(lambda x: x+1, 1).result()
        print("second compute done")
        print(X)

"second compute done" never prints on distributed main, 2021.10.0. It succeeds on 2021.09.1. (Note this was all with dask/dask on main, dask/dask@c2278fe.)

Originally reported in lightgbm-org/LightGBM#4771 (comment) (thank you @jmoralez and @jameslamb!). I've slightly simplified the example: the multiple clients weren't necessary to reproduce, nor was recomputing timeseries; it seems that any subsequent computation deadlocks. I haven't been able to simplify timeseries().map_partitions(raise_exc) any further; I can't replicate the deadlock by just submitting Futures.

Anything else we need to know?:

I poked briefly at the deadlocked cluster from another client. This is not #5480; the workers' BatchedSends have nothing in their buffers. Additionally, the scheduler and the workers agree that the workers still have tasks left to run (client.processing() lines up with client.run(lambda dask_worker: dask_worker.ready)).
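For reference, the "lines up" check above can be sketched as a small helper. This operates on plain dicts of worker address to key collections, standing in for (simplified versions of) the return values of client.processing() and client.run(lambda dask_worker: dask_worker.ready); the real return shapes differ (e.g. ready is a priority heap), so the function and variable names here are illustrative only:

```python
def views_agree(processing, ready):
    """Compare the scheduler's view of what each worker is processing
    against the workers' own view of their remaining tasks.

    processing: {worker_addr: iterable of keys}  (scheduler's view)
    ready:      {worker_addr: iterable of keys}  (workers' view)

    Returns {worker_addr: bool}, True where the two views match.
    """
    return {
        addr: set(processing.get(addr, ())) == set(ready.get(addr, ()))
        for addr in set(processing) | set(ready)
    }

# Example with made-up addresses and keys; order within a worker is ignored:
scheduler_view = {"tcp://127.0.0.1:1": ["a", "b"], "tcp://127.0.0.1:2": ["c"]}
worker_view = {"tcp://127.0.0.1:1": ["b", "a"], "tcp://127.0.0.1:2": ["c"]}
print(views_agree(scheduler_view, worker_view))
```

In the deadlocked cluster this kind of check comes back all-True, which is what rules out a lost-message explanation and points at the worker's internal state instead.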

This seems to be purely a Worker state machine deadlock. The problem is that there are keys left in Worker._executing:

In [14]: client.run(lambda dask_worker: dask_worker._executing)
Out[14]: 
{'tcp://127.0.0.1:53296': {<Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 10)" error>,
  <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 11)" error>},
 'tcp://127.0.0.1:53297': {<Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 24)" error>,
  <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 25)" error>}}

But no threads are actually running:

In [13]: client.run(lambda dask_worker: dask_worker.active_threads)
Out[13]: {'tcp://127.0.0.1:53296': {}, 'tcp://127.0.0.1:53297': {}}

And those tasks have already transitioned to the error state, yet for some reason were never removed from _executing:

In [15]: client.run(lambda dask_worker: dask_worker.tasks)
Out[15]: 
{'tcp://127.0.0.1:53296': {"('raise_exc-1b1b1e12ba372e70b59471074272d8db', 10)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 10)" error>,
  "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 11)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 11)" error>,
  'lambda-8a4d94597fad66701d6bc1a322718b51': <Task 'lambda-8a4d94597fad66701d6bc1a322718b51' ready>},
 'tcp://127.0.0.1:53297': {"('raise_exc-1b1b1e12ba372e70b59471074272d8db', 24)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 24)" error>,
  "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 25)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 25)" error>}}

cc @fjetter @jrbourbeau

Environment:

  • Distributed version: 1441373
  • Python version: 3.9.1
  • Operating System: macOS
  • Install method (conda, pip, source): source
