
Fix decide_worker picking a closing worker #8032

Merged
fjetter merged 7 commits into dask:main from crusaderky:closing_worker
Aug 3, 2023

Conversation

@crusaderky
Collaborator

```python
    candidates = set(all_workers)
else:
    candidates = {wws for dts in ts.dependencies for wws in dts.who_has}
    candidates &= all_workers
```
Collaborator Author


This fixes #8019

Member


FWIW I think this is a situation where an actual decide_worker unit test would be appropriate

@crusaderky crusaderky self-assigned this Jul 24, 2023
@github-actions
Contributor

github-actions bot commented Jul 24, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files ±0   20 suites ±0   11h 58m 7s ⏱️ +1h 37m 24s
3 751 tests +3   3 638 ✔️ +9   106 💤 −2   7 ❌ −2
36 284 runs +2 004   34 524 ✔️ +1 951   1 748 💤 +58   12 ❌ −3

For more details on these failures, see this check.

Results for commit 80b36c9. Comparison against base commit a7f7764.

This pull request removes 5 and adds 8 tests. Note that renamed tests count towards both.
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp://[::1]:---nanny]
distributed.cli.tests.test_dask_worker.test_listen_address_ipv6[tcp://[::1]:---no-nanny]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[False]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[True]
pytest ‑ internal
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[False-False-closed]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[False-False-closing]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[False-True-closed]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[False-True-closing]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[True-False-closed]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[True-False-closing]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[True-True-closed]
distributed.tests.test_failed_workers ‑ test_submit_after_failed_worker_async[True-True-closing]

♻️ This comment has been updated with latest results.

Comment on lines +82 to +94
```python
in_update_graph = asyncio.Event()

async def update_graph(*args, **kwargs):
    in_update_graph.set()
    await async_poll_for(
        lambda: b_ws.status == Status.closing, timeout=5, period=0
    )
    s.update_graph(*args, **kwargs)
    nonlocal done_update_graph
    done_update_graph = True

s.stream_handlers["update-graph"] = update_graph
await in_update_graph.wait()
```
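For context, the `async_poll_for` helper used in this test behaves roughly like the loop below. This is a simplified stand-in for `distributed.utils_test.async_poll_for`, not the real implementation; `demo`, `closer`, and the `state` dict are illustrative:

```python
import asyncio
import time

async def async_poll_for(predicate, timeout, period=0.05):
    # Simplified stand-in: yield to the event loop until predicate()
    # becomes true, or raise once the timeout elapses.
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() > deadline:
            raise TimeoutError("condition was not met in time")
        await asyncio.sleep(period)

async def demo():
    # Wait for a status flag that another coroutine flips, mimicking
    # waiting for a WorkerState to reach Status.closing.
    state = {"status": "running"}

    async def closer():
        await asyncio.sleep(0.01)
        state["status"] = "closing"

    task = asyncio.create_task(closer())
    await async_poll_for(lambda: state["status"] == "closing", timeout=5)
    await task
    return state["status"]

assert asyncio.run(demo()) == "closing"
```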
Member


Given the problem, I would've expected something like

```python
def block(arg, enter, exit):
    enter.set()
    exit.wait()
    return arg

enter = Event()
exit = Event()
enter2 = Event()
exit2 = Event()

d1 = c.submit(inc, 0, key='d1', workers=["A"])
d2 = c.submit(block, 1, enter=enter, exit=exit, key='d2', workers=["B"])

x = c.submit(sum, [d1, d2], key='x')
block_executor = c.submit(block, None, enter=enter2, exit=exit2, key='x', workers=["B"])

await enter.wait()
await enter2.wait()
await asyncio.gather(
    exit.set(),
    B.close(),
)
```

I haven't tested the above and it may still need fine-tuning, but I would expect something like this to trigger the condition you're talking about: d2 completes while B is closing, s.t. when d2 finishes, x is transitioned while B is still closing.

I don't entirely understand why we need update_graph to trigger this condition

Member


Especially considering that update_graph (right now) is still synchronous.

Collaborator Author


I'm not too happy with my test either, but I don't think your suggestion works (see below).

What we're trying to test:

  • decide_worker_non_rootish(ts) is called on a task with workers=[b.address], allow_other_workers=True,
  • with the dependencies of the task partially on a and partially on b,
  • while b is in closing status, and
  • with decide_worker(ts, valid_workers=set(), all_workers={a}) that would pick b from ts.dependencies[...].who_has, due to less dependency bytes needing to be transferred to it, but instead picks a because b is not in all_workers.

The problem with writing the test is that we need to time update_graph to land exactly during the 1-2 event loop cycles while the worker is in closing status.
The worker transitions from closing to being removed when the batched comms collapse, here:

```python
finally:
    if worker in self.stream_comms:
        worker_comm.abort()
        await self.remove_worker(
            worker, stimulus_id=f"handle-worker-cleanup-{time()}"
        )
```

As an alternative to monkey-patching update-graph, I could have:

  • monkey-patched Scheduler.remove_worker (in hindsight that's a better idea; I'll have a look at it now);
  • synchronously called update_graph directly on the scheduler and updated the state on the client by hand (complicated and brittle).

In your code:

I think that the scheduler will never receive the task-finished message for d2, since it would arrive a whole two event-loop cycles after Worker.close collapses the batched comms.

I'm not sure why you think the scheduler should receive {op: task-finished, key: d2} deterministically after {op: worker-status-change, worker: <b>, status: closing}, but deterministically before the collapse of the TCP channel?

Also, I can't understand the purpose of block_executor.
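The candidate selection that the test exercises can be sketched in isolation. This is a minimal stand-in with fake data, not the actual Scheduler code; `pick_candidates` and the dict-based dependencies are illustrative:

```python
# Minimal sketch of the fixed logic: derive candidates from the
# dependencies' who_has, then intersect with the set of running workers
# so that a closing worker such as b can never be picked.

def pick_candidates(dependencies, all_workers):
    # dependencies: list of dicts with a "who_has" set of worker addresses
    candidates = {w for dep in dependencies for w in dep["who_has"]}
    candidates &= all_workers  # the fix: drop non-running workers
    return candidates

a = "tcp://a:1234"  # running worker
b = "tcp://b:1234"  # worker in "closing" status

# b holds more dependencies, so without the intersection it would win:
deps = [{"who_has": {b}}, {"who_has": {a, b}}]

assert pick_candidates(deps, all_workers={a}) == {a}        # b excluded
assert pick_candidates(deps, all_workers={a, b}) == {a, b}  # both running
```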

Member


> but deterministically before the collapse of the TCP channel?

I never said it would do so deterministically. I said I would expect something like this to trigger the condition you described. I also said it would require more fine-tuning, like distributing nbytes properly and maybe introducing an event somewhere.

@crusaderky
Collaborator Author

I got 2 failures out of 400 runs; neither seems to be related to this PR.
Ready for review and merge.

@crusaderky crusaderky marked this pull request as ready for review July 26, 2023 14:18
```python
    await wait_remove_worker.wait()
    return await orig_remove_worker(*args, **kwargs)

monkeypatch.setattr(s, "remove_worker", remove_worker)
```
Member


I'm OK-ish with using monkeypatch here. However, just for the sake of posterity, there is also a way to use our RPC mechanism more naturally. Essentially you want to intercept the point in time just when a request handler is called. You can make this very explicit:

```python
async def new_remove_worker_handler_with_events(self, *args, **kwargs):
    in_remove_worker.set()
    await wait_remove_worker.wait()
    return await self.remove_worker(*args, **kwargs)

s.handlers['unregister'] = new_remove_worker_handler_with_events
```

Semantically, this overrides the unregister handler and replaces it with a new one. In the end it's the same thing; only the way the patch is installed differs.
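The event-gated interception pattern being discussed looks roughly like this in isolation. `Server`, `gated_unregister`, and the event names are illustrative stand-ins, not the distributed API:

```python
import asyncio

# Illustrative sketch: wrap a request handler so a test can detect when
# it is entered and control when it is allowed to proceed.

class Server:
    def __init__(self):
        self.handlers = {"unregister": self.remove_worker}

    async def remove_worker(self, worker):
        return f"removed {worker}"

async def main():
    s = Server()
    in_remove_worker = asyncio.Event()
    wait_remove_worker = asyncio.Event()

    async def gated_unregister(worker):
        in_remove_worker.set()           # signal: handler entered
        await wait_remove_worker.wait()  # park until the test releases it
        return await s.remove_worker(worker)

    # Same effect as monkeypatching; installed through the handler table
    s.handlers["unregister"] = gated_unregister

    task = asyncio.create_task(s.handlers["unregister"]("w1"))
    await in_remove_worker.wait()  # the handler is now parked
    wait_remove_worker.set()       # release it
    return await task

assert asyncio.run(main()) == "removed w1"
```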

Collaborator Author

@crusaderky crusaderky Jul 31, 2023


We're not arriving here from the unregister handler. We're arriving from:

```python
finally:
    if worker in self.stream_comms:
        worker_comm.abort()
        await self.remove_worker(
            worker, stimulus_id=f"handle-worker-cleanup-{time()}"
        )
```

Comment on lines +65 to +67
```python
L = c.map(
    inc,
    range(10),
```
Member


Do we actually need a map for this? This feels much more difficult to control than if we used single tasks with specific placement

Collaborator Author


You're right. Simplified.

Comment on lines -2223 to -2226
```python
if not self.running:
    return None
```
Member


Is this related? At least the new test doesn't seem to care about this.

Collaborator Author


It's unreachable because the same condition is already tested on line 2218.

```python
    candidates = set(all_workers)
else:
    candidates = {wws for dts in ts.dependencies for wws in dts.who_has}
    candidates &= all_workers
```
Member


FWIW I think this is a situation where an actual decide_worker unit test would be appropriate

@crusaderky
Collaborator Author

Code review comments have been addressed

@fjetter fjetter merged commit 84e1984 into dask:main Aug 3, 2023
@crusaderky crusaderky deleted the closing_worker branch August 3, 2023 09:44


Development

Successfully merging this pull request may close these issues.

AssertionError in decide_worker_non_rootish
CI failing with test_submit_after_failed_worker_async

2 participants