Ensure resumed tasks are not accidentally forgotten #6217
768a3a6 to b2fcca3
The Windows test failure is a known issue.
Unit Test Results: 16 files ±0, 16 suites ±0, 7h 23m 52s ⏱️ -9m 40s
For more details on these failures, see this check.
Results for commit 1e23e1e. ± Comparison against base commit 2286896.
♻️ This comment has been updated with latest results.
I took a brief look at this. No objection from me, but I didn't dive deeply into the logic. If tests pass and you're feeling confident about the added value, @fjetter, I think it's OK to merge. If you can get someone like @crusaderky or @gjoseph92 to take a look, that would be better, of course.
gjoseph92 left a comment
I've never understood the flow for cancelled and released tasks, so I don't think I'm in a good position to review this. I find these states incredibly confusing. I'm looking forward to this hopefully making more sense with #5895.
| if not ts.done: | ||
| ts.state = "cancelled" | ||
| ts._next = None | ||
| return {}, [] |
What's going to eventually pick this up and move it out of cancelled if there are no recommendations and no next?
TL;DR: It is picked up once ts.done = True is set, i.e. once execute/flight is done.
ts._next should never have been set for cancelled. When I implemented cancelled/resumed states I made a few mistakes. The only relevant next state for cancelled is released. That's the entire point of the state. The worker was instructed to release a key but it can't because it is "stuck" waiting for something to finish, i.e. either the execution thread or the gather_data coroutine.
Once execution/gather finishes, they'll recommend a transition, e.g. upon success they'll recommend a transition to memory. For example cancelled->memory will ensure the key is released.
Why is this logic not directly implemented as part of the gather_dep/execute result parser? Well, have a look at the code there. Particularly the gather_dep result parser/finally clause is the most frequent source of deadlocks because the logic just blows up.
The design philosophy behind this is to break a big, complex decision up into many small decisions, each of which can be made using local context information.
Consider the following example:
- T1 was instructed to be computed
- T1 is dispatched to the threadpool
- T1 is requested to be released
- T1 finishes
The handling of the result, i.e. what happens once T1 finishes, could be implemented as

if result == "success":
    if ts.not_cancelled:
        put_key_in_memory()
    else:
        release_key()
else:
    assert result == "failed"
    if ts.not_cancelled:
        if ts.asked_to_be_fetched_instead:
            # Whether this is a valid thing for the scheduler to ask is out
            # of scope for this comment. It happens/happened
            reschedule_to_fetch_key()
        else:
            put_key_in_error()
    else:
        release_key()

With this transition system, it instead becomes
# executing result parser
# This only requires local context; the decision should be simple and straightforward
if result == "success":
    recommend_memory()
else:
    assert result == "failed"
    recommend_error()

def transition_cancelled_error(...):
    assert stuff
    release_task()

def transition_cancelled_memory(...):
    assert stuff
    release_task()  # the task was cancelled, so the result is released rather than kept

This decision tree is a bit more complex for gathering keys. I'm no longer 100% convinced that this is the right approach, but here we are right now. The recent refactoring will allow us, mid-term, to move away from this if we choose to do so.
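The split between the result parser and the cancelled transitions can be sketched as a small, self-contained example. This is only an illustration under assumptions, not the worker's actual code: the `Task` class and the functions `parse_execute_result`, `request_release`, and `transition` are hypothetical names made up for this sketch.

```python
from dataclasses import dataclass


@dataclass
class Task:
    """Hypothetical stand-in for a worker-side task; not distributed's TaskState."""

    key: str
    state: str = "executing"
    done: bool = False


def parse_execute_result(ts: Task, result: str) -> str:
    """Result parser: looks only at the result and returns a recommended state."""
    ts.done = True
    return "memory" if result == "success" else "error"


def request_release(ts: Task) -> None:
    """A running task cannot be interrupted, so it is parked in 'cancelled'."""
    if ts.state == "executing" and not ts.done:
        ts.state = "cancelled"
    else:
        ts.state = "released"


def transition(ts: Task, recommended: str) -> None:
    """Apply a recommendation; a cancelled task is released whatever the outcome."""
    if ts.state == "cancelled":
        ts.state = "released"
    else:
        ts.state = recommended


# T1 is executing, a release is requested, then T1 finishes successfully.
t1 = Task("T1")
request_release(t1)
assert t1.state == "cancelled"
transition(t1, parse_execute_result(t1, "success"))
print(t1.state)  # released
```

The result parser never checks whether the task was cancelled; that decision lives entirely in the transition step.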
I believe at least some of the test failures relate to #5910.
70cb386 to 332fdb1
distributed/worker.py
Outdated
| # We'll ignore instructions, i.e. we choose to not submit the failure | ||
| # message to the scheduler since from the schedulers POV it already | ||
| # released this task | ||
| recs, _ = self.transition_executing_error( |
All transitions from executing call ensure_computing.
This should deadlock the worker if there are any tasks in ready state.
I added a test for this. This does not deadlock since the transition generates a recommendation. Only after acting on that recommendation will we get an instruction.
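A rough sketch of why dropping the instructions does not lose the queued task, assuming a simplified model in which every transition returns a pair of (recommendations, instructions). All names here (`TRANSITIONS`, `process`, the tuple-shaped instructions, and the state bookkeeping) are hypothetical and only loosely mirror the worker:

```python
from collections import deque


def transition_executing_error(key, states):
    """Hypothetical: mark the task erred and recommend starting a ready task."""
    states[key] = "error"
    ready = [k for k, s in states.items() if s == "ready"]
    recs = {ready[0]: "executing"} if ready else {}
    return recs, [("task-erred", key)]


def transition_cancelled_error(key, states):
    """Reuse the executing->error logic, keep its recommendations, drop its instructions."""
    recs, _ = transition_executing_error(key, states)
    states[key] = "released"  # assumption: a cancelled task ends up released
    return recs, []


def transition_ready_executing(key, states):
    """The instruction to execute only appears when this recommendation is acted on."""
    states[key] = "executing"
    return {}, [("execute", key)]


TRANSITIONS = {
    ("cancelled", "error"): transition_cancelled_error,
    ("ready", "executing"): transition_ready_executing,
}


def process(recs, states):
    """Act on recommendations until none are left, collecting instructions."""
    instructions = []
    queue = deque(recs.items())
    while queue:
        key, target = queue.popleft()
        more, instrs = TRANSITIONS[(states[key], target)](key, states)
        queue.extend(more.items())
        instructions.extend(instrs)
    return instructions


states = {"x": "cancelled", "y": "ready"}
instructions = process({"x": "error"}, states)
print(states)        # {'x': 'released', 'y': 'executing'}
print(instructions)  # [('execute', 'y')]
```

In this model the only instruction that gets dropped is the erred message for `x`; the recommendation for `y` survives and produces its own instruction later.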
| # Queue up another task to ensure this is not affected by our error handling | ||
| fut2 = c.submit(inc, 1) | ||
| await wait_for_state(fut2.key, "ready", w) |
@crusaderky this triggers the condition you are concerned about in the transition function about dropped instructions.
The task is queued up and we'll receive a recommendation. The only instruction at that point is the TaskErred message.
I think there may be a regression here: #6305 (comment)
closes #6194
pre-commit run --all-files
Three changes in this PR
ts._next is not set for cancelled tasks. Cancelled tasks should always transition to released once they are done.