Refactor find_missing and refresh_who_has #3
crusaderky wants to merge 7 commits into WSMR/update_who_has from
Unit Test Results: 15 files ±0, 15 suites ±0, 6h 27m 31s ⏱️ +5m 38s. For more details on these failures, see this check. Results for commit 1bc0283. ± Comparison against base commit e8f5ef5.
     def handle_stimulus(self, stim: StateMachineEvent) -> None:
    -    self.stimulus_log.append(stim.to_loggable(handled=time()))
    +    if not isinstance(stim, FindMissingEvent):
    +        self.stimulus_log.append(stim.to_loggable(handled=time()))
This is ugly, but post-refactor the alternative is for the Worker to put its nose directly into the internal data structures of the WorkerState. That alternative would also require the Worker to realise on its own that something is stuck in the state and query the scheduler accordingly; I don't think it should own this kind of intelligence.
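A minimal sketch of the check discussed here, using simplified stand-in classes rather than the real distributed types. `ToyWorkerState` and `ComputeTaskEvent` as written below are hypothetical, and the idea that `FindMissingEvent` fires repeatedly on a timer is an assumption used only to motivate skipping the log:

```python
from dataclasses import dataclass, field
from time import time


@dataclass
class StateMachineEvent:
    """Simplified stand-in for the real event base class."""

    stimulus_id: str

    def to_loggable(self, handled: float) -> dict:
        return {
            "type": type(self).__name__,
            "stimulus_id": self.stimulus_id,
            "handled": handled,
        }


class FindMissingEvent(StateMachineEvent):
    """Assumed to be fired repeatedly by the worker itself."""


class ComputeTaskEvent(StateMachineEvent):
    """Any ordinary event that should appear in the log."""


@dataclass
class ToyWorkerState:
    stimulus_log: list = field(default_factory=list)

    def handle_stimulus(self, stim: StateMachineEvent) -> None:
        # The state, not the caller, decides what gets recorded.
        if not isinstance(stim, FindMissingEvent):
            self.stimulus_log.append(stim.to_loggable(handled=time()))


state = ToyWorkerState()
state.handle_stimulus(ComputeTaskEvent(stimulus_id="compute-1"))
for i in range(3):
    state.handle_stimulus(FindMissingEvent(stimulus_id=f"find-missing-{i}"))

logged_types = [entry["type"] for entry in state.stimulus_log]
```

In this sketch the caller can fire `FindMissingEvent` blindly without ever inspecting the state's collections, which is the encapsulation the comment argues for.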
distributed/worker.py
    if not workers and ts.state == "fetch":
        recs[ts] = "missing"
    elif workers and ts.state == "missing":
        recs[ts] = "fetch"
Note that this could be triggered by any of the 4 events that invoke update_who_has:
- handle_compute_task
- handle_acquire_replicas
- RefreshWhoHasEvent, instigated by find_missing
- RefreshWhoHasEvent, instigated by refresh_who_has
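The recommendation logic in the snippet above can be sketched in isolation. This is an illustrative toy, not the actual `update_who_has`: `ToyTask`, the function signature, and the shape of the `who_has` mapping are assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps instances hashable, so they can be dict keys
class ToyTask:
    key: str
    state: str
    who_has: set = field(default_factory=set)


def update_who_has(tasks: dict, who_has: dict) -> dict:
    """Refresh replica locations and return {task: recommended_state}."""
    recs = {}
    for key, workers in who_has.items():
        ts = tasks.get(key)
        if ts is None:
            continue  # unknown key: nothing to update
        ts.who_has = set(workers)
        if not workers and ts.state == "fetch":
            recs[ts] = "missing"
        elif workers and ts.state == "missing":
            recs[ts] = "fetch"
    return recs


tasks = {
    "x": ToyTask("x", "fetch", {"tcp://a"}),
    "y": ToyTask("y", "missing"),
    "z": ToyTask("z", "flight", {"tcp://a"}),
}
recs = update_who_has(tasks, {"x": [], "y": ["tcp://b"], "z": []})
summary = {ts.key: state for ts, state in recs.items()}
```

Because the transition is decided inside the function, each of the four callers listed above gets the same fetch/missing behaviour without repeating the check. Tasks in any other state, such as `z` here, receive no recommendation.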
            keys=[ts.key for ts in self._missing_dep_flight],
            stimulus_id=ev.stimulus_id,
        )
        return {}, [smsg]
Again, the only reason for this event, which is always triggered by the worker itself, is to move this logic out of the Worker and into the WorkerState.
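A rough sketch of what such a handler might look like, assuming simplified stand-ins. The message class `RequestRefreshWhoHasMsg`, the handler name `handle_find_missing`, and the early return on an empty set are all assumptions for illustration; only the `keys=...`, `stimulus_id=...`, and `return {}, [smsg]` fragments come from the diff above.

```python
from dataclasses import dataclass, field


@dataclass
class RequestRefreshWhoHasMsg:
    """Hypothetical message asking the scheduler for fresh who_has data."""

    keys: list
    stimulus_id: str


@dataclass
class FindMissingEvent:
    stimulus_id: str


@dataclass(eq=False)  # hashable, so tasks can live in a set
class ToyTask:
    key: str


@dataclass
class ToyWorkerState:
    _missing_dep_flight: set = field(default_factory=set)

    def handle_find_missing(self, ev: FindMissingEvent) -> tuple:
        # Assumption: with nothing missing, no message is sent at all.
        if not self._missing_dep_flight:
            return {}, []
        smsg = RequestRefreshWhoHasMsg(
            keys=[ts.key for ts in self._missing_dep_flight],
            stimulus_id=ev.stimulus_id,
        )
        # No recommendations yet; they follow once the scheduler replies.
        return {}, [smsg]


state = ToyWorkerState()
recs_empty, instr_empty = state.handle_find_missing(FindMissingEvent("fm-1"))

state._missing_dep_flight.add(ToyTask("x"))
recs_one, instr_one = state.handle_find_missing(FindMissingEvent("fm-2"))
```

The caller only ever fires the event and forwards whatever instructions come back; it never reads `_missing_dep_flight` itself.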
distributed/worker.py
    # _ensure_communicating to be a no-op when there's nothing to do.
    instructions.append(
        EnsureCommunicatingAfterTransitions(stimulus_id=ev.stimulus_id)
    )
Note that this is triggered specifically for find_missing and refresh_who_has, and does not fix dask#6446.
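For illustration only, the pattern in the snippet above might be sketched as follows. The handler name, the `RefreshWhoHasEvent` fields, and the choice to append the instruction unconditionally are assumptions; the real handler may differ.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class EnsureCommunicatingAfterTransitions:
    """Stand-in instruction: revisit pending fetches once transitions settle."""

    stimulus_id: str


@dataclass
class RefreshWhoHasEvent:
    who_has: dict
    stimulus_id: str


def handle_refresh_who_has(ev: RefreshWhoHasEvent, update_who_has: Callable) -> tuple:
    recs = update_who_has(ev.who_has)
    instructions = []
    # Assumption: appended unconditionally, relying on the follow-up step
    # being a no-op when there is nothing to do.
    instructions.append(
        EnsureCommunicatingAfterTransitions(stimulus_id=ev.stimulus_id)
    )
    return recs, instructions


# A trivial update function that recommends nothing, for demonstration.
recs, instructions = handle_refresh_who_has(
    RefreshWhoHasEvent(who_has={"x": []}, stimulus_id="refresh-1"),
    update_who_has=lambda who_has: {},
)
```

In this sketch the instruction is emitted only from the `RefreshWhoHasEvent` handler, which matches the comment's point that the other `update_who_has` callers are not covered.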
Temporary diff shown for dask#6348 vs. dask#6342.