test_blockwise_concatenate flaky #9330

@jrbourbeau

Description

In #9329 we observed dask/tests/test_distributed.py::test_blockwise_concatenate fail with the following traceback:

__________________________ test_blockwise_concatenate __________________________
[gw1] darwin -- Python 3.9.13 /Users/runner/miniconda3/envs/test-environment/bin/python

c = <Client: 'tcp://127.0.0.1:51114' processes=2 threads=2, memory=28.00 GiB>

    def test_blockwise_concatenate(c):
        """Test a blockwise operation with concatenated axes"""
        da = pytest.importorskip("dask.array")
        np = pytest.importorskip("numpy")
    
        def f(x, y):
            da.assert_eq(y, [[0, 1, 2]])
            return x
    
        x = da.from_array(np.array([0, 1, 2]))
        y = da.from_array(np.array([[0, 1, 2]]))
        z = da.blockwise(
            f,
            ("i"),
            x,
            ("i"),
            y,
            ("ij"),
            dtype=x.dtype,
            concatenate=True,
        )
        c.compute(z, optimize_graph=False)
>       da.assert_eq(z, x, scheduler=c)

dask/tests/test_distributed.py:570: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/array/utils.py:304: in assert_eq
    a, adt, a_meta, a_computed = _get_dt_meta_computed(
dask/array/utils.py:263: in _get_dt_meta_computed
    x = _check_chunks(x, check_ndim=check_ndim, scheduler=scheduler)
dask/array/utils.py:230: in _check_chunks
    chunk = chunk.result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: error, key: ('f-53cf44bb6524322b43a85fc7c80aac6d', 0)>
timeout = None

    def result(self, timeout=None):
        """Wait until computation completes, gather result to local process.
    
        Parameters
        ----------
        timeout : number, optional
            Time in seconds after which to raise a
            ``dask.distributed.TimeoutError``
    
        Raises
        ------
        dask.distributed.TimeoutError
            If *timeout* seconds are elapsed before returning, a
            ``dask.distributed.TimeoutError`` is raised.
    
        Returns
        -------
        result
            The result of the computation. Or a coroutine if the client is asynchronous.
        """
        if self.client.asynchronous:
            return self.client.sync(self._result, callback_timeout=timeout)
    
        # shorten error traceback
        result = self.client.sync(self._result, callback_timeout=timeout, raiseit=False)
        if self.status == "error":
            typ, exc, tb = result
>           raise exc.with_traceback(tb)
E           AssertionError

../../../miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/client.py:280: AssertionError
---------------------------- Captured stderr setup -----------------------------
2022-07-28 19:16:27,139 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2022-07-28 19:16:27,186 - distributed.scheduler - INFO - State start
2022-07-28 19:16:27,217 - distributed.scheduler - INFO - Clear task state
2022-07-28 19:16:27,219 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:51114
2022-07-28 19:16:27,219 - distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
2022-07-28 19:16:27,336 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:51115
2022-07-28 19:16:27,336 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:51115
2022-07-28 19:16:27,337 - distributed.worker - INFO -          dashboard at:            127.0.0.1:51117
2022-07-28 19:16:27,337 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:51114
2022-07-28 19:16:27,337 - distributed.worker - INFO - -------------------------------------------------
2022-07-28 19:16:27,337 - distributed.worker - INFO -               Threads:                          1
2022-07-28 19:16:27,337 - distributed.worker - INFO -                Memory:                  14.00 GiB
2022-07-28 19:16:27,337 - distributed.worker - INFO -       Local Directory: /var/folders/24/8k48jl6d249_n_qfxwsl6xvm0000gn/T/dask-worker-space/worker-_jpqi8cv
2022-07-28 19:16:27,337 - distributed.worker - INFO - -------------------------------------------------
2022-07-28 19:16:27,337 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:51116
2022-07-28 19:16:27,337 - distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:51116
2022-07-28 19:16:27,338 - distributed.worker - INFO -          dashboard at:            127.0.0.1:51118
2022-07-28 19:16:27,338 - distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:51114
2022-07-28 19:16:27,338 - distributed.worker - INFO - -------------------------------------------------
2022-07-28 19:16:27,338 - distributed.worker - INFO -               Threads:                          1
2022-07-28 19:16:27,338 - distributed.worker - INFO -                Memory:                  14.00 GiB
2022-07-28 19:16:27,338 - distributed.worker - INFO -       Local Directory: /var/folders/24/8k48jl6d249_n_qfxwsl6xvm0000gn/T/dask-worker-space/worker-hzcbgv1o
2022-07-28 19:16:27,338 - distributed.worker - INFO - -------------------------------------------------
2022-07-28 19:16:28,538 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:51115', status: init, memory: 0, processing: 0>
2022-07-28 19:16:29,371 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:51115
2022-07-28 19:16:29,372 - distributed.core - INFO - Starting established connection
2022-07-28 19:16:29,372 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:51114
2022-07-28 19:16:29,372 - distributed.worker - INFO - -------------------------------------------------
2022-07-28 19:16:29,375 - distributed.core - INFO - Starting established connection
2022-07-28 19:16:29,376 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:51116', status: init, memory: 0, processing: 0>
2022-07-28 19:16:29,377 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:51116
2022-07-28 19:16:29,377 - distributed.core - INFO - Starting established connection
2022-07-28 19:16:29,377 - distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:51114
2022-07-28 19:16:29,378 - distributed.worker - INFO - -------------------------------------------------
2022-07-28 19:16:29,380 - distributed.core - INFO - Starting established connection
2022-07-28 19:16:29,415 - distributed.scheduler - INFO - Receive client connection: Client-c8790f76-0ea9-11ed-923a-005056982d4c
2022-07-28 19:16:29,415 - distributed.core - INFO - Starting established connection
----------------------------- Captured stderr call -----------------------------
2022-07-28 19:16:29,478 - distributed.worker - ERROR - Exception during execution of task ('array-3298b7fc0043ea6d15dd05835a48bdb3', 0, 0).
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/worker.py", line 2167, in execute
    assert ts.state == "executing"
AssertionError
--------------------------- Captured stderr teardown ---------------------------
2022-07-28 19:16:29,525 - distributed.scheduler - ERROR - Error transitioning "('array-3298b7fc0043ea6d15dd05835a48bdb3', 0, 0)" from 'erred' to 'memory'
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 1487, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
2022-07-28 19:16:29,525 - distributed.core - ERROR - ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py", line 841, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4709, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4113, in stimulus_task_finished
    r: tuple = self._transition(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 1487, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
2022-07-28 19:16:29,526 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:51116', status: running, memory: 0, processing: 0>
2022-07-28 19:16:29,526 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-963fbf06-09c9-4de8-ab9c-e767b157a734 Address tcp://127.0.0.1:51116 Status: Status.running
2022-07-28 19:16:29,526 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:51116
2022-07-28 19:16:29,526 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:51116
2022-07-28 19:16:29,794 - distributed.core - ERROR - ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 3724, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4849, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py", line 841, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4709, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4113, in stimulus_task_finished
    r: tuple = self._transition(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 1487, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
2022-07-28 19:16:29,795 - distributed.core - ERROR - Exception while handling op register-worker
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py", line 769, in _handle_comm
    result = await result
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 3724, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4849, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py", line 841, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4709, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4113, in stimulus_task_finished
    r: tuple = self._transition(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 1487, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
Task exception was never retrieved
future: <Task finished name='Task-11' coro=<Server._handle_comm() done, defined at /Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py:675> exception=AssertionError(((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory'))>
Traceback (most recent call last):
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py", line 769, in _handle_comm
    result = await result
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/utils.py", line 778, in wrapper
    return await func(*args, **kwargs)
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 3724, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4849, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/core.py", line 841, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4709, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 4113, in stimulus_task_finished
    r: tuple = self._transition(
  File "/Users/runner/miniconda3/envs/test-environment/lib/python3.9/site-packages/distributed/scheduler.py", line 1487, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:51116', 'nbytes': 24, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94.', 'typename': 'numpy.ndarray', 'metadata': {}, 'thread': 123145661464576, 'startstops': ({'action': 'compute', 'start': 1659035790.5275288, 'stop': 1659035790.5275476},), 'status': 'OK'}, 'erred', 'memory')
2022-07-28 19:16:29,809 - distributed.scheduler - INFO - Remove client Client-c8790f76-0ea9-11ed-923a-005056982d4c
2022-07-28 19:16:29,812 - distributed.scheduler - INFO - Remove client Client-c8790f76-0ea9-11ed-923a-005056982d4c
2022-07-28 19:16:29,815 - distributed.scheduler - INFO - Close client connection: Client-c8790f76-0ea9-11ed-923a-005056982d4c
2022-07-28 19:16:29,871 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:51115
2022-07-28 19:16:29,875 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-1eb6bf45-3ac5-4adf-84f7-5c410e0d521b Address tcp://127.0.0.1:51115 Status: Status.closing
2022-07-28 19:16:29,887 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:51115', status: closing, memory: 0, processing: 0>
2022-07-28 19:16:29,887 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:51115
2022-07-28 19:16:29,887 - distributed.scheduler - INFO - Lost all workers
2022-07-28 19:16:34,836 - distributed.scheduler - INFO - Scheduler closing...
2022-07-28 19:16:34,836 - distributed.scheduler - INFO - Scheduler closing all comms

I'm seeing assertion errors on both the worker (ts.state == "executing") and the scheduler (an 'erred' -> 'memory' transition) that seem to come from Dask's internal state machine rather than from the user code in the test.
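For reference, the blockwise call itself is semantically trivial, which supports the idea that the failure is internal rather than in the test. A minimal sketch of what the call computes, written in plain numpy without dask (with concatenate=True, f receives each chunk of x along "i" together with y's blocks concatenated along the contracted "j" axis; here there is only one block of each):

```python
import numpy as np

def f(x, y):
    # With concatenate=True, y arrives as the full [[0, 1, 2]] block.
    assert (y == np.array([[0, 1, 2]])).all()
    return x

x = np.array([0, 1, 2])
y = np.array([[0, 1, 2]])

# The blockwise result z should simply equal x.
z = f(x, y)
```

This is only an illustration of the expected semantics; the flaky failure happens inside distributed's scheduling, not in this computation.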

@crusaderky @gjoseph92 any idea what might be going on here?

Metadata

Labels: tests (Unit tests and/or continuous integration)