
Error closing a local cluster when client still running #6087

@gjoseph92

Description

What happened:

When a local cluster (using processes) shuts down, a flood of "cannot schedule new futures after shutdown" errors is now logged.

I can't replicate it in my distributed dev environment, but in a different environment (which is quite similar, also running dask & distributed from main, just py3.9.1 instead of py3.9.5?) the process hangs and never terminates until a Ctrl-C. In my distributed dev environment, the same errors are logged, but the process exits (with code 0, no less).

git bisect implicates #6031 @graingert.
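The `RuntimeError` at the bottom of every traceback below comes from `concurrent.futures` itself, not from distributed: once an executor has been shut down, any further `submit()` is rejected. A minimal stdlib-only sketch of that behavior:

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
executor.shutdown()

try:
    # Submitting after shutdown is rejected by concurrent.futures.
    executor.submit(print, "hello")
except RuntimeError as exc:
    print(exc)  # cannot schedule new futures after shutdown
```

The question in this issue is why the client is still trying to schedule work (a reconnect) at a point where the interpreter has already shut its executors down.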

Minimal Complete Verifiable Example:

# repro.py
import distributed
from distributed.deploy.local import LocalCluster


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=True)
    client = distributed.Client(cluster)
    # Note: client and cluster are intentionally never closed; the errors
    # appear while the interpreter tears them down at exit.

(dask-distributed) gabe dev/distributed ‹f4c52e9a› » python repro.py
2022-04-07 15:47:02,414 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
    comm = await connect(
  File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2022-04-07 15:47:02,417 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1401, in _handle_report
    await self._reconnect()
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
    comm = await connect(
  File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
2022-04-07 15:47:02,471 - distributed.utils - ERROR - cannot schedule new futures after shutdown
Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1395, in _handle_report
    msgs = await self.scheduler_comm.comm.read()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) Client->Scheduler local=tcp://127.0.0.1:58800 remote=tcp://127.0.0.1:58791>: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 693, in log_errors
    yield
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1521, in _close
    await asyncio.wait_for(asyncio.shield(handle_report_task), 0.1)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1401, in _handle_report
    await self._reconnect()
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1225, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/Users/gabe/dev/distributed/distributed/client.py", line 1254, in _ensure_connected
    comm = await connect(
  File "/Users/gabe/dev/distributed/distributed/comm/core.py", line 289, in connect
    comm = await asyncio.wait_for(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/tasks.py", line 481, in wait_for
    return fut.result()
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 439, in connect
    stream = await self.client.connect(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
    addrinfo = await self.resolver.resolve(host, port, af)
  File "/Users/gabe/dev/distributed/distributed/comm/tcp.py", line 424, in resolve
    for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 856, in getaddrinfo
    return await self.run_in_executor(
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/asyncio/base_events.py", line 814, in run_in_executor
    executor.submit(func, *args), loop=self)
  File "/Users/gabe/miniconda3/envs/dask-distributed/lib/python3.9/concurrent/futures/thread.py", line 161, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
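
All three tracebacks bottom out the same way: on `CommClosedError` the client tries to reconnect, and `loop.getaddrinfo()` hands name resolution to the event loop's default executor, which has already been shut down. The interaction can be sketched with plain asyncio and `concurrent.futures` (no distributed involved; the hostname and port here are arbitrary placeholders):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    # Simulate the interpreter-shutdown state: the default executor that
    # loop.getaddrinfo() resolves names on has already been shut down.
    executor = ThreadPoolExecutor()
    loop.set_default_executor(executor)
    executor.shutdown()
    try:
        # getaddrinfo() calls run_in_executor() under the hood, which
        # submits to the (now closed) default executor.
        await loop.getaddrinfo("localhost", 80)
    except RuntimeError as exc:
        print(exc)  # cannot schedule new futures after shutdown


asyncio.run(main())
```

This is only a sketch of the mechanism visible in the tracebacks; in the real failure the executor is shut down by interpreter/atexit teardown rather than an explicit `shutdown()` call.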

Environment:

  • Dask version: 6e30766
  • Python version: 3.9.5
  • Operating System: macOS
  • Install method (conda, pip, source): source

Labels: bug (something is broken), p1 (affects a large population and inhibits work), regression
