
flaky test diagnostics/tests/test_progress.py::test_group_timing #6452

Reported by @graingert:

https://github.com/dask/distributed/runs/6606348457?check_suite_focus=true#step:11:1696
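
The failure: `await s.restart()` raises CommClosedError ("Exception while trying to call remote method 'restart' before comm was established.") after a nanny's worker process dies and the TCP stream closes mid-RPC; the full traceback and captured logs follow. To hunt for the flake locally, a minimal stress-run sketch (hypothetical invocation; CI passes its own pytest flags):

```python
# Hypothetical stress-run sketch; assumes a development checkout of
# dask/distributed with pytest installed. The iteration count is arbitrary --
# intermittent failures like this one may need many more runs to surface.
import subprocess
import sys

TEST = "distributed/diagnostics/tests/test_progress.py::test_group_timing"

for i in range(50):
    # run each attempt in a fresh interpreter so no cluster state leaks between runs
    result = subprocess.run([sys.executable, "-m", "pytest", "-x", TEST])
    if result.returncode != 0:
        print(f"failed on iteration {i}", file=sys.stderr)
        sys.exit(result.returncode)
```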

=================================== FAILURES ===================================
______________________________ test_group_timing _______________________________

self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
deserializers = None

    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError()
    
        fmt = "Q"
        fmt_size = struct.calcsize(fmt)
    
        try:
>           frames_nbytes = await stream.read_bytes(fmt_size)
E           tornado.iostream.StreamClosedError: Stream is closed

distributed/comm/tcp.py:226: StreamClosedError

The above exception was the direct cause of the following exception:

kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>

    async def send_recv_from_rpc(**kwargs):
        if self.serializers is not None and kwargs.get("serializers") is None:
            kwargs["serializers"] = self.serializers
        if self.deserializers is not None and kwargs.get("deserializers") is None:
            kwargs["deserializers"] = self.deserializers
        comm = None
        try:
            comm = await self.live_comm()
            comm.name = "rpc." + key
>           result = await send_recv(comm=comm, op=key, **kwargs)

distributed/core.py:894: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
reply = True, serializers = None, deserializers = None
kwargs = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
msg = {'close': True, 'op': 'restart', 'reply': True, 'timeout': 24.0}
please_close = True, force_close = True

    async def send_recv(
        comm: Comm,
        *,
        reply: bool = True,
        serializers=None,
        deserializers=None,
        **kwargs,
    ):
        """Send and recv with a Comm.
    
        Keyword arguments turn into the message
    
        response = await send_recv(comm, op='ping', reply=True)
        """
        msg = kwargs
        msg["reply"] = reply
        please_close = kwargs.get("close", False)
        force_close = False
        if deserializers is None:
            deserializers = serializers
        if deserializers is not None:
            msg["serializers"] = deserializers
    
        try:
            await comm.write(msg, serializers=serializers, on_error="raise")
            if reply:
>               response = await comm.read(deserializers=deserializers)

distributed/core.py:739: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
deserializers = None

    async def read(self, deserializers=None):
        stream = self.stream
        if stream is None:
            raise CommClosedError()
    
        fmt = "Q"
        fmt_size = struct.calcsize(fmt)
    
        try:
            frames_nbytes = await stream.read_bytes(fmt_size)
            (frames_nbytes,) = struct.unpack(fmt, frames_nbytes)
    
            frames = host_array(frames_nbytes)
            for i, j in sliding_window(
                2,
                range(0, frames_nbytes + OPENSSL_MAX_CHUNKSIZE, OPENSSL_MAX_CHUNKSIZE),
            ):
                chunk = frames[i:j]
                chunk_nbytes = len(chunk)
                n = await stream.read_into(chunk)
                assert n == chunk_nbytes, (n, chunk_nbytes)
        except StreamClosedError as e:
            self.stream = None
            self._closed = True
            if not sys.is_finalizing():
>               convert_stream_closed_error(self, e)

distributed/comm/tcp.py:242: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

obj = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>
exc = StreamClosedError('Stream is closed')

    def convert_stream_closed_error(obj, exc):
        """
        Re-raise StreamClosedError as CommClosedError.
        """
        if exc.real_error is not None:
            # The stream was closed because of an underlying OS error
            exc = exc.real_error
            if ssl and isinstance(exc, ssl.SSLError):
                if "UNKNOWN_CA" in exc.reason:
                    raise FatalCommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}")
            raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
        else:
>           raise CommClosedError(f"in {obj}: {exc}") from exc
E           distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>: Stream is closed

distributed/comm/tcp.py:150: CommClosedError

The above exception was the direct cause of the following exception:

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:39933', workers: 0, cores: 0, tasks: 0>
a = <Nanny: None, threads: 1>, b = <Nanny: None, threads: 2>

    @gen_cluster(client=True, Worker=Nanny)
    async def test_group_timing(c, s, a, b):
        p = GroupTiming(s)
        s.add_plugin(p)
    
        assert len(p.time) == 2
        assert len(p.nthreads) == 2
    
        futures1 = c.map(slowinc, range(10), delay=0.3)
        futures2 = c.map(slowdec, range(10), delay=0.3)
        await wait(futures1 + futures2)
    
        assert len(p.time) > 2
        assert len(p.nthreads) == len(p.time)
        assert all([nt == s.total_nthreads for nt in p.nthreads])
        assert "slowinc" in p.compute
        assert "slowdec" in p.compute
        assert all([len(v) == len(p.time) for v in p.compute.values()])
        assert s.task_groups.keys() == p.compute.keys()
        assert all(
            [
                abs(s.task_groups[k].all_durations["compute"] - sum(v)) < 1.0e-12
                for k, v in p.compute.items()
            ]
        )
    
>       await s.restart()

distributed/diagnostics/tests/test_progress.py:210: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/utils.py:761: in wrapper
    return await func(*args, **kwargs)
distributed/scheduler.py:5098: in restart
    resps = await asyncio.wait_for(resps, timeout)
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/asyncio/tasks.py:494: in wait_for
    return fut.result()
distributed/utils.py:218: in All
    result = await tasks.next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

kwargs = {'close': True, 'timeout': 24.0}
comm = <TCP (closed) rpc.restart local=tcp://127.0.0.1:49094 remote=tcp://127.0.0.1:42945>

    async def send_recv_from_rpc(**kwargs):
        if self.serializers is not None and kwargs.get("serializers") is None:
            kwargs["serializers"] = self.serializers
        if self.deserializers is not None and kwargs.get("deserializers") is None:
            kwargs["deserializers"] = self.deserializers
        comm = None
        try:
            comm = await self.live_comm()
            comm.name = "rpc." + key
            result = await send_recv(comm=comm, op=key, **kwargs)
        except (RPCClosed, CommClosedError) as e:
            if comm:
>               raise type(e)(
                    f"Exception while trying to call remote method {key!r} before comm was established."
                ) from e
E               distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.

distributed/core.py:897: CommClosedError
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_group_timing.yaml
----------------------------- Captured stderr call -----------------------------
2022-05-26 08:26:59,172 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:44773
2022-05-26 08:26:59,172 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:44773
2022-05-26 08:26:59,172 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37305
2022-05-26 08:26:59,173 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,173 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,173 - distributed.worker - INFO -               Threads:                          1
2022-05-26 08:26:59,173 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:26:59,173 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-vr4ic7w5
2022-05-26 08:26:59,173 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,187 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:33261
2022-05-26 08:26:59,187 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:33261
2022-05-26 08:26:59,188 - distributed.worker - INFO -          dashboard at:            127.0.0.1:37299
2022-05-26 08:26:59,188 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,188 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,188 - distributed.worker - INFO -               Threads:                          2
2022-05-26 08:26:59,188 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:26:59,188 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-xwekb6zo
2022-05-26 08:26:59,188 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,629 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,629 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,630 - distributed.core - INFO - Starting established connection
2022-05-26 08:26:59,656 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:39933
2022-05-26 08:26:59,656 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:26:59,656 - distributed.core - INFO - Starting established connection
2022-05-26 08:27:02,135 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:44773'.
2022-05-26 08:27:02,136 - distributed.worker - ERROR - Scheduler was unaware of this worker 'tcp://127.0.0.1:44773'. Shutting down.
2022-05-26 08:27:02,136 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:44773
2022-05-26 08:27:02,144 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing
2022-05-26 08:27:02,146 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33261
2022-05-26 08:27:02,146 - distributed.nanny - INFO - Worker closed
2022-05-26 08:27:02,147 - distributed.nanny - ERROR - Worker process died unexpectedly
2022-05-26 08:27:02,147 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing
Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/queues.py", line 245, in _feed
    send_bytes(obj)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
2022-05-26 08:27:02,335 - distributed.nanny - WARNING - Restarting worker
2022-05-26 08:27:03,766 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:37491
2022-05-26 08:27:03,767 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:37491
2022-05-26 08:27:03,767 - distributed.worker - INFO -          dashboard at:            127.0.0.1:39653
2022-05-26 08:27:03,767 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:27:03,767 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,767 - distributed.worker - INFO -               Threads:                          2
2022-05-26 08:27:03,767 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:27:03,767 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-ozdrdinu
2022-05-26 08:27:03,767 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,769 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:37491
2022-05-26 08:27:03,769 - distributed.worker - INFO - Closed worker has not yet started: Status.init
2022-05-26 08:27:03,825 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:32867
2022-05-26 08:27:03,825 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:32867
2022-05-26 08:27:03,825 - distributed.worker - INFO -          dashboard at:            127.0.0.1:42421
2022-05-26 08:27:03,825 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:39933
2022-05-26 08:27:03,825 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:03,825 - distributed.worker - INFO -               Threads:                          1
2022-05-26 08:27:03,825 - distributed.worker - INFO -                Memory:                   6.78 GiB
2022-05-26 08:27:03,825 - distributed.worker - INFO -       Local Directory: /tmp/tmpahot4b5z/dask-worker-space/worker-uo3d6iix
2022-05-26 08:27:03,826 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:04,277 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:39933
2022-05-26 08:27:04,277 - distributed.worker - INFO - -------------------------------------------------
2022-05-26 08:27:04,278 - distributed.core - INFO - Starting established connection
2022-05-26 08:27:04,943 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:32867
2022-05-26 08:27:04,944 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting.  Status: Status.closing
------------------------------ Captured log call -------------------------------
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11894.830519' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.12771.494043' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13829.553757' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13184.014458' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.10821.172500' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11321.529407' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.10872.792584' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.11320.329941' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.15696.336646' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13303.165900' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/coverage/data.py:129: CoverageWarning: Data file '/home/runner/work/distributed/distributed/.coverage.fv-az465-78.13126.769215' doesn't seem to be a coverage data file: cannot unpack non-iterable NoneType object
  data._warn(str(exc))
ERROR    asyncio:base_events.py:1707 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
ERROR    asyncio:base_events.py:1707 Future exception was never retrieved
future: <Future finished exception=CommClosedError("Exception while trying to call remote method 'restart' before comm was established.")>
Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 226, in read
    frames_nbytes = await stream.read_bytes(fmt_size)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 894, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 739, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 242, in read
    convert_stream_closed_error(self, e)
  File "/home/runner/work/distributed/distributed/distributed/comm/tcp.py", line 150, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) rpc.restart local=tcp://127.0.0.1:42246 remote=tcp://127.0.0.1:40241>: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py", line 769, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/runner/work/distributed/distributed/distributed/utils.py", line 231, in quiet
    yield task
  File "/usr/share/miniconda3/envs/dask-distributed/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/runner/work/distributed/distributed/distributed/core.py", line 897, in send_recv_from_rpc
    raise type(e)(
distributed.comm.core.CommClosedError: Exception while trying to call remote method 'restart' before comm was established.
ERROR    asyncio:base_events.py:1707 _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
- generated xml file: /home/runner/work/distributed/distributed/reports/pytest.xml -
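
Reading the captured stderr: the scheduler receives a heartbeat from an "unregistered worker", that worker shuts itself down, the nanny logs "Worker process died unexpectedly" and restarts it, and meanwhile the scheduler's rpc.restart call to the nanny finds its comm already closed, which send_recv surfaces as the CommClosedError above. As a hypothetical illustration only (not a proposed fix), tolerating a comm that drops while the restart RPC is in flight might look like:

```python
# Minimal sketch, not the project's actual fix: retry a restart whose comm
# dropped mid-call. restart_with_retry is a hypothetical helper; the import
# path matches the exception type shown in the traceback above.
from distributed.comm.core import CommClosedError


async def restart_with_retry(scheduler, attempts=3):
    for attempt in range(attempts):
        try:
            await scheduler.restart()
            return
        except CommClosedError:
            # the nanny's worker died mid-call; let the nanny bring it back
            # up and try again, re-raising once the attempts are exhausted
            if attempt == attempts - 1:
                raise
```

In the test this would stand in for the bare `await s.restart()` at distributed/diagnostics/tests/test_progress.py:210, though the underlying question is why the worker process died mid-restart in the first place.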

Labels: flaky test (Intermittent failures on CI.)