transition flight to missing if no who_has #5653
Conversation
This transition is only happening because there are duplicates in `pending_data_per_worker`.

The entire mocking is there because we cannot hook into any worker internals naturally or intercept any stimuli, etc. I think if we were able to structure the internals a bit such that we can intercept/stop the worker at a few key points, this would go a long way (e.g. the mocking here is only there to disable the `ensure_communicating` / `ensure_computing` loops).
@gen_cluster(client=True)
async def test_dups_in_pending_data_per_worker(c, s, a, b):
    # We need to fetch data which is reliably in select_keys_for_gather;
    # if it is somehow prioritized it will immediately be flagged as missing.
    # pending_data_per_worker is outdated because removal is not easy
    futs = c.map(inc, range(100), workers=[a.address])
    missing_fut = c.submit(inc, -1, workers=[a.address], key="culprit")
    await c.gather(futs)
    await missing_fut

    with mock.patch.object(
        Worker, "ensure_communicating", return_value=None
    ) as comm_mock:
        with mock.patch.object(
            Worker, "ensure_computing", return_value=None
        ) as comp_mock:
            x = await Worker(s.address, name=2, validate=True)
            f1 = c.submit(sum, [*futs[:50]], workers=[x.address], key="f1", priority=1)
            f2 = c.submit(inc, missing_fut, workers=[x.address], key="f2", priority=2)
            f3 = c.submit(sum, [*futs[50:]], workers=[x.address], key="f3", priority=3)
            while not len(x.data_needed) == len(futs) + 1:
                await asyncio.sleep(0.01)
            assert missing_fut.key in x.tasks
            assert missing_fut.key in x.pending_data_per_worker[a.address]
            for _ in range(3):
                await x.query_who_has(missing_fut.key, stimulus_id="foo")
            assert (
                sum(1 for key in x.pending_data_per_worker[a.address] if key == "culprit")
                > 1
            )

            with mock.patch.object(
                Worker, "query_who_has", return_value=None
            ) as who_has_mock:
                # Now create a copy on B of culprit such that it isn't rescheduled
                # once we release it
                f_copy = c.submit(
                    inc, missing_fut, key="copy-intention-culprit", workers=[b.address]
                )
                await f_copy
                a.handle_remove_replicas([missing_fut.key], stimulus_id="test")

    x.total_out_connections = 1
    x.target_message_size = sum(x.tasks[f.key].get_nbytes() for f in futs[:52])
    x.ensure_communicating()

    with mock.patch.object(
        Worker, "ensure_communicating", return_value=None
    ) as comm_mock:
        with mock.patch.object(
            Worker, "ensure_computing", return_value=None
        ) as comp_mock:
            while not x.data:
                await asyncio.sleep(0.01)
            assert not x.tasks["culprit"].who_has
            assert x.tasks["culprit"].state == "fetch"

    x.target_message_size = 1000000000
    # Of course we do not want the AssertionError but it does get swallowed
    # if executed as part of a tornado coro and we will only see the timeout
    # below
    with pytest.raises(AssertionError):
        x.ensure_communicating()
    # await f1
    # await f2
    # await f3
@fjetter I'm not sure I understand:
This thing escalated a bit out of control. There were multiple issues, and my fixes are a little more involved than I was hoping. I do believe that the code is in a much better state now.
    dep_ts = worker.tasks[dep_key]
    assert dep_ts.state == expected_state, (worker.name, dep_ts, expected_state)
    assert set(expected) == set(worker.tasks)

async def assert_task_states_on_worker(expected, worker):
I noticed that these tests were mildly flaky, so I allowed for a retry. Eventually the workers must reach an equilibrium or otherwise fail.
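For illustration, here is a minimal sketch of that retry idea; the helper name matches the diff above, but the timeout, polling interval, and overall shape are assumptions rather than the exact code from this PR.

import asyncio

async def assert_task_states_on_worker(expected, worker, timeout=10):
    # Poll until the worker's task states match `expected`; on timeout, re-raise
    # the last mismatch so the original assertion message is preserved.
    last_error = None
    deadline = asyncio.get_running_loop().time() + timeout
    while asyncio.get_running_loop().time() < deadline:
        try:
            for dep_key, expected_state in expected.items():
                assert dep_key in worker.tasks, (worker.name, dep_key, worker.tasks)
                dep_ts = worker.tasks[dep_key]
                assert dep_ts.state == expected_state, (worker.name, dep_ts, expected_state)
            assert set(expected) == set(worker.tasks)
            return
        except AssertionError as exc:
            last_error = exc
            await asyncio.sleep(0.1)
    raise last_error

Retrying like this tolerates transient intermediate states while still surfacing a genuine divergence once the deadline passes.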
async def test_missing_released_zombie_tasks_2(c, s, a, b):
-    a.total_in_connections = 0
+    # If get_data_from_worker raises this will suggest a dead worker to B and it
+    # will transition the task to missing. We want to make sure that a missing
+    # task is properly released and not left as a zombie
+    with mock.patch.object(
+        distributed.worker,
+        "get_data_from_worker",
+        side_effect=CommClosedError,
+    ):
        f1 = c.submit(inc, 1, key="f1", workers=[a.address])
        f2 = c.submit(inc, f1, key="f2", workers=[b.address])

        while f1.key not in b.tasks:
            await asyncio.sleep(0)

        ts = b.tasks[f1.key]
        assert ts.state == "fetch"

-    # A few things can happen to clear who_has. The dominant process is upon
-    # connection failure to a worker. Regardless of how the set was cleared, the
-    # task will be transitioned to missing where the worker is trying to
-    # reacquire this information from the scheduler. While this is happening on
-    # worker side, the tasks are released and we want to ensure that no dangling
-    # zombie tasks are left on the worker
-    ts.who_has.clear()
        while not ts.state == "missing":
            # If we sleep for a longer time, the worker will spin into an
            # endless loop of asking the scheduler who_has and trying to connect
            # to A
            await asyncio.sleep(0)

        del f1, f2

        while b.tasks:
            await asyncio.sleep(0.01)

        assert_worker_story(
            b.story(ts),
            [("f1", "missing", "released", "released", {"f1": "forgotten"})],
        )
This test introduced an impossible transition. There is no way for a task in state fetch to end up with an empty who_has, and the asserted story was false. I introduced the CommClosedError above to simulate a more realistic example.
I think there is a minor problem: a fetched TaskState's priority is inherited from its dependent. So if you schedule a low-priority task with a dependency and, before the dependencies have been fetched, a high-priority task with the same dependency, the fetch priority won't be bumped up. E.g.:

f1 = c.submit(inc, 0, workers=[a.address])
f2 = c.submit(numpy.zeros, 2**28, workers=[a.address])  # 2 GiB
await wait([f1, f2])
f3 = c.submit(inc, f1, priority=-1, key="low", workers=[b.address])
f4 = c.submit(inc, f2, priority=0, key="mid", workers=[b.address])
f5 = c.submit(inc, f1, priority=1, key="high", workers=[b.address])

If I understand everything correctly, in the above example f5 may not run until after f2 has been transferred.
Indeed, this is an "unsolved" problem. I am aware of it, and it has been an issue ever since I introduced the data_needed heap. My gut feeling is that this will only rarely impact workflows. I'm not entirely sure how valuable the reprioritization is and how costly the heap replacement would be. I have a few ideas for how to do this "efficiently", but I'm inclined to defer it to a later PR. I'm open to documenting this shortcoming somewhere, how about in ...?
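To make the trade-off concrete, here is a hypothetical sketch (not code from this PR) of how such a priority bump could be handled lazily with a plain heapq heap. The names data_needed, best_priority, add_fetch, and next_fetch are made up for illustration, and smaller values are assumed to sort first.

import heapq

data_needed = []      # heap of (priority, key); smallest tuple pops first
best_priority = {}    # key -> best (smallest) priority seen so far

def add_fetch(key, priority):
    # Push only if this dependent asks for a strictly better priority;
    # the previously pushed entry simply becomes stale.
    if priority < best_priority.get(key, float("inf")):
        best_priority[key] = priority
        heapq.heappush(data_needed, (priority, key))

def next_fetch():
    # Skip stale entries whose priority was superseded by a later bump
    while data_needed:
        priority, key = heapq.heappop(data_needed)
        if best_priority.get(key) == priority:
            del best_priority[key]
            return key
    return None

add_fetch("dep", 5)            # low-priority dependent registers the fetch
add_fetch("dep", 1)            # high-priority dependent arrives later and bumps it
assert next_fetch() == "dep"   # served at the bumped priority

The cost is an extra O(log n) push per bump plus stale entries lingering in the heap until they are popped, which is roughly the overhead being weighed against the benefit above.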
All commits authored by crusaderky <crusaderky@gmail.com>:

14bddba1482de035ca416368f918d94b115d3660 (Merge: df079fc 9a66b71)  Tue Jan 25 17:16:58 2022 +0000  Merge branch 'main' into AMM/RetireWorker
df079fc (Merge: 9676934 5835ce3)  Mon Jan 24 15:08:51 2022 +0000  Merge branch 'AMM/staging' into AMM/RetireWorker
5835ce3 (Merge: ee68b02 b0b8e95)  Mon Jan 24 15:07:27 2022 +0000  Merge branch 'AMM/test_close_gracefully' into AMM/staging
ee68b02 (Merge: 7d4e6ee ccad288)  Mon Jan 24 15:06:50 2022 +0000  Merge branch 'main' into AMM/staging
b0b8e95  Mon Jan 24 14:32:29 2022 +0000  Code review
9676934  Fri Jan 21 14:50:53 2022 +0000  AMM to manage retire_workers()
7d4e6ee  Fri Jan 21 14:39:41 2022 +0000  Fix flaky test_close_gracefully and test_lifetime (dask#5677)
7faab51  Fri Jan 21 14:39:41 2022 +0000  harden test
aef3b71  Fri Jan 21 14:29:10 2022 +0000  Increase resilience on slow CI
af84e40  Fri Jan 21 12:39:18 2022 +0000  Dump cluster state on all test failures (dask#5674)
5054c19  Fri Jan 21 12:38:38 2022 +0000  Paused workers shouldn't steal tasks (dask#5665)
eadb35f  Fri Jan 21 12:37:48 2022 +0000  transition flight to missing if no who_has (dask#5653)
581aee8 (Merge: 940bb45 60c0d60)  Fri Jan 21 12:36:09 2022 +0000  Merge branch 'main' into AMM/test_close_gracefully
940bb45  Fri Jan 21 12:20:10 2022 +0000  tweak comment
731d132  Fri Jan 21 12:12:03 2022 +0000  Fix flaky test_close_gracefully and test_lifetime
Yes, I agree it's minor - it only impacts users who have a bottleneck in network comms. Happy to leave it to a later PR.
There is an edge case connected to our select_keys_for_gather optimization for tasks which no longer have a who_has. Letting the fetch->flight transition redirect this is the most straightforward solution to the problem.

A different approach would be to transition tasks to missing as soon as they no longer have a who_has. It is similar in complexity and I might change this again. I first want to write a test reproducing the mentioned edge case. #5381 (comment)
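As a rough illustration of that redirect (a toy model only: TaskState here is a stand-in dataclass and start_flight a hypothetical helper, not distributed's actual transition machinery):

from dataclasses import dataclass, field

@dataclass
class TaskState:
    key: str
    state: str = "fetch"
    who_has: set = field(default_factory=set)

def start_flight(ts: TaskState) -> str:
    # About to gather ts from a peer: if no replica is known anymore,
    # go to "missing" (and re-query the scheduler) instead of "flight".
    ts.state = "missing" if not ts.who_has else "flight"
    return ts.state

assert start_flight(TaskState("x", who_has={"tcp://a:1234"})) == "flight"
assert start_flight(TaskState("y")) == "missing"

Both variants mentioned above boil down to the same check: a task whose who_has is empty should end up in missing rather than flight.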
@crusaderky you may cherry-pick this commit in the meantime to test your branch. The changes will be rather minimal either way.