Conversation
There was a problem hiding this comment.
I've been thinking about this PR. Here's an example with the stimulus_id on the right after the # comment.
- Client._send_to_scheduler({"op": "update-graph-hlg", "stimulus_id": "CHLG"}) # CHLG
- Scheduler.update_graph_hlg # CHLG
- Scheduler..update_graph # CHLG
- Scheduler.worker_send({"op": "compute-task"}) # CHLG
- Worker.handle_compute_task # CHLG
- Worker.execute # CHLG
- Worker._handle_instruction # "task-finished"
- Worker.batched_stream({"op": "task-finished"}) # "task-finished"
- Scheduler.handle_task_finished # "task-finished"
I've been thinking about this in a Cluster vs a LocalCluster approach.
- Within a Distributed Cluster, the STIMULUS_ID ContextVar will be unset and must derived from a message from an external entity (Client, Scheduler, Worker).
- Within a LocalCluster (Client, Scheduler, Worker in the same process), the STIMULUS_ID may already have been set.
I am considering how to write code to handle both cases and this is complicated by examples such as update_graph_hlg calling update_graph.
I also wonder if a Distributed ContextVar might be possible with the simple use of STIMULUS_ID.set(stimulus_id) at various handler boundaries and copy_context().run at thread boundaries.
I think there's a lot of complexity here (async, threads and inter-process communication) that I'm trying to get my head around. To improve my understanding I would like to model this with some minimal distributed.core.Server's in a test case.
A further thought on managing this complexity would be to add some kwargs to default_stimulus_id such as override (always override existing STIMULUS_ID) and require_empty (require STIMULUS_ID to not be present prior to setting it)
This builds on top of #6046 (or at least a version of it) and moves the set_stimulus_id (former Scheduler.stimulus_id) to the utils.py module and applies the contextvar to the worker as well.
I encountered some problems with the dataclasses we're using. I'm sure this can be ironed out but I didn't want to waste time.