Do not allow for a worker to reject a drop replica request#7490
Do not allow for a worker to reject a drop replica request#7490
Conversation
| (f3.key, "resumed", "released", "cancelled", {}), | ||
| (f3.key, "cancelled", "waiting", "executing", {}), | ||
| (f3.key, "executing", "error", "error", {}), | ||
| # FIXME: (distributed#7489) |
There was a problem hiding this comment.
Instead of accepting the erred task, the scheduler should reject the result and reschedule the computation (#7489)
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 24 files ± 0 24 suites ±0 10h 27m 31s ⏱️ + 26m 45s For more details on these failures, see this check. Results for commit 3a655f1. ± Comparison against base commit fae59c4. ♻️ This comment has been updated with latest results. |
|
The codecov is interesting. There are some indirect code coverage changes reported. Basically |
Might of course just be flaky but there is also a real possibility that this is no longer possible. I was hoping that with the consistencies the run_id provides the cancelled/resumed states would no longer be required. This may be the first glimpse at this |
|
I can confirm that |
|
https://app.codecov.io/gh/dask/distributed/blob/main/distributed/worker_state_machine.py Parts of this transition were already uncovered From what I can tell,
|
|
The last code branch is indeed impossible. It could only trigger if the scheduler asked a worker to compute a task twice w/out any additional intermediate messages |
|
This is a low level test that covers the above branches and shows what is happening and why that is OK. This is effectively the scenario you are describing in #7490 (comment) and I believe this is the only way to trigger this. @pytest.mark.parametrize("secede", [True, False])
def test_compute_free_fetch_compute(ws, secede):
ws2 = "127.0.0.1:2"
instructions = ws.handle_stimulus(ComputeTaskEvent.dummy("x", stimulus_id="s1"))
# Note: A future implementation could also allow the task to be executed
# again Right now, the scheduler should reschedule the task because of wrong
# run_id
if secede:
ws.handle_stimulus(
SecedeEvent(
key="x",
compute_duration=1.0,
stimulus_id=f"secede",
)
)
assert len(instructions) == 1
assert isinstance(instructions[0], Execute)
instructions = ws.handle_stimulus(
# x is released for whatever reasen (e.g. client cancellation)
FreeKeysEvent(keys=["x"], stimulus_id="s2"),
# x was computed somewhere else
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s3"),
# x was lost / no known replicas, therefore y is cancelled
FreeKeysEvent(keys=["y"], stimulus_id="s4"),
ComputeTaskEvent.dummy("x", stimulus_id="s5"),
)
assert len(ws.tasks) == 1
assert ws.tasks["x"].state == "executing" if not secede else "long-running" |
fjetter
left a comment
There was a problem hiding this comment.
good to go once CI is done and green-ish
| def _transition_resumed_waiting( | ||
| self, ts: TaskState, *, stimulus_id: str | ||
| ) -> RecsInstrs: | ||
| """ | ||
| See also | ||
| -------- | ||
| _transition_cancelled_fetch | ||
| _transition_cancelled_or_resumed_long_running | ||
| _transition_cancelled_waiting | ||
| _transition_resumed_fetch | ||
| """ | ||
| # None of the exit events of execute or gather_dep recommend a transition to | ||
| # waiting | ||
| assert not ts.done | ||
| if ts.previous == "executing": | ||
| assert ts.next == "fetch" | ||
| # We're back where we started. We should forget about the entire | ||
| # cancellation attempt | ||
| ts.state = "executing" | ||
| ts.next = None | ||
| ts.previous = None | ||
| return {}, [] | ||
|
|
||
| elif ts.previous == "long-running": | ||
| assert ts.next == "fetch" | ||
| # Same as executing, and in addition send the LongRunningMsg in arrears | ||
| # Note that, if the task seceded before it was cancelled, this will cause | ||
| # the message to be sent twice. | ||
| ts.state = "long-running" | ||
| ts.next = None | ||
| ts.previous = None | ||
| smsg = LongRunningMsg( | ||
| key=ts.key, compute_duration=None, stimulus_id=stimulus_id | ||
| ) | ||
| return {}, [smsg] | ||
|
|
||
| else: | ||
| assert ts.previous == "flight" | ||
| assert ts.next == "waiting" | ||
| return {}, [] | ||
|
|
|
I haven't seen |
fjetter
left a comment
There was a problem hiding this comment.
A couple of nits but the PR can go in
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>

Supersedes #7487 and finishes it up.
pre-commit run --all-files