Description
Minimal Complete Verifiable Example:
```python
from dask.datasets import timeseries
from dask.distributed import Client, LocalCluster


def raise_exc(part):
    raise RuntimeError


if __name__ == '__main__':
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    with Client(cluster) as client:
        ts = timeseries()
        ts = ts.map_partitions(raise_exc, meta={})

        print("starting first compute on cluster")
        try:
            ts.compute()
        except:
            pass
        print("first compute done")

        print("starting second compute on cluster")
        X = client.submit(lambda x: x + 1, 1).result()
        print("second compute done")
        print(X)
```

`second compute done` never prints on distributed main and 2021.10.0. It succeeds on 2021.09.1. (Note this was all with dask/dask on main, dask/dask@c2278fe.)
Originally reported in lightgbm-org/LightGBM#4771 (comment) (thank you @jmoralez and @jameslamb!). I've slightly simplified the example: the multiple clients weren't necessary to reproduce, nor was recomputing timeseries; it seems that any subsequent computation deadlocks. I haven't been able to simplify away the `timeseries().map_partitions(raise_exc)` step, though; just submitting Futures, I can't replicate it.
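For reference, the futures-only attempts looked roughly like this (a sketch; the exact code isn't preserved here, and the structure is illustrative) and did not reproduce the hang:

```python
from dask.distributed import Client, LocalCluster


def raise_exc(x):
    raise RuntimeError


if __name__ == '__main__':
    with Client(LocalCluster(n_workers=2, threads_per_worker=2)) as client:
        # First round of work: every task raises, like the map_partitions case
        futures = [client.submit(raise_exc, i) for i in range(30)]
        try:
            client.gather(futures)
        except Exception:
            pass
        # Unlike the MCVE above, a follow-up computation completes fine
        print(client.submit(lambda x: x + 1, 1).result())
```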
Anything else we need to know?:
I poked briefly at the deadlocked cluster from another client. This is not #5480; the workers' BatchedSends have nothing in their buffers. Additionally, the scheduler and the workers agree that the workers have more tasks remaining to do (`client.processing()` lines up with `client.run(lambda dask_worker: dask_worker.ready)`).
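The poking looked roughly like this from the second client (a sketch; the scheduler address is illustrative):

```python
from dask.distributed import Client

# Attach a fresh client to the deadlocked cluster (address illustrative)
client = Client("tcp://127.0.0.1:8786")

# Scheduler's view: tasks it still believes are processing on each worker
print(client.processing())

# Workers' view: tasks they still consider ready to run
print(client.run(lambda dask_worker: dask_worker.ready))

# BatchedSend buffers are empty, so this isn't #5480
print(client.run(lambda dask_worker: dask_worker.batched_stream.buffer))
```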
This seems to be purely a Worker state machine deadlock. The problem is that there are keys left in `Worker._executing`:
```python
In [14]: client.run(lambda dask_worker: dask_worker._executing)
Out[14]:
{'tcp://127.0.0.1:53296': {<Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 10)" error>,
  <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 11)" error>},
 'tcp://127.0.0.1:53297': {<Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 24)" error>,
  <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 25)" error>}}
```

But threads aren't actually running:

```python
In [13]: client.run(lambda dask_worker: dask_worker.active_threads)
Out[13]: {'tcp://127.0.0.1:53296': {}, 'tcp://127.0.0.1:53297': {}}
```

And those tasks have already transitioned to the error state, yet for some reason were not removed from `_executing`:
```python
In [15]: client.run(lambda dask_worker: dask_worker.tasks)
Out[15]:
{'tcp://127.0.0.1:53296': {"('raise_exc-1b1b1e12ba372e70b59471074272d8db', 10)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 10)" error>,
  "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 11)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 11)" error>,
  'lambda-8a4d94597fad66701d6bc1a322718b51': <Task 'lambda-8a4d94597fad66701d6bc1a322718b51' ready>},
 'tcp://127.0.0.1:53297': {"('raise_exc-1b1b1e12ba372e70b59471074272d8db', 24)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 24)" error>,
  "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 25)": <Task "('raise_exc-1b1b1e12ba372e70b59471074272d8db', 25)" error>}}
```
Environment:

- Distributed version: 1441373
- Python version: 3.9.1
- Operating System: macOS
- Install method (conda, pip, source): source