Closed
Labels: bug (Something is broken), p1 (Affects a large population and inhibits work), regression
Description
What happened:
When a local cluster (using processes) shuts down, a ton of errors are now spewed about scheduling new futures after shutdown.
I can't replicate it in my distributed dev environment, but in a different environment (which is quite similar, also running dask & distributed from main, just Python 3.9.1 instead of 3.9.5?) the process hangs and never terminates until a Ctrl-C. In my distributed dev environment, the same errors are spewed, but it exits (with code 0, no less).
git bisect implicates #6031 @graingert.
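For context, the innermost frame of the tracebacks below is CPython's `ThreadPoolExecutor` refusing new work after shutdown; a minimal stdlib-only sketch of that behavior (no distributed involved), to show where the error message comes from:

```python
from concurrent.futures import ThreadPoolExecutor

# After shutdown(), a ThreadPoolExecutor permanently rejects new work.
# This is the same RuntimeError surfacing in the tracebacks below, where
# the event loop's default executor (used by getaddrinfo) has already
# been shut down during interpreter exit.
executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()
try:
    executor.submit(print, "too late")
except RuntimeError as exc:
    print(exc)  # cannot schedule new futures after shutdown
```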
Minimal Complete Verifiable Example:
# repro.py
import distributed
from distributed.deploy.local import LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=True)
    client = distributed.Client(cluster)

(dask-distributed) gabe dev/distributed ‹f4c52e9a› » python repro.py
2022-04-07 15:47:02,414 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
msgs = await self.scheduler_comm.comm.read()
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
yield
File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
await self._ensure_connected(timeout=timeout)
File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
comm = await connect(
File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
stream = await self.client.connect(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
return await self.run_in_executor(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
executor.submit(func, *args), loop=self)
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2022-04-07 15:47:02,417 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
msgs = await self.scheduler_comm.comm.read()
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
yield
File "/Users/gabe/dev/distributed/distributed/client.py", line 1401, in _handle_report
await self._reconnect()
File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
await self._ensure_connected(timeout=timeout)
File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
comm = await connect(
File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
stream = await self.client.connect(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
return await self.run_in_executor(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
executor.submit(func, *args), loop=self)
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2022-04-07 15:47:02,471 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
msgs = await self.scheduler_comm.comm.read()
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
yield
File "/Users/gabe/dev/distributed/distributed/client.py", line 1521, in _close
await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
File "/Users/gabe/dev/distributed/distributed/client.py", line 1401, in _handle_report
await self._reconnect()
File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
await self._ensure_connected(timeout=timeout)
File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
comm = await connect(
File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
comm = await asyncio.wait_for(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
return fut.result()
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
stream = await self.client.connect(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
addrinfo = await self.resolver.resolve(host, port, af)
File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
return await self.run_in_executor(
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
executor.submit(func, *args), loop=self)
File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Environment:
- Dask version: 6e30766
- Python version: 3.9.5
- Operating System: macOS
- Install method (conda, pip, source): source
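As a possible stopgap, closing the client and cluster deterministically (e.g. via context managers, which both `Client` and `LocalCluster` support) avoids relying on interpreter-exit teardown, where the event loop's default executor may already be shut down. A hedged sketch, not verified to sidestep this specific regression:

```python
# Workaround sketch: close client and cluster explicitly before the
# interpreter starts shutting down, instead of depending on atexit
# cleanup. Not confirmed to avoid the reconnect-during-shutdown errors
# reported above.
import distributed
from distributed.deploy.local import LocalCluster

if __name__ == "__main__":
    with LocalCluster(n_workers=1, threads_per_worker=1, processes=True) as cluster:
        with distributed.Client(cluster) as client:
            pass  # do work here; both objects are closed on exit
```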