cancelled/resumed->long-running transitions#6916
Unit Test Results

See the test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files +3, 15 suites +3, 6h 32m 52s ⏱️ +1h 55m 48s

For more details on these failures, see this check.

Results for commit b9ebe23. ± Comparison against base commit 817ead3.

♻️ This comment has been updated with latest results.
```python
@gen_cluster(client=True, nthreads=[("", 1)], timeout=2)
async def test_secede_cancelled_or_resumed_scheduler(c, s, a):
```
Note that this second test does not test the resumed(fetch) use case. However, the first test above demonstrates that the cancelled and resumed(fetch) use cases are indistinguishable from the scheduler's side.
distributed/worker_state_machine.py (outdated):

```python
def _transition_cancelled_long_running(
    self, ts: TaskState, compute_duration: float, *, stimulus_id: str
) -> RecsInstrs:
    """This transition also serves resumed(fetch) -> long-running"""
```
Not terribly happy about this, but the two alternatives (calling it `_transition_generic_long_running`, when it's definitely not generic, or copy-pasting the whole thing into a `_transition_resumed_long_running`) seemed worse.
`_transition_cancelled_or_resumed_long_running`? I'd be happier with a verbose but accurate name.
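For illustration, here is a minimal sketch (hypothetical, not the actual `WorkerStateMachine` code) of why one handler with a combined name can accurately serve two edges: the transition table simply registers the same function for both source states.

```python
def _transition_cancelled_or_resumed_long_running(ts):
    # Shared logic for both source states: move the task to long-running.
    ts["state"] = "long-running"
    return ts

# One function, two edges in the transition table
TRANSITIONS = {
    ("cancelled", "long-running"): _transition_cancelled_or_resumed_long_running,
    ("resumed", "long-running"): _transition_cancelled_or_resumed_long_running,
}

def transition(ts, to):
    # Dispatch on (current state, target state)
    return TRANSITIONS[(ts["state"], to)](ts)

ts = {"key": "x", "state": "cancelled"}
transition(ts, "long-running")
print(ts["state"])  # long-running
```

This avoids both a misleadingly generic name and a copy-pasted second handler.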
gjoseph92
left a comment
I see how this is probably the only way to solve the problem, but it feels a little odd to me.
What if we just didn't allow cancelled tasks to secede? While `secede` is running, we control the thread: we could just raise an error and refuse to call `tpe_secede()` or submit the `SecedeEvent`. So `cancelled -> long_running` would remain an impossible transition. If the exception causes the task to fail, we ignore it anyway. If the user code decides to handle the exception in some way, that's fine, but it will still never be able to trigger a `cancelled -> long_running` transition, since `secede` would refuse to do it.
In most cases, we can't cancel the running thread. But secede is the rare case where we do have the opportunity. Seems like it would be simpler to just not have to worry about this transition?
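A minimal sketch of this alternative, with hypothetical names (`maybe_secede`, `TaskCancelledError`) standing in for the real worker internals:

```python
class TaskCancelledError(RuntimeError):
    pass

def maybe_secede(task_state, tpe_secede, send_event):
    """Refuse to secede if the task was cancelled while running."""
    if task_state == "cancelled":
        # We control the thread here, so this is the rare place where we
        # can effectively "cancel" a running task by raising into user code.
        raise TaskCancelledError("task was cancelled; refusing to secede")
    tpe_secede()               # free the thread-pool slot
    send_event("SecedeEvent")  # notify the worker state machine

events = []
maybe_secede("executing", lambda: events.append("tpe_secede"), events.append)
print(events)  # ['tpe_secede', 'SecedeEvent']

try:
    maybe_secede("cancelled", lambda: events.append("tpe_secede"), events.append)
except TaskCancelledError:
    print("raised")  # no tpe_secede, no SecedeEvent
```

Under this design the `cancelled -> long_running` edge never appears in the transition table at all.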
distributed/worker_state_machine.py
Outdated
| self.executing.discard(ts) | ||
| self.long_running.add(ts) | ||
|
|
||
| # Do not send LongRunningMsg |
To clarify: the idea is that we don't send the message right now, because the task is cancelled, so from the scheduler's perspective, it's not running anymore on this worker, therefore the scheduler shouldn't hear updates from this worker about that task (xref #6956).
Instead, we postpone sending the LongRunningMsg until the task is un-cancelled. Only then will we send the message, since we know it's relevant.
This seems worth a longer comment?
Overhauled comment
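The deferral described in the comment above can be sketched roughly like this (hypothetical names and a simplified task-state dict; not the actual worker code): the `LongRunningMsg` is suppressed while the task is cancelled and only emitted if the task is later un-cancelled.

```python
def handle_secede(ts, messages):
    ts["previous"] = ts["state"]
    ts["state"] = "long-running"
    if ts.get("cancelled"):
        # Do not send LongRunningMsg now: the scheduler no longer considers
        # this task to be running on this worker, so it must not hear
        # updates about it. Remember to send it later instead.
        ts["deferred_long_running_msg"] = True
    else:
        messages.append(("LongRunningMsg", ts["key"]))

def handle_uncancel(ts, messages):
    ts["cancelled"] = False
    if ts.pop("deferred_long_running_msg", False):
        # The task is relevant to the scheduler again, so send it now.
        messages.append(("LongRunningMsg", ts["key"]))

msgs = []
ts = {"key": "x", "state": "executing", "cancelled": True}
handle_secede(ts, msgs)
print(msgs)  # [] -- nothing sent while cancelled
handle_uncancel(ts, msgs)
print(msgs)  # [('LongRunningMsg', 'x')]
```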
```python
assert ws.processing

await ev4.set()
assert await x == 123
```
Wait, so the expected, correct behavior is that you release a future, submit a new future with the same key, and get back the old (cancelled) future's result instead of the new one? That seems pretty wrong to me.
I'm aware that this could happen even for normal tasks, not just long-running, and it's just a consequence of not cancelling the thread, and keeping the TaskState around until the thread finishes. But from an API and user perspective, that seems wrong. I didn't think keys needed to be unique over the lifetime of the cluster, just that they needed to be unique among all the currently-active keys (and once a client saw a key as released, then it could safely consider it inactive).
Yep, but this is how it works. I spent several weeks trying and failing to make it sensible: #6844
This is a pretty rare use case: a user submits a task with a manually defined key; then, before the task has had time to finish, submits a different task with the same key.
Honestly, I feel that the blame should sit entirely on the user here, and figuring out what went wrong should be pretty straightforward. It also should not really happen outside of prototyping from a notebook, unless there are key collisions, which will cause all sorts of weird behaviour anyway.
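A toy model of why this happens (not the actual distributed code): the worker keys in-flight work by task key, and since the old thread cannot be cancelled, a resubmission with the same key reattaches to the still-running computation.

```python
in_flight = {}  # key -> callable for work that is already running

def submit(key, func):
    # If a thread for this key is still running, the new func is never run;
    # the caller is attached to the existing computation instead.
    if key not in in_flight:
        in_flight[key] = func
    return in_flight[key]

old = submit("x", lambda: 123)   # first submission; "thread" starts
# ... the client releases the future, but the thread keeps running ...
new = submit("x", lambda: 456)   # same key resubmitted before it finishes
print(new())  # 123, not 456: the old computation's result wins
```

This is why unique keys are effectively required for the lifetime of the computation, not just among currently-active futures.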
This makes me very nervous, because …

All review comments have been addressed
gjoseph92
left a comment
Discussed offline. Though using `sync` to run a function on the event loop in `secede` (and raising an error if the task was already cancelled) would be possible and would avoid threading race conditions, it would be difficult to test thoroughly. The benefit of `secede` raising an error if cancelled is also probably small: most tasks call `secede` right away, so there'd be very little time for the task to be cancelled in between. The state-machine-based approach here is much easier to test, so we'll go with this.
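For reference, the rejected pattern would look roughly like this: hop from the task's worker thread onto the event loop to read the task's state without racing the state machine (all names here are hypothetical, and this is only a sketch of the idea, not the real worker API).

```python
import asyncio
import threading

def secede_check(loop, get_state, key):
    async def _check():
        # Runs on the event loop, so it cannot race with state transitions.
        return get_state(key)

    state = asyncio.run_coroutine_threadsafe(_check(), loop).result()
    if state == "cancelled":
        raise RuntimeError(f"{key} was cancelled; refusing to secede")

states = {"x": "executing", "y": "cancelled"}
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

secede_check(loop, states.get, "x")  # fine: task is still executing
print("x ok")
try:
    secede_check(loop, states.get, "y")
except RuntimeError:
    print("y raised")
```

Even in this small form, testing the window between cancellation and the check thoroughly is awkward, which is part of why the state-machine approach won out.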
Closes #6709