30dba93 to c229a65
Unit Test Results
18 files ±0, 18 suites ±0, 10h 15m 31s ⏱️ +20m 12s
For more details on these failures, see this check.
Results for commit 7bf0117. ± Comparison against base commit 2ff681c.
♻️ This comment has been updated with latest results.
Thanks for the comments @fjetter. I think this is ready for further review. One observation I'd like to make is that a number of members follow this pattern:

    def do_something(..., stimulus_id=None):
        stimulus_id = stimulus_id or f"do-something-{time()}"

For example, reschedule: #5954 (comment). These patterns exist to ensure that
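A minimal sketch of the defaulting pattern described above. `do_something` is a hypothetical stand-in, not a function from distributed: the caller's `stimulus_id` is kept when one is supplied, and a timestamped fallback is generated otherwise.

```python
from time import time


def do_something(stimulus_id=None):
    # Hypothetical function, for illustration only.
    # Keep the caller's ID when given; otherwise generate a timestamped one.
    stimulus_id = stimulus_id or f"do-something-{time()}"
    return stimulus_id


explicit = do_something(stimulus_id="reschedule-123")  # caller-supplied ID wins
generated = do_something()  # falls back to "do-something-<timestamp>"
```

Note that `or` also replaces an empty string, so an explicit `stimulus_id=""` would be treated the same as `None`.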
fjetter
left a comment
A few minor comments. The biggest thing is that I would like us to try my suggestion of keeping the stimulus_id high level and not passing it down to every transition method. That would be really nice, I think. A similar thing could be done on the worker side, I believe, but I suggest not changing anything about the worker signatures in this PR.
            key, finish, *args, stimulus_id=stimulus_id, **kwargs
        )
        recommendations, client_msgs, worker_msgs = a
        self.send_all(client_msgs, worker_msgs)
I'm wondering if a cleaner approach would be to not add stimulus_id to every transition_X_Y method but instead deal with the required mutations here.
The only thing a transition_X_Y method can (should) do with the stimulus_id is attach it to a worker or client message. So why don't we attach it to every worker message here and spare ourselves these dirty signatures?
e.g.
    def transition_memory_forgotten(self, key):
        ...
        # This is the only place we're actually using the stim ID. _propagate_forgotten only adds it to the worker_msgs. We can add this on a higher level and don't need to pass it down into every method.
        _propagate_forgotten(
            self, ts, recommendations, worker_msgs
        )
        return recommendations, client_msgs, worker_msgs

I haven't verified if this works but it would be much less invasive.
Adding the stim ID could be performed as part of send_all where we're iterating over the messages anyhow.
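As a rough sketch of that suggestion, the helper below stamps a `stimulus_id` onto every worker message in one place. The name `attach_stimulus_id`, the example message, and the assumption that `worker_msgs` maps a worker address to a list of message dicts are all illustrative. This is not the actual `send_all` implementation.

```python
def attach_stimulus_id(worker_msgs, stimulus_id):
    """Return a copy of worker_msgs with stimulus_id set on every message.

    Hypothetical helper. Assumes worker_msgs maps a worker address to a
    list of message dicts. A message that already carries its own
    stimulus_id keeps it, because the unpacked msg keys take precedence.
    """
    return {
        worker: [{"stimulus_id": stimulus_id, **msg} for msg in msgs]
        for worker, msgs in worker_msgs.items()
    }


# Illustrative messages only; the payloads are made up for this example.
worker_msgs = {
    "tcp://127.0.0.1:1234": [
        {"op": "free-keys", "keys": ["x"]},
        {"op": "free-keys", "keys": ["y"], "stimulus_id": "existing"},
    ]
}
stamped = attach_stimulus_id(worker_msgs, "memory-forgotten-1")
```

With something like this applied once before sending, the individual transition methods would not need a `stimulus_id` parameter just to forward it into their messages.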
I had a look at the 15 transitions in SchedulerState, of which 6 take stimulus_id's, while 9 do not. I think the two numbers are close enough that either approach might be ugly and my vote would be for the ugliness of extra stimulus kwargs in the 15 transition functions.
I was thinking about this a bit more and two other approaches occurred to me. Here's the flavour of the first:
    import inspect

    class TransitionFunction:
        def __init__(self, fn):
            assert callable(fn)
            self.fn = fn
            self.sig = inspect.signature(fn)

        def stimulus_in_sig(self):
            # True if the wrapped transition accepts a stimulus_id argument
            return "stimulus_id" in self.sig.parameters

        def __call__(self, *args, **kw):
            # Drop stimulus_id for transitions whose signature doesn't take it
            if not self.stimulus_in_sig():
                kw.pop("stimulus_id", None)
            return self.fn(*args, **kw)

    class SchedulerState:
        def __init__(self):
            self._transitions_table = {
                ("released", "waiting"): TransitionFunction(self.transition_released_waiting),
                ("waiting", "released"): TransitionFunction(self.transition_waiting_released),
                ...
                ("released", "erred"): TransitionFunction(self.transition_released_erred),
            }

The second idea is not yet fully formed but it might be possible to automatically generate and inject stimulus_id's into the distributed.core.Server class handlers. Then, it might be possible to store the stimulus_id's in ContextVars that can be passed through async/sync call frames and inspected at the point where we need the stimulus_id's. This could be combined with TensorFlow's ideas around variable scoping (e.g. see https://www.tensorflow.org/api_docs/python/tf/name_scope)
It might also be possible to trace the call frames from inspect.currentframe() back to the distributed.core.Server handlers and automatically derive stimulus_id's.
I think the nice thing about this approach is that it discards the need to generate and pass stimulus_id's around the code base -- one could simply retrieve an appropriately generated stimulus. On the other hand, the logic might be too complicated and magical.
Pinging @graingert in case there's some problem with this approach and I spend too much time down this rabbit hole.
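To make the ContextVar idea concrete, here is a small sketch using only the standard library. `STIMULUS_ID`, `handler`, and `transition` are hypothetical names standing in for a Server handler and a nested transition call. Nothing here is distributed's API.

```python
import asyncio
import contextvars

# Hypothetical module-level variable; not part of distributed.
STIMULUS_ID = contextvars.ContextVar("stimulus_id")


def transition():
    # A nested sync function reads the ID without receiving it as an argument.
    return STIMULUS_ID.get()


async def handler(stimulus_id):
    # Stand-in for a Server handler: set the ID once at the entry point.
    STIMULUS_ID.set(stimulus_id)
    await asyncio.sleep(0)  # yield control so the two handlers interleave
    return transition()


async def main():
    # gather() wraps each coroutine in a task, and each task runs in its own
    # copy of the context, so concurrent handlers don't see each other's IDs.
    return await asyncio.gather(
        handler("update-graph-1"), handler("task-finished-2")
    )


results = asyncio.run(main())
```

Each handler gets back the ID it set itself, even though both were in flight at the same time. That isolation is the property that makes this approach plausible for asyncio code.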
Stated in a simpler way, I'm thinking of an ExitStack-like construct which
- Would be automatically initialised with supplied or generated stimulus_id's at the Server handlers
- Supports overriding of existing stimulus_id's throughout Server sub-classes.
- Is safe for use with asyncio (I think contextvars gives us this).
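One way the three properties listed above might fit together is a context manager built on `contextvars`. The names `stimulus_scope` and `current_stimulus_id` are assumptions made for illustration. This is a sketch of the idea, not an existing distributed construct.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical variable; the default of None means "no stimulus in scope".
_STIMULUS_ID = contextvars.ContextVar("stimulus_id", default=None)


@contextmanager
def stimulus_scope(stimulus_id):
    # Set the ID for the duration of the block, then restore the previous one.
    token = _STIMULUS_ID.set(stimulus_id)
    try:
        yield
    finally:
        _STIMULUS_ID.reset(token)


def current_stimulus_id():
    return _STIMULUS_ID.get()


seen = []
with stimulus_scope("handler-1"):  # initialised at the handler
    seen.append(current_stimulus_id())
    with stimulus_scope("override-2"):  # overridden further down the stack
        seen.append(current_stimulus_id())
    seen.append(current_stimulus_id())  # outer ID restored on exit
seen.append(current_stimulus_id())  # nothing in scope any more
```

Because the value lives in a `ContextVar`, each asyncio task sees its own scope, and `reset(token)` restores whatever ID was active before the override.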
> Stated in a simpler way, I'm thinking of an ExitStack-like construct which
Looks like AsyncExitStack is a possibility here.
Closed in favour of #6095