Redesign worker exponential backoff on busy-gather #6173
crusaderky merged 5 commits into dask:main
Conversation
```python
ts.done = False
self.data_needed.push(ts)
for w in ts.who_has:
    self.data_needed_per_worker[w].push(ts)
```
Fixes a bug where tasks transitioning from missing to fetch would not be picked up by select_keys_for_gather.
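To make the intent of the fix explicit, here is a minimal, self-contained sketch (the names are modelled on the real worker attributes but the functions are hypothetical): select_keys_for_gather walks the per-worker queue, so a task that only sits in the global data_needed is invisible to it until it is also pushed onto data_needed_per_worker.

```python
import heapq
from collections import defaultdict

# Per-worker queues of fetchable tasks: worker address -> heap of (priority, key)
data_needed_per_worker = defaultdict(list)


def transition_missing_to_fetch(key, priority, who_has):
    """Register a task that just became fetchable with every replica holder."""
    for worker in who_has:
        heapq.heappush(data_needed_per_worker[worker], (priority, key))


def select_keys_for_gather(worker, total_bytes, nbytes):
    """Pop keys for one worker until the byte budget is exhausted."""
    selected, budget = [], total_bytes
    heap = data_needed_per_worker[worker]
    while heap and nbytes[heap[0][1]] <= budget:
        _, key = heapq.heappop(heap)
        budget -= nbytes[key]
        selected.append(key)
    return selected
```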
```python
# before
while L:
    ts = L.pop()

# after
while tasks:
    ts = tasks.peek()
```
Fixes a bug where:
- a task causes an iteration of select_keys_for_gather to exceed total_bytes
- before the fetch from that worker is complete, another task with higher priority is added to data_needed on the same worker
- at the next iteration of ensure_communicating, the task is not picked up by select_keys_for_gather
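A rough sketch of the peek-before-pop pattern the new code relies on (the class and function here are illustrative, not the real worker data structures): a task is only removed from the queue once it has actually been selected, so anything still queued, including higher-priority tasks pushed in the meantime, is reconsidered on the next ensure_communicating pass.

```python
import heapq


class TaskHeap:
    """Tiny stand-in for the worker's per-worker task heap."""

    def __init__(self):
        self._heap = []

    def push(self, priority, key):
        heapq.heappush(self._heap, (priority, key))

    def peek(self):
        return self._heap[0]

    def pop(self):
        return heapq.heappop(self._heap)

    def __bool__(self):
        return bool(self._heap)


def select_keys_for_gather(tasks, total_bytes, nbytes):
    selected = []
    while tasks:
        _, key = tasks.peek()          # inspect without removing
        if nbytes[key] > total_bytes:
            break                      # key stays queued for the next pass
        tasks.pop()                    # drop it only once it has been selected
        total_bytes -= nbytes[key]
        selected.append(key)
    return selected
```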
distributed/worker.py (outdated)
```python
# Avoid hammering the worker. If there are multiple replicas
# available, immediately try fetching from a different worker.
self.busy_workers.add(worker)
self.io_loop.call_later(0.15, self._readd_busy_worker, worker)
```
To be replaced with an async instruction within the scope of #5896
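As a rough sketch of the mechanism under discussion (class and method names are mine, not the worker's API): a peer that answered "busy" is parked in busy_workers so it is skipped as a fetch target, then transparently re-added a short time later.

```python
import asyncio


class GatherMachine:
    """Illustrative fragment: park busy peers and retry them after a delay."""

    def __init__(self):
        self.busy_workers: set[str] = set()

    def handle_busy(self, worker: str) -> None:
        # Stop asking this peer for a while; other replica holders are tried instead.
        self.busy_workers.add(worker)
        asyncio.get_running_loop().call_later(0.15, self._readd_busy_worker, worker)

    def _readd_busy_worker(self, worker: str) -> None:
        # After the delay the peer becomes a valid fetch target again; the real
        # worker would follow this up by calling ensure_communicating().
        self.busy_workers.discard(worker)
```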
distributed/worker.py (outdated)
```python
who_has = await retry_operation(
    self.scheduler.who_has, keys=refresh_who_has
)
self.update_who_has(who_has)
```
Notably, this query to the scheduler does not happen if all workers known to hold a replica are in flight. I suppose this disparity of treatment is because the worker knows its peers will eventually exit in_flight_workers, whereas it has no control over busy_workers.
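For illustration, the decision being described could be condensed into a sketch like the one below (function and parameter names are hypothetical; only who_has and update_who_has come from the diff above): refresh replica locations only when every known holder is busy, since in-flight peers will free up on their own.

```python
async def maybe_refresh_who_has(ts, busy_workers, in_flight_workers, scheduler):
    """Ask the scheduler for fresh replica locations only when every known
    holder of ts is busy; in-flight peers will free up without our help."""
    holders = set(ts.who_has)
    if holders and holders <= busy_workers:
        # The caller would feed this into update_who_has()
        return await scheduler.who_has(keys=[ts.key])
    return None
```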
Unit Test Results: 16 files ±0, 16 suites ±0, 7h 27m 28s ⏱️ (-1m 18s). Results for commit 5bffa37, compared against base commit 370f456.
Generally what's here seems sensible to me. However, I'm not going deeply into the logic; I'm mostly trusting @crusaderky and the tests. I did have a general question though: there are a couple of occasions where you identify and fix a possible bug. Should we invest more time in creating tests for these?
I think that if @fjetter has ample time on Monday for deeper review then it would be good to wait for that. If that's not the case then I'm comfortable merging. |
| )["f"] | ||
| g = c.submit(inc, f, key="g", workers=[a.address]) | ||
| assert await g == 2 | ||
| assert_worker_story(a.story("f"), [("receive-dep", lw.address, {"f"})]) |
Unrelated to this PR, but a quick note to @fjetter
I'm totally fine with uses of stories like this one. I like this because it is a very focused assertion: it's clear that we care about this specific thing, rather than copying down the entire transition log. It's also easier to understand the intent from a reader's perspective. I get that we're expecting to receive "f" from lw. If this breaks and I have to come fix it in the future, I think I'll be able to quickly understand the point it was trying to get across. I also think it's unlikely to break for unrelated reasons.
FWIW I think an even better way to assert this would be to assert on the incoming/outgoing transfer logs, since receive-dep is technically not a transition and is only there for historic reasons. Still, I'm OK with this.
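For illustration, a transfer-log version of that assertion might look like the sketch below; the incoming_transfer_log field names ("who", "keys") are my assumption about the log structure, not something this PR establishes.

```python
# Hypothetical sketch of the suggestion above: assert on the transfer log
# instead of the story. Field names are assumed, not taken from this PR.
assert any(
    entry["who"] == lw.address and "f" in entry["keys"]
    for entry in a.incoming_transfer_log
)
```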
Writing tests for these fixes now would be unhealthily complicated; they're best left until after the state machine is broken out of Worker.

Fine by me
fjetter left a comment
I just had a minor question about a test but that's not a blocker.
| )["f"] | ||
| g = c.submit(inc, f, key="g", workers=[a.address]) | ||
| assert await g == 2 | ||
| assert_worker_story(a.story("f"), [("receive-dep", lw.address, {"f"})]) |
There was a problem hiding this comment.
FWIW I think an even better way to assert this would be to assert on incoming/outgoing transfer logs since receive-dep is technically not a transition and only there for 'historic reasons'. Still, I'm Ok with this
```python
story = b.story("busy-gather")
# 1 busy response straight away, followed by 1 retry every 150ms for 500ms.
# The requests for b and g are clustered together in single messages.
assert 3 <= len(story) <= 7
```
what's the motivation for changing the "timeout h" to this?
I didn't remove the h timeout? It's on line 1836.
There was no count on the number of retries before.
select_keys_for_gather would return fewer keys than it could. Note that I didn't write unit tests for this, as they would be unhealthily complicated to implement now and are best left until after the state machine is broken out of Worker.
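For what it's worth, here is my reading of where the 3 <= len(story) <= 7 bound in the test above comes from; this arithmetic is not spelled out in the PR, so treat it as an interpretation rather than the author's derivation.

```python
# The target worker stays busy for roughly 0.5 s and busy fetches are retried
# every 0.15 s, so after the immediate "busy-gather" event there is time for
# about 2-3 further retries. Each retry produces one event when the two keys
# are clustered into a single message and two events when they are not.
immediate = 1
retries_in_window = 500 // 150                # up to 3 retries fit in ~500 ms
lower = immediate + 2                         # 3: one retry lost to jitter, keys clustered
upper = immediate + retries_in_window * 2     # 7: all retries fire, keys never clustered
assert (lower, upper) == (3, 7)
```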