Conversation
Unit Test Results 16 files ± 0 16 suites ±0 7h 12m 23s ⏱️ - 26m 48s For more details on these errors, see this check. Results for commit 34274b7. ± Comparison against base commit bd3f47e. ♻️ This comment has been updated with latest results. |
|
@sjperkins can I ask you to review this? |
sjperkins
left a comment
There was a problem hiding this comment.
This PR adds logging of state machine events to the Worker. Modified StateMachineEvents are added to a new Worker.stimulus_log attribute. StateMachineEvents can be converted to dictionaries and partly reconstructed from them. This is to support replay from logs discussed for e.g. here #5736 (comment).
I think StateMachineEvent.log could be renamed to something more descriptive
distributed/worker_state_machine.py
Outdated
| def __init_subclass__(cls): | ||
| StateMachineEvent._classes[cls.__name__] = cls | ||
|
|
||
| def log(self, *, handled: float) -> StateMachineEvent: |
There was a problem hiding this comment.
I think this could be named something more descriptive. How about one of the following?
logabble_event?to_loggable_event?
| stimulus_id: str | ||
| #: timestamp of when the event was handled by the worker | ||
| # TODO switch to @dataclass(slots=True) and uncomment (requires Python >=3.10) | ||
| # handled: float | None = field(init=False, default=None) |
There was a problem hiding this comment.
I guess this is the reason for the new method containing the self.handled = None assignment
There was a problem hiding this comment.
yes. clarified in comment
| self.handled = handled | ||
| return self | ||
|
|
||
| def _to_dict(self, *, exclude: Container[str] = ()) -> dict: |
There was a problem hiding this comment.
This dictionary conversion seems necessary because stimulus_log: StateMachineEvent has been added to Worker and thus must be supported by Worker._to_dict
distributed/tests/test_worker.py
Outdated
|
|
||
| prev_handled = story[0].handled | ||
| for ev in story[1:]: | ||
| assert ev.handled > prev_handled |
There was a problem hiding this comment.
Can we always assume that this invariant holds?
There was a problem hiding this comment.
There is a very tiny chance of getting two events in the same nanosecond. changed to >=.
| def log(self, *, handled: float) -> StateMachineEvent: | ||
| out = copy(self) | ||
| out.handled = handled | ||
| out.value = None |
There was a problem hiding this comment.
I understand the execution result is discarded because of the potentially large size of the result, and possibly the complexity of serialising/deserialising the result?
There was a problem hiding this comment.
Not discarding it would cause worker.stimulus_log to become effecitvely a copy of worker.data, except that it never loses any data!
|
|
||
| def _after_from_dict(self) -> None: | ||
| self.value = None | ||
| self.type = None |
There was a problem hiding this comment.
I guess the execution result type is discarded here because it's merely a string representation at this point and one would have to deal with serialising/unserialising types.
In any case, I think reconstructing the result of execution is non-trivial. How does this impact replayability of events on the Worker (out of interest?)
There was a problem hiding this comment.
these fields that are being discarded on a serialization round-trip should be inconsequential for the purpose of rebuilding the state.
|
All review comments have been addressed |
|
Thank you for the work @crusaderky and for the review @sjperkins |
ensure_computingtransitions to newWorkerStateevent mechanism #5895