Conversation
distributed/worker.py
Outdated
```python
ts = self.tasks[k]
recommendations[ts] = tuple(msg.values())
raise
return GatherDepDoneEvent(stimulus_id=stimulus_id)
```

The exception is now shielded from @log_errors. Previously it was double-reported, as it is already logged by logger.exception(e) on line 3117.
distributed/worker.py
Outdated
```python
self.periodic_callbacks[
    "find-missing"
].callback_time = self.periodic_callbacks["heartbeat"].callback_time
self.ensure_communicating()
```

This is no longer necessary, as _ensure_communicating is now in all transitions to fetch.
@fjetter you may give it a look now or wait until after I've fixed the regressions.

Current status: 8 tests failing. Pending investigation.

- FAILED distributed/dashboard/tests/test_worker_bokeh.py::test_basic (This is because ...)
- FAILED distributed/diagnostics/tests/test_eventstream.py::test_eventstream
- FAILED distributed/tests/test_scheduler.py::test_decide_worker_coschedule_order_neighbors[nthreads1-1]
- FAILED distributed/tests/test_worker.py::test_gather_many_small (this looks like a genuine regression in _select_keys_for_gather: concurrent fetches are being limited to 1 key per worker)
- FAILED distributed/tests/test_worker.py::test_acquire_replicas_already_in_flight
- FAILED distributed/tests/test_worker.py::test_missing_released_zombie_tasks_2
- FAILED distributed/tests/test_worker.py::test_gather_dep_cancelled_rescheduled
- FAILED distributed/tests/test_worker.py::test_gather_dep_do_not_handle_response_of_not_requested_tasks
Force-pushed: df6f189 to 738f7c6
Force-pushed: 361b734 to 7569dd8
distributed/worker.py
Outdated
```python
# compute-task or acquire-replicas command from the scheduler, it allows
# clustering the transfers into less GatherDep instructions; see
# _select_keys_for_gather().
return {}, [EnsureCommunicatingLater(stimulus_id=stimulus_id)]
```

The alternative to this was to delete _select_keys_for_gather and either

- add logic to _handle_instructions to squash individual GatherDep instructions on the same worker, or
- implement no grouping whatsoever and just rely on rpc pooling (needs performance testing).

Both systems would remove the out-of-priority fetch from workers and imply revisiting the limits for concurrent connections.
Either way it would be a major functional change, so I opted for this somewhat dirty hack instead, which is functionally identical to main.
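For illustration, a minimal sketch of the first alternative (hypothetical simplified names, not the actual distributed codebase): squashing per-worker GatherDep instructions could look roughly like this:

```python
from dataclasses import dataclass, field


@dataclass
class Instruction:
    stimulus_id: str


@dataclass
class GatherDep(Instruction):
    worker: str = ""
    to_gather: set = field(default_factory=set)


def squash_gather_deps(instructions: list) -> list:
    """Merge GatherDep instructions that target the same worker.

    The first instruction seen for each worker stays in place and absorbs
    the keys of later ones; all other instructions keep their order.
    """
    first_for_worker: dict = {}
    out = []
    for inst in instructions:
        if isinstance(inst, GatherDep):
            prev = first_for_worker.get(inst.worker)
            if prev is not None:
                prev.to_gather |= inst.to_gather
                continue
            first_for_worker[inst.worker] = inst
        out.append(inst)
    return out
```

This keeps transfers grouped per worker without _select_keys_for_gather, but, as noted above, it would change the out-of-priority fetch behaviour.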
```diff
     return [ev for ev in self.stimulus_log if getattr(ev, "key", None) in keys]

-    def ensure_communicating(self) -> None:
+    def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs:
```
Fetching the stimulus_id from outside means that now gather_dep commands will have the stimulus_id of the event that triggered them, e.g.
- compute-task
- acquire-replicas
- find_missing
- GatherDepDoneEvent
```python
cancelled_keys: set[str] = set()

def done_event():
    return GatherDepDoneEvent(stimulus_id=f"gather-dep-done-{time()}")
```

Temp hack, to be removed when refactoring gather_dep
```python
self.ensure_communicating()
self.handle_stimulus(
    GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}")
)
```

I'll change this method in a later PR to an async instruction
Pedantic, but it feels weird to issue a GatherDepDoneEvent when that isn't actually what happened. Something like
```python
class BusyWorkerReAddedEvent(GatherDepDoneEvent):
    pass
```

might make it clearer that they're different things, just happen to be handled in the same way (for now).
But if GatherDepDoneEvent is itself a temporary hack and will be removed soon, then this isn't important.
All tests pass! This is ready for review and merge 🥳
gjoseph92
left a comment
Some naming and design questions, but overall this seems good.
distributed/worker.py
Outdated
```python
    stimulus_id=inst.stimulus_id
)
self.transitions(recs, stimulus_id=inst.stimulus_id)
self._handle_instructions(instructions)
```

Why do we recurse into _handle_instructions here, instead of adding the new instructions onto the end of the current instructions list (in a safe way)? I'm wondering why the new instructions are treated as "higher priority" than the current ones.
I overhauled the method, please have another look
distributed/worker_state_machine.py
Outdated
```python
@dataclass
class EnsureCommunicatingLater(Instruction):
```

I find the "later" part of EnsureCommunicatingLater a little confusing. EnsureCommunicatingOnce? EnsureCommunicatingIdempotent?
AFAIU the point of doing this as an instruction (instead of calling _ensure_communicating directly in many places) is to allow batching of multiple EnsureCommunicating instructions into one, via special logic in _handle_instructions.
it's not just a matter of doing it once; it must happen after all recommendations to transition to fetch have been enacted.
Renamed to EnsureCommunicatingAfterTransitions.
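To illustrate the mechanism being discussed (a simplified sketch with hypothetical names, not the actual worker code): the batching works by deferring a single ensure-communicating call until every other instruction in the batch has been enacted:

```python
from dataclasses import dataclass


@dataclass
class Instruction:
    stimulus_id: str


@dataclass
class EnsureCommunicatingAfterTransitions(Instruction):
    pass


def handle_instructions(instructions, execute, ensure_communicating):
    """Run all instructions; collapse any number of
    EnsureCommunicatingAfterTransitions markers into one
    ensure_communicating() call, issued only after everything else
    in the batch has run.

    execute(inst) handles a regular instruction;
    ensure_communicating(stimulus_id) returns follow-up instructions
    (possibly an empty list).
    """
    while instructions:
        deferred = None
        for inst in instructions:
            if isinstance(inst, EnsureCommunicatingAfterTransitions):
                deferred = inst  # later markers overwrite earlier ones
            else:
                execute(inst)
        if deferred is None:
            return
        # ensure_communicating may emit new instructions; loop to handle them
        instructions = ensure_communicating(deferred.stimulus_id)
```

This makes the ordering constraint explicit: the communication pass only runs after all transitions to fetch have updated the data structures it reads.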
distributed/worker.py
Outdated
```python
stimulus_id=stimulus_id,
# Note: given n tasks that must be fetched from the same worker, this method
# may generate anywhere between 1 and n GatherDep instructions, as multiple
# tasks may be clustered in the same instruction by _select_keys_for_gather
```

Suggested change:

```diff
-# tasks may be clustered in the same instruction by _select_keys_for_gather
+# tasks may be clustered in the same instruction by _select_keys_for_gather.
+# The number will be greater than 1 when the tasks don't all fit in `target_message_size`.
```
This just took me a few reads to make sense of
It was incorrect to begin with - you'll never have more than one GatherDep from the same worker within the same iteration of ensure_communicating. I rewrote it.
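A hedged sketch of what such clustering might look like (hypothetical simplified code, not the actual _select_keys_for_gather; the target_message_size name follows the suggestion above): keys for one worker are taken off the head of the queue until the projected message size would exceed the limit.

```python
from collections import deque


def select_keys_for_gather(queue, nbytes_of, target_message_size=50_000_000):
    """Pop keys destined for a single worker off the head of the queue,
    stopping before the projected message size exceeds target_message_size.

    Always takes at least one key, so that a single oversized task can
    still be fetched.
    """
    to_gather = set()
    total_nbytes = 0
    while queue:
        key = queue[0]
        size = nbytes_of[key]
        if to_gather and total_nbytes + size > target_message_size:
            break
        queue.popleft()
        to_gather.add(key)
        total_nbytes += size
    return to_gather, total_nbytes
```

Under this scheme, one iteration of the communication pass produces at most one GatherDep per worker; leftover keys stay queued for a later iteration.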
distributed/worker_state_machine.py
Outdated
```python
@dataclass
class EnsureCommunicatingLater(Instruction):
```

I also find it a little odd that unlike other instructions, EnsureCommunicatingLater doesn't contain any data. It's relying on the side effect of _add_to_data_needed having already mutated data_needed and data_needed_per_worker, but the instruction itself is pointless without that side effect having occurred. I can't think of a cleaner design that avoids this and still has batching though.
I agree, but the whole instruction is a hack to begin with
gjoseph92
left a comment
The renaming and _handle_instructions refactor helped, thank you. Just some comment-rewording and type annotations for clarity.
```python
)

self.comm_nbytes += total_nbytes
self.in_flight_workers[worker] = to_gather
```

Suggested change:

```diff
-self.in_flight_workers[worker] = to_gather
+assert worker not in self.in_flight_workers, self.in_flight_workers[worker]
+self.in_flight_workers[worker] = to_gather
```
Are we guaranteed that in_flight_workers[worker] is not already set? Because we'd be overwriting it if it is.
EDIT: I think we are because of the if w not in self.in_flight_workers above. Still might be nice to validate though? If this was not the case, it could probably cause a deadlock.
Yes, it's impossible for it to be there already, due to the line you mentioned just above. I think validation would be overkill since the check is directly above.
```python
# 1. there are many fetches queued because all workers are in flight
# 2. a single compute-task or acquire-replicas command just sent
#    many dependencies to fetch at once.
ensure_communicating = inst
```

Is it even necessary to store the instruction right now (since it's just a sentinel), or could this just be a bool?
we need the stimulus_id
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Partially closes #5896
In scope for this PR
- ensure_communicating() -> None to _ensure_communicating() -> RecsInstrs
- GatherDep instructions replace direct calls to self.loop.add_callback(self.gather_dep, ...)
- ensure_communicating is no longer called periodically "just in case" - neither from every_cycle nor from find_missing

Out of scope for this PR, but in scope for #5896

- gather_dep
- GatherDepDoneEvent (introduced in this PR)
- _readd_busy_worker as an async instruction

Out of scope for #5896

- find_missing
- EnsureCommunicatingLater (introduced in this PR) and _select_keys_for_gather
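The core in-scope change can be sketched as follows (hypothetical, heavily simplified names; not the actual distributed code): a method that used to spawn coroutines as a side effect now returns a (recommendations, instructions) tuple and lets the caller act on it, with each instruction carrying the stimulus_id of the event that triggered it.

```python
from dataclasses import dataclass


@dataclass
class GatherDep:
    stimulus_id: str
    worker: str


# shorthand for (recommendations, instructions), as in the PR's terminology
RecsInstrs = tuple


def ensure_communicating_old(state, loop):
    """Before: schedules fetches as a side effect (hard to test or replay)."""
    for worker in state.pop_ready_workers():
        loop.add_callback(state.gather_dep, worker)


def ensure_communicating_new(state, *, stimulus_id: str) -> RecsInstrs:
    """After: a pure function of the state that returns instructions,
    tagged with the stimulus_id of the triggering event."""
    return {}, [
        GatherDep(stimulus_id=stimulus_id, worker=w)
        for w in state.pop_ready_workers()
    ]
```

Returning instructions instead of firing callbacks is what lets the event log trace every GatherDep back to the compute-task, acquire-replicas, or other event that caused it.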