test_worker_who_has_clears_after_failed_connection failed in #6822, but I don't think it's related to the changes there. (Or if it is, the changes shouldn't have broken it.) I think it's indicating an actual state machine bug.
The test seems to be testing:
- Worker N has some data
- Worker A fetches all that data
- While the fetch is happening, worker N dies
- This does not cause a deadlock. Furthermore, worker A no longer thinks that worker N has any data
But the way the test is written, it's possible that the fetch (or some of it) manages to complete before worker N dies. It's just a race condition between an `os._exit` and the 0.1s delay on a `SlowTransmitData`.
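As a toy sketch of the ordering problem (a simplified asyncio model, not distributed's real internals): a fixed delay races against the kill, whereas an event-based block, in the style of `BlockedGetData`, guarantees the "kill" lands while the transfer is still in flight.

```python
import asyncio

async def main() -> list:
    transfer_started = asyncio.Event()
    release = asyncio.Event()
    received = []

    async def transfer():
        transfer_started.set()
        await release.wait()       # BlockedGetData-style: hold until released
        received.append("data")

    task = asyncio.create_task(transfer())
    await transfer_started.wait()  # the fetch is in flight...
    task.cancel()                  # ...now "kill worker N", deterministically
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received

print(asyncio.run(main()))  # [] -- nothing was transferred before the kill
```

With a `sleep(0.1)` in place of `release.wait()`, the outcome would depend on scheduling, which is exactly the race the test currently has.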
When I looked at the cluster dump for this test, I noticed
- Work-stealing is kicking in, and actually the tasks intended for worker N are running on all workers (loose restricted tasks are allowed to be stolen)
- Worker A is successfully receiving data from worker N, even after the `os._exit` is kicked off
If some of the data is successfully fetched, then `_handle_gather_dep_success` won't be removing the keys from `self.has_what` and `ts.who_has`:
distributed/distributed/worker_state_machine.py, lines 2770 to 2785 in 4af2d0a:

```python
def _handle_gather_dep_success(self, ev: GatherDepSuccessEvent) -> RecsInstrs:
    """gather_dep terminated successfully.
    The response may contain less keys than the request.
    """
    recommendations: Recs = {}
    for ts in self._gather_dep_done_common(ev):
        if ts.key in ev.data:
            recommendations[ts] = ("memory", ev.data[ts.key])
        else:
            self.log.append((ts.key, "missing-dep", ev.stimulus_id, time()))
            if self.validate:
                assert ts.state != "fetch"
                assert ts not in self.data_needed[ev.worker]
            ts.who_has.discard(ev.worker)
            self.has_what[ev.worker].discard(ts.key)
            recommendations[ts] = "fetch"
```
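A minimal sketch of that bookkeeping (a toy model with plain dicts, not distributed's real classes) shows the consequence: keys that *do* arrive are never discarded from `has_what`, so a dead worker's successfully-fetched keys linger.

```python
from collections import defaultdict

has_what = defaultdict(set)
who_has = defaultdict(set)

def handle_gather_dep_success(worker, requested, data):
    """Mimic the branch structure quoted above."""
    for key in requested:
        if key in data:
            pass  # stored to memory; has_what[worker] is NOT touched
        else:
            who_has[key].discard(worker)
            has_what[worker].discard(key)

# Worker "n" is believed to hold f0 and f1; only f0 arrives before n dies.
has_what["n"] = {"f0", "f1"}
who_has["f0"].add("n")
who_has["f1"].add("n")
handle_gather_dep_success("n", requested={"f0", "f1"}, data={"f0": 123})
print(has_what["n"])  # {'f0'} -- lingers, matching the failing assert below
```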
Notice that `self.has_what[ev.worker].discard(ts.key)` only happens in the `else` branch, when the key wasn't successfully received.
@crusaderky @fjetter what's the intent of this test, and how should we update it?
- Update the test to guarantee the worker gets killed before it can transfer the data (use `BlockedGetData` instead of the delay-based `SlowTransmitData`). Also disable stealing.
- Have the state machine remove tasks from `self.has_what[ev.worker]` (and `ts.who_has`?) regardless of whether they're successfully fetched. I don't think anything is doing this, and they'll just sit around in `has_what` forever right now? EDIT: `_purge_state` does, so this seems okay as-is.
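For reference, the second option amounts to something like the following hedged sketch (illustrative names, not the real distributed API; roughly what `_purge_state` already does on worker removal): drop the dead worker from both sides of the bookkeeping at once, independent of which keys were fetched.

```python
from collections import defaultdict

def purge_worker(worker: str, has_what: dict, who_has: dict) -> None:
    # Remove every record that `worker` holds anything, in both directions.
    for key in has_what.pop(worker, set()):
        who_has[key].discard(worker)

has_what = defaultdict(set, {"n": {"f0", "f1"}})
who_has = defaultdict(set, {"f0": {"n", "b"}, "f1": {"n"}})
purge_worker("n", has_what, who_has)
print(has_what.get("n"), who_has["f0"], who_has["f1"])  # None {'b'} set()
```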
```
______________ test_worker_who_has_clears_after_failed_connection ______________

c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35425', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:39467', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:36259', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @pytest.mark.slow
    @gen_cluster(client=True)
    async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
        """This test is very sensitive to cluster state consistency. Timeouts often
        indicate subtle deadlocks. Be mindful when marking flaky/repeat/etc."""
        async with Nanny(s.address, nthreads=2) as n:
            while len(s.workers) < 3:
                await asyncio.sleep(0.01)

            def slow_ser(x, delay):
                return SlowTransmitData(x, delay=delay)

            n_worker_address = n.worker_address
            futures = c.map(
                slow_ser,
                range(20),
                delay=0.1,
                key=["f%d" % i for i in range(20)],
                workers=[n_worker_address],
                allow_other_workers=True,
            )

            def sink(*args):
                pass

            await wait(futures)
            result_fut = c.submit(sink, futures, workers=a.address)

            with suppress(CommClosedError):
                await c.run(os._exit, 1, workers=[n_worker_address])

            while len(s.workers) > 2:
                await asyncio.sleep(0.01)

            await result_fut
>           assert not a.state.has_what.get(n_worker_address)
E           AssertionError: assert not {'f0', 'f1', 'f10', 'f12', 'f17', 'f3', ...}
E            +  where {'f0', 'f1', 'f10', 'f12', 'f17', 'f3', ...} = <built-in method get of collections.defaultdict object at 0x7fa76f59a540>('tcp://127.0.0.1:46817')
E            +    where <built-in method get of collections.defaultdict object at 0x7fa76f59a540> = defaultdict(<class 'set'>, {'tcp://127.0.0.1:36259': {'f2', 'f19', 'f14', 'f16', 'f15'}, 'tcp://127.0.0.1:46817': {'f9', 'f10', 'f1', 'f3', 'f12', 'f0', 'f17', 'f8'}}).get
E            +      where defaultdict(<class 'set'>, {'tcp://127.0.0.1:36259': {'f2', 'f19', 'f14', 'f16', 'f15'}, 'tcp://127.0.0.1:46817': {'f9', 'f10', 'f1', 'f3', 'f12', 'f0', 'f17', 'f8'}}) = <distributed.worker_state_machine.WorkerState object at 0x7fa76f6ba040>.has_what
E            +        where <distributed.worker_state_machine.WorkerState object at 0x7fa76f6ba040> = <Worker 'tcp://127.0.0.1:39467', name: 0, status: running, stored: 21, running: 0/1, ready: 0, comm: 0, waiting: 0>.state

distributed/tests/test_failed_workers.py:363: AssertionError
```
----------------------------- Captured stdout call -----------------------------
Dumped cluster state to test_cluster_dump/test_worker_who_has_clears_after_failed_connection.yaml
----------------------------- Captured stderr call -----------------------------
2022-08-04 07:17:12,858 - distributed.scheduler - INFO - State start
2022-08-04 07:17:12,860 - distributed.scheduler - INFO - Clear task state
2022-08-04 07:17:12,861 - distributed.scheduler - INFO - Scheduler at: tcp://127.0.0.1:35425
2022-08-04 07:17:12,861 - distributed.scheduler - INFO - dashboard at: 127.0.0.1:35609
2022-08-04 07:17:12,867 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:39467
2022-08-04 07:17:12,867 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:39467
2022-08-04 07:17:12,867 - distributed.worker - INFO - dashboard at: 127.0.0.1:35707
2022-08-04 07:17:12,867 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35425
2022-08-04 07:17:12,867 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,867 - distributed.worker - INFO - Threads: 1
2022-08-04 07:17:12,867 - distributed.worker - INFO - Memory: 6.78 GiB
2022-08-04 07:17:12,867 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-cx9zhjsd
2022-08-04 07:17:12,867 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,868 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:36259
2022-08-04 07:17:12,868 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:36259
2022-08-04 07:17:12,868 - distributed.worker - INFO - dashboard at: 127.0.0.1:46347
2022-08-04 07:17:12,868 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35425
2022-08-04 07:17:12,868 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,869 - distributed.worker - INFO - Threads: 2
2022-08-04 07:17:12,869 - distributed.worker - INFO - Memory: 6.78 GiB
2022-08-04 07:17:12,869 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-eaft19vh
2022-08-04 07:17:12,869 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,874 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:39467', name: 0, status: init, memory: 0, processing: 0>
2022-08-04 07:17:12,875 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:39467
2022-08-04 07:17:12,875 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,876 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:36259', name: 1, status: init, memory: 0, processing: 0>
2022-08-04 07:17:12,876 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:36259
2022-08-04 07:17:12,876 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,877 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:35425
2022-08-04 07:17:12,877 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,877 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:35425
2022-08-04 07:17:12,877 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:12,878 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,878 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,895 - distributed.scheduler - INFO - Receive client connection: Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:12,895 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:12,901 - distributed.nanny - INFO - Start Nanny at: 'tcp://127.0.0.1:39065'
2022-08-04 07:17:13,828 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:46817
2022-08-04 07:17:13,828 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:46817
2022-08-04 07:17:13,828 - distributed.worker - INFO - dashboard at: 127.0.0.1:46681
2022-08-04 07:17:13,828 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35425
2022-08-04 07:17:13,828 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:13,828 - distributed.worker - INFO - Threads: 2
2022-08-04 07:17:13,828 - distributed.worker - INFO - Memory: 6.78 GiB
2022-08-04 07:17:13,828 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-18qsypko
2022-08-04 07:17:13,828 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:14,158 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:46817', status: init, memory: 0, processing: 0>
2022-08-04 07:17:14,160 - distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:46817
2022-08-04 07:17:14,160 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:14,160 - distributed.worker - INFO - Registered to: tcp://127.0.0.1:35425
2022-08-04 07:17:14,160 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:14,162 - distributed.core - INFO - Starting established connection
2022-08-04 07:17:15,294 - distributed.worker - INFO - Run out-of-band function '_exit'
2022-08-04 07:17:15,300 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:46817 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:50650 remote=tcp://127.0.0.1:46817>: Stream is closed
2022-08-04 07:17:15,301 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:46817', status: running, memory: 8, processing: 0>
2022-08-04 07:17:15,302 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:46817
2022-08-04 07:17:15,302 - distributed.nanny - INFO - Worker process 8037 exited with status 1
2022-08-04 07:17:15,304 - distributed.nanny - WARNING - Restarting worker
2022-08-04 07:17:15,327 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:39065'.
2022-08-04 07:17:15,327 - distributed.nanny - INFO - Nanny asking worker to close
2022-08-04 07:17:16,262 - distributed.worker - INFO - Start worker at: tcp://127.0.0.1:33633
2022-08-04 07:17:16,262 - distributed.worker - INFO - Listening to: tcp://127.0.0.1:33633
2022-08-04 07:17:16,262 - distributed.worker - INFO - dashboard at: 127.0.0.1:45951
2022-08-04 07:17:16,262 - distributed.worker - INFO - Waiting to connect to: tcp://127.0.0.1:35425
2022-08-04 07:17:16,262 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:16,262 - distributed.worker - INFO - Threads: 2
2022-08-04 07:17:16,262 - distributed.worker - INFO - Memory: 6.78 GiB
2022-08-04 07:17:16,262 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-f9t5upne
2022-08-04 07:17:16,263 - distributed.worker - INFO - -------------------------------------------------
2022-08-04 07:17:16,263 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:33633
2022-08-04 07:17:16,263 - distributed.worker - INFO - Closed worker has not yet started: Status.init
2022-08-04 07:17:16,344 - distributed.comm.tcp - INFO - Connection from tcp://127.0.0.1:56300 closed before handshake completed
2022-08-04 07:17:17,072 - distributed.scheduler - INFO - Remove client Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:17,074 - distributed.scheduler - INFO - Remove client Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:17,075 - distributed.scheduler - INFO - Close client connection: Client-76148fba-13c5-11ed-8a5f-000d3a9fb637
2022-08-04 07:17:17,083 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:39467
2022-08-04 07:17:17,083 - distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:36259
2022-08-04 07:17:17,085 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:39467', name: 0, status: closing, memory: 0, processing: 0>
2022-08-04 07:17:17,085 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:39467
2022-08-04 07:17:17,086 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:36259', name: 1, status: closing, memory: 0, processing: 0>
2022-08-04 07:17:17,086 - distributed.core - INFO - Removing comms to tcp://127.0.0.1:36259
2022-08-04 07:17:17,086 - distributed.scheduler - INFO - Lost all workers
2022-08-04 07:17:17,086 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-ac1856b0-cbc6-4218-bc04-00a8cd84af4f Address tcp://127.0.0.1:39467 Status: Status.closing
2022-08-04 07:17:17,086 - distributed.worker - INFO - Connection to scheduler broken. Closing without reporting. ID: Worker-c6d051b9-5967-4ee7-b55b-ed33b235c74f Address tcp://127.0.0.1:36259 Status: Status.closing
2022-08-04 07:17:17,089 - distributed.scheduler - INFO - Scheduler closing...
https://github.com/dask/distributed/runs/7666946504?check_suite_focus=true#step:11:1316