-
-
Notifications
You must be signed in to change notification settings - Fork 757
Closed
Labels
flaky test — Intermittent failures on CI.
Description
https://github.com/dask/distributed/runs/6606348457?check_suite_focus=true#step:11:1696
=================================== FAILURES ===================================
______________________________ test_group_timing _______________________________
self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
deserializers = None
async def read(self, deserializers=None):
stream = self.stream
if stream is None:
raise CommClosedError()
fmt = "Q"
fmt_size = struct.calcsize(fmt)
try:
> frames_nbytes = await stream.read_bytes(fmt_size)
E tornado.iostream.StreamClosedError: Stream is closed
distributed/comm/tcp.py:226: StreamClosedError
The above exception was the direct cause of the following exception:
kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
async def send_recv_from_rpc(**kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = None
try:
comm = await self.live_comm()
comm.name = "rpc." + key
> result = await send_recv(comm=comm, op=key, **kwargs)
distributed/core.py:894:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
reply = True, serializers = None, deserializers = None
kwargs = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
msg = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
please_close = True, force_close = True
async def send_recv(
comm: Comm,
*,
reply: bool = True,
serializers=None,
deserializers=None,
**kwargs,
):
"""Send and recv with a Comm.
Keyword arguments turn into the message
response = await send_recv(comm, op='ping', reply=True)
"""
msg = kwargs
msg["reply"] = reply
please_close = kwargs.get("close", False)
force_close = False
if deserializers is None:
deserializers = serializers
if deserializers is not None:
msg["serializers"] = deserializers
try:
await comm.write(msg, serializers=serializers, on_error="raise")
if reply:
> response = await comm.read(deserializers=deserializers)
distributed/core.py:739:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
deserializers = None
async def read(self, deserializers=None):
stream = self.stream
if stream is None:
raise CommClosedError()
fmt = "Q"
fmt_size = struct.calcsize(fmt)
try:
frames_nbytes = await stream.read_bytes(fmt_size)
(frames_nbytes,) = struct.unpack(fmt, frames_nbytes)
frames = host_array(frames_nbytes)
for i, j in sliding_window(
2,
range(0, frames_nbytes + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
):
chunk = frames[i:j]
chunk_nbytes = len(chunk)
n = await stream.read_into(chunk)
assert n == chunk_nbytes, (n, chunk_nbytes)
except StreamClosedError as e:
self.stream = None
self._closed = True
if not sys.is_finalizing():
> convert_stream_closed_error(self, e)
distributed/comm/tcp.py:242:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
obj = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
exc = StreamClosedError('Stream is closed')
def convert_stream_closed_error(obj, exc):
"""
Re-raise StreamClosedError as CommClosedError.
"""
if exc.real_error is not None:
# The stream was closed because of an underlying OS error
exc = exc.real_error
if ssl and isinstance(exc, ssl.SSLError):
if "UNKNOWN_CA" in exc.reason:
raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
else:
> raise CommClosedError(f"in {obj}: {exc}") from exc
E distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>: Stream is closed
distributed/comm/tcp.py:150: CommClosedError
The above exception was the direct cause of the following exception:
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39933', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 1>, b = <Nanny: None, threads: 2>
@gen_cluster(client=True, Worker=Nanny)
async def test_group_timing(c, s, a, b):
p = GroupTiming(s)
s.add_plugin(p)
assert len(p.time) == 2
assert len(p.nthreads) == 2
futures1 = c.map(slowinc, range(10), delay=0.3)
futures2 = c.map(slowdec, range(10), delay=0.3)
await wait(futures1 + futures2)
assert len(p.time) > 2
assert len(p.nthreads) == len(p.time)
assert all([nt == s.total_nthreads for nt in p.nthreads])
assert "slowinc" in p.compute
assert "slowdec" in p.compute
assert all([len(v) == len(p.time) for v in p.compute.values()])
assert s.task_groups.keys() == p.compute.keys()
assert all(
[
abs(s.task_groups[k].all_durations["compute"] - sum(v)) < 1.0e-12
for k, v in p.compute.items()
]
)
> await s.restart()
distributed/diagnostics/tests/test_progress.py:210:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/utils.py:761: in wrapper
return await func(*args, **kwargs)
distributed/scheduler.py:5098: in restart
resps = await asyncio.wait_for(resps, timeout)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494: in wait_for
return fut.result()
distributed/utils.py:218: in All
result = await tasks.next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
async def send_recv_from_rpc(**kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = None
try:
comm = await self.live_comm()
comm.name = "rpc." + key
result = await send_recv(comm=comm, op=key, **kwargs)
except (RPCClosed, CommClosedError) as e:
if comm:
> raise type(e)(
f"Exception while trying to call remote method {key!r} before comm was established."
) from e
E distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
distributed/core.py:897: CommClosedError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_group_timing.yaml
----------------------------- Captured stderr call -----------------------------
2022-05-26 08:26:59,172 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:44773
2022-05-26 08:26:59,172 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:44773
2022-05-26 08:26:59,172 - distributed.worker - INFO - dashboard at: 127.0.0.1:37305
2022-05-26 08:26:59,173 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:39933
2022-05-26 08:26:59,173 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,173 - distributed.worker - INFO - Threads: 1
2022-05-26 08:26:59,173 - distributed.worker - INFO - Memory: 6.78 GiB
2022-05-26 08:26:59,173 - distributed.worker - INFO - Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-vr4ic7w5
2022-05-26 08:26:59,173 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,187 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:33261
2022-05-26 08:26:59,187 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:33261
2022-05-26 08:26:59,188 - distributed.worker - INFO - dashboard at: 127.0.0.1:37299
2022-05-26 08:26:59,188 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:39933
2022-05-26 08:26:59,188 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,188 - distributed.worker - INFO - Threads: 2
2022-05-26 08:26:59,188 - distributed.worker - INFO - Memory: 6.78 GiB
2022-05-26 08:26:59,188 - distributed.worker - INFO - Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-xwekb6zo
2022-05-26 08:26:59,188 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,629 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:39933
2022-05-26 08:26:59,629 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,630 - distributed.core - INFO - Starting established connection
2022-05-26 08:26:59,656 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:39933
2022-05-26 08:26:59,656 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,656 - distributed.core - INFO - Starting established connection
2022-05-26 08:27:02,135 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:44773'.
2022-05-26 08:27:02,136 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:44773'. Shutting down.
2022-05-26 08:27:02,136 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:44773
2022-05-26 08:27:02,144 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. Status: Status.closing
2022-05-26 08:27:02,146 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33261
2022-05-26 08:27:02,146 - distributed.nanny - INFO - Worker closed
2022-05-26 08:27:02,147 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-05-26 08:27:02,147 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. Status: Status.closing
Traceback (most recent call last):
File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/queues.py", line 245, in _feed
send_bytes(obj)
File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
2022-05-26 08:27:02,335 - distributed.nanny - WARNING - Restarting worker
2022-05-26 08:27:03,766 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:37491
2022-05-26 08:27:03,767 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:37491
2022-05-26 08:27:03,767 - distributed.worker - INFO - dashboard at: 127.0.0.1:39653
2022-05-26 08:27:03,767 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:39933
2022-05-26 08:27:03,767 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,767 - distributed.worker - INFO - Threads: 2
2022-05-26 08:27:03,767 - distributed.worker - INFO - Memory: 6.78 GiB
2022-05-26 08:27:03,767 - distributed.worker - INFO - Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-ozdrdinu
2022-05-26 08:27:03,767 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,769 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:37491
2022-05-26 08:27:03,769 - distributed.worker - INFO - Closed worker has not yet started: Status.init
2022-05-26 08:27:03,825 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:32867
2022-05-26 08:27:03,825 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:32867
2022-05-26 08:27:03,825 - distributed.worker - INFO - dashboard at: 127.0.0.1:42421
2022-05-26 08:27:03,825 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:39933
2022-05-26 08:27:03,825 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,825 - distributed.worker - INFO - Threads: 1
2022-05-26 08:27:03,825 - distributed.worker - INFO - Memory: 6.78 GiB
2022-05-26 08:27:03,825 - distributed.worker - INFO - Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-uo3d6iix
2022-05-26 08:27:03,826 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:04,277 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:39933
2022-05-26 08:27:04,277 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:04,278 - distributed.core - INFO - Starting established connection
2022-05-26 08:27:04,943 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:32867
2022-05-26 08:27:04,944 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. Status: Status.closing
------------------------------ Captured log call -------------------------------
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11894.830519' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.12771.494043' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13829.553757' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13184.014458' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.10821.172500' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11321.529407' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.10872.792584' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11320.329941' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.15696.336646' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13303.165900' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13126.769215' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
data._warn(str(exc))
ERROR asyncio:base_events.py:1707 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
ERROR asyncio:base_events.py:1707 Future exception was never retrieved
future: <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 226, in read
frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/runner/work/distributed/distributed/distributed/core.py", line 894, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/home/runner/work/distributed/distributed/distributed/core.py", line 739, in send_recv
response = await comm.read(deserializers=deserializers)
File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 242, in read
convert_stream_closed_error(self, e)
File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:42246 remote=tcp://127.0.0.1:40241>: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py", line 769, in run
yielded = self.gen.throw(*exc_info) # type: ignore
File "/home/runner/work/distributed/distributed/distributed/utils.py", line 231, in quiet
yield task
File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/runner/work/distributed/distributed/distributed/core.py", line 897, in send_recv_from_rpc
raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
ERROR asyncio:base_events.py:1707 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
- generated xml file: /home/runner/work/distributed/distributed/reports/pytest.xml -
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
flaky test — Intermittent failures on CI.