Fix resource allocation for tasks with dependencies #6676
crusaderky merged 25 commits into dask:main
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files ±0, 15 suites ±0, 5h 48m 12s ⏱️ -47m 16s
For more details on these failures and errors, see this check.
Results for commit 086178b. ± Comparison against base commit 40867c7.
♻️ This comment has been updated with latest results.
Force-pushed from 6a589a5 to e238af1
distributed/worker_state_machine.py
Outdated
assert next_state in {"waiting", "fetch"}, next_state
assert ts._previous in {"executing", "flight"}, ts._previous

if ts._previous == "executing":
TODO: Write test that ensures that we do in fact release IFF the task is finished.
This and line 1888 are missing the 'long-running' use case. I'll open a follow-up PR about it; there's no point adding scope creep here.
I'd like to have the test you mention within this PR please.
ts._previous should never happen to begin with in this method. I'll clean up the code in here in a separate PR.
Force-pushed from 472f62b to 60b0fbb
Please merge from main. Also, there are failing tests.
Force-pushed from 2796a56 to 25e2e01
assert ws.available_resources == {"R": 1}


@pytest.mark.xfail(reason="distributed#6565")
XFAIL to be tackled in a follow-up PR. #6565 is already tracking this issue.
Good catch, somehow missed that.
assert ws.available_resources == {"R": 1}


@pytest.mark.xfail(reason="distributed#6682")
XFAIL to be tackled in a follow-up PR. #6682 has been created to tackle this issue.
Failing tests are known flakes.
        stimulus_id="compute",
    )
)
assert ws.tasks["x"].state == "resumed"
Could you review this?
From reading the code (see _transition_from_resumed), "resumed" should be exclusively on one of the following loops:
- executing or long-running -> cancelled -> fetch
- flight -> cancelled -> waiting
In the executing -> cancelled -> waiting loop that you implemented here, I expect ts.state to be 'executing'.
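A minimal sketch of the expectation described in this comment, assuming a toy lookup table rather than the real `_transition_from_resumed` logic. `RESUMED_LOOPS` and `state_after_recommendation` are hypothetical names used only for illustration:

```python
# Toy model of the loops listed above; NOT distributed's implementation.
# Keys are the state a task was in before it was cancelled; values are the
# only recommended state for which "resumed" is expected.
RESUMED_LOOPS = {
    "executing": "fetch",
    "long-running": "fetch",
    "flight": "waiting",
}


def state_after_recommendation(previous: str, recommended: str) -> str:
    """Return the state a cancelled task is expected to land in (hypothetical)."""
    if RESUMED_LOOPS.get(previous) == recommended:
        return "resumed"
    # The task is asked to resume what it was already doing,
    # so it is expected to revert to its previous state.
    return previous


print(state_after_recommendation("executing", "fetch"))
print(state_after_recommendation("executing", "waiting"))
```

Under these assumptions, an executing -> cancelled -> waiting loop reverts to 'executing', while executing -> cancelled -> fetch yields 'resumed'.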
This test does not create an executing -> cancelled -> waiting loop, but an executing -> cancelled -> fetch loop by cancelling x and then gathering it as a dependency to y.
It's probably a good idea to park this momentarily and write two separate PRs,
Force-pushed from fe6371d to 3ff3354
    SecedeEvent(key="x", compute_duration=1.0, stimulus_id="secede")
)
assert ws.tasks["x"].state == request.param
assert ws.available_resources == {"R": 0}
Moved this into the dedicated test_ws_with_running_task


@pytest.mark.parametrize("state", ["executing", "long-running"])
def test_running_constrained_task_acquires_resources(state, ws):
Duplicating logic from ws_with_running_task and test_ws_with_running_task to have an explicit test focused on resource restrictions that is resilient to changes to those functions.
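The invariant such a test pins down can be sketched with a toy ledger. This is an illustration only: `ToyResourceLedger` and its methods are hypothetical and do not mirror the real `WorkerState` API; the `{"R": 1}` resource is borrowed from the assertions in this PR.

```python
from __future__ import annotations


class ToyResourceLedger:
    """Hypothetical stand-in for resource bookkeeping; not WorkerState."""

    def __init__(self, total: dict[str, float]) -> None:
        self.available = dict(total)
        self.held: dict[str, dict[str, float]] = {}

    def acquire(self, key: str, restrictions: dict[str, float]) -> None:
        # Refuse to start a task whose restrictions cannot be satisfied.
        if any(self.available.get(r, 0) < n for r, n in restrictions.items()):
            raise RuntimeError(f"not enough resources for {key}")
        for r, n in restrictions.items():
            self.available[r] -= n
        self.held[key] = dict(restrictions)

    def release(self, key: str) -> None:
        # pop() with a default makes a second release a no-op.
        for r, n in self.held.pop(key, {}).items():
            self.available[r] += n


ledger = ToyResourceLedger({"R": 1})
ledger.acquire("x", {"R": 1})  # task enters executing or long-running
held_while_running = dict(ledger.available)
ledger.release("x")  # task finishes
ledger.release("x")  # releasing twice must not over-credit
```

A running task holds its resources for as long as it runs, and they are credited back exactly once when it finishes.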
CI issues are being caused by #6692. I'd move
…sk to _transition_waiting_ready
Co-authored-by: crusaderky <crusaderky@gmail.com>
Co-authored-by: crusaderky <crusaderky@gmail.com>
Force-pushed from c09261a to 2dacf7f
assert ws.available_resources == {"R": 1}

instructions = ws.handle_stimulus(
    GatherDepSuccessEvent("gather-dep-done", "127.0.0.1:1235", 8, {"x": 1.0})
This is unreadable; please never go full-positional.
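For illustration, here is the same call in both styles using a stand-in dataclass. `GatherDepSuccessEventSketch` is hypothetical, and its field names and order are assumptions inferred from the positional call above, not a copy of the real event class:

```python
from dataclasses import dataclass


@dataclass
class GatherDepSuccessEventSketch:
    # Stand-in for illustration; field names and order are assumptions.
    stimulus_id: str
    worker: str
    total_nbytes: int
    data: dict


# Full-positional: the reader has to know the field order by heart.
positional = GatherDepSuccessEventSketch(
    "gather-dep-done", "127.0.0.1:1235", 8, {"x": 1.0}
)

# Keyword form: each argument documents itself, and order no longer matters.
by_keyword = GatherDepSuccessEventSketch(
    worker="127.0.0.1:1235",
    total_nbytes=8,
    data={"x": 1.0},
    stimulus_id="gather-dep-done",
)

assert positional == by_keyword
```

Both calls build equal objects; only the keyword form makes it obvious what `8` means.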
if ts._previous in ("executing", "long-running"):
    self._release_resources(ts)
    self.executing.discard(ts)
    self.long_running.discard(ts)
This is actually superfluous as we're invoking purge_state afterwards. But I think it's a good idea to have it for cleanliness' sake.
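A small sketch of why the duplicate cleanup is harmless, assuming (as stated above) that the later purge also discards the task from both sets. The module-level sets and this `purge_state` are toy stand-ins, not the real method:

```python
# Toy stand-ins for the worker's bookkeeping sets.
executing = {"x"}
long_running = set()


def purge_state(key):
    """Hypothetical purge that drops the key from every set."""
    executing.discard(key)
    long_running.discard(key)


# Explicit cleanup in the transition...
executing.discard("x")
long_running.discard("x")  # "x" was never here; discard() does not raise

# ...followed by the purge, which repeats the same discards safely.
purge_state("x")
```

`set.discard` is a no-op for missing elements (unlike `set.remove`, which raises `KeyError`), so doing the cleanup twice costs nothing.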
See code review: 74defe4
The culprits are, again, #6689 + #6693. I'm going to merge the PR as-is, but it means it doesn't close #6663.
@crusaderky: Thanks for the thorough review!
Closes #6663
Blocked by
- Refactor resource restriction handling in WorkerState #6672
- Add dummy factory methods for ExecuteSuccessEvent and ExecuteFailureEvent #6687
- Add WorkerState.all_running_tasks #6690
- Revisit WorkerState.long_running set #6697

pre-commit run --all-files