Add worker task processing on if present #2622
Conversation
distributed/scheduler.py (Outdated)

```diff
 for plugin in list(self.plugins):
     try:
+        if self.tasks[key].processing_on is not None:
+            kwargs['worker'] = self.tasks[key].processing_on.address
```
Thanks @nicolls1!
Perhaps instead we might just place in the entire task state object, and let the plugin do what it wants with it. Thoughts?
To be clear, I'm suggesting something like `kwargs['task_state'] = ts`
Thanks for the quick response. That sounds much cleaner and more future proof, will do!
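For context, here is a minimal sketch of what a plugin consuming the proposed `task_state` keyword might look like. The names (`WorkerTracker`, `task_to_worker`) are hypothetical, not from the PR; a real plugin would subclass `distributed.diagnostics.plugin.SchedulerPlugin`.

```python
# Hypothetical sketch: a scheduler plugin that uses the proposed
# ``task_state`` keyword to record which worker each task runs on.
# A real plugin would subclass SchedulerPlugin.

class WorkerTracker:
    def __init__(self):
        self.task_to_worker = {}

    def transition(self, key, start, finish, *args, **kwargs):
        ts = kwargs.get("task_state")
        if finish == "processing" and ts is not None and ts.processing_on:
            # processing_on holds the WorkerState the task was assigned to
            self.task_to_worker[key] = ts.processing_on.address
```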
distributed/scheduler.py (Outdated)

```diff
-if self.tasks[key].processing_on is not None:
-    kwargs['worker'] = self.tasks[key].processing_on.address
-plugin.transition(key, start, finish2, *args, **kwargs)
+plugin.transition(key, start, finish2, task_state=ts, *args, **kwargs)
```
I suspect that Python will yell at you here for placing *args after a named keyword argument. You'll probably have to insert this into kwargs yourself.
Good point — it wasn't giving errors, but looking at it now, `args` was always empty, which is probably why. Sorry, GitHub didn't auto-refresh so I missed the second post.
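The concern can be demonstrated with a small stand-in function: a keyword argument before `*args` is legal syntax, but the moment `args` is non-empty the callee receives two values for the same parameter. The `transition` below is a toy stand-in, not the real plugin method.

```python
# Stand-in for plugin.transition, to show why the ordering is fragile.
def transition(key, start, finish, task_state=None):
    return task_state

args = ()  # args was always empty in the PR, so no error surfaced
assert transition("x", "waiting", "processing", task_state="ts", *args) == "ts"

try:
    # A non-empty args tuple also fills task_state positionally:
    transition("x", "waiting", "processing", task_state="ts", *("extra",))
except TypeError as e:
    print(e)  # "got multiple values for argument 'task_state'"

# The safer pattern suggested in the review: insert into kwargs instead.
kwargs = {"task_state": "ts"}
assert transition("x", "waiting", "processing", *args, **kwargs) == "ts"
```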
distributed/scheduler.py (Outdated)

```diff
 self.tasks[ts.key] = ts
 for plugin in list(self.plugins):
     try:
+        kwargs['task_state'] = ts
```
Can we move this outside of the for loop?
Of course! Thanks for the feedback — sorry, I've been away over the holiday weekend.
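Hoisted out of the loop, the surrounding code might look like this sketch. The function name and error handling are assumptions modeled on the scheduler's plugin loop, not the exact source.

```python
import logging

logger = logging.getLogger(__name__)

def notify_plugins(plugins, key, start, finish, ts, *args, **kwargs):
    # Set the shared keyword once, outside the loop, then tell each plugin.
    kwargs["task_state"] = ts
    for plugin in list(plugins):
        try:
            plugin.transition(key, start, finish, *args, **kwargs)
        except Exception:
            # One failing plugin should not break the others.
            logger.info("Plugin failed with exception", exc_info=True)
```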
It seems the test failures are unrelated, no? Happy to make updates to them if I broke something.
It looks like some of the failures are genuine:

```python
@gen_cluster(client=True)
def test_eventstream_remote(c, s, a, b):
    base_plugins = len(s.plugins)
    comm = yield eventstream(s.address, interval=0.010)

    start = time()
    while len(s.plugins) == base_plugins:
        yield gen.sleep(0.01)
        assert time() < start + 5

    futures = c.map(div, [1] * 10, range(10))

    start = time()
    total = []
    while len(total) < 10:
        msgs = yield comm.read()
>       assert isinstance(msgs, tuple)
E       assert False
E        +  where False = isinstance({'exception': TypeError("can not serialize 'TaskState' object"), 'status': 'uncaught-error', 'text': "can not serialize 'TaskState' object", 'traceback': <traceback object at 0x7fe417971d08>}, tuple)
```

It looks like some existing plugins that we rely on send all of the information we give to them across the wire (I suspect that this one is for the progress bars), and sending `TaskState` objects fails. Looking at the state for `TaskState` objects, it becomes clear that they include references to pretty much the entire graph, so probably the solution is to not send these things around.

If EventStream is the only problem, then you can probably fix that here:

distributed/diagnostics/eventstream.py, lines 24 to 28 in a3d2016

We definitely don't want to include all of kwargs. Probably we just want to exclude the `task_state` entry.
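A minimal sketch of that EventStream-side fix might look like the helper below. The function name is hypothetical, not distributed's actual API; it only illustrates dropping the unserializable entry before the message crosses the wire.

```python
def strip_task_state(kwargs):
    # TaskState references essentially the whole graph, so it must not
    # be serialized; everything else in kwargs passes through untouched.
    return {k: v for k, v in kwargs.items() if k != "task_state"}
```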
Also, regarding linting, we started using the …
Okay, so it seems like I need to get the tests running locally to do this effectively. I got dask to test with only 5 errors, but distributed is failing on test_simple and pretty much everything else, and is running very slowly. Hopefully I'll have time to look into it later this week.
You can also run …
When looking through the other plugins I found this: … Just going to use that! I added a bit to the docs if you think it will be useful; otherwise this can be closed.
Thanks @nicolls1!
* upstream/master: (58 commits)
  * Add unknown pytest markers (dask#2764)
  * Delay lookup of allowed failures. (dask#2761)
  * Change address -> worker in ColumnDataSource for nbytes plot (dask#2755)
  * Remove module state in Prometheus Handlers (dask#2760)
  * Add stress test for UCX (dask#2759)
  * Add nanny logs (dask#2744)
  * Move some of the adaptive logic into the scheduler (dask#2735)
  * Add SpecCluster.new_worker_spec method (dask#2751)
  * Worker dashboard fixes (dask#2747)
  * Add async context managers to scheduler/worker classes (dask#2745)
  * Fix the resource key representation before sending graphs (dask#2716) (dask#2733)
  * Allow user to configure whether workers are daemon. (dask#2739)
  * Pin pytest >=4 with pip in appveyor and python 3.5 (dask#2737)
  * Add Experimental UCX Comm (dask#2591)
  * Close nannies gracefully (dask#2731)
  * add kwargs to progressbars (dask#2638)
  * Add back LocalCluster.__repr__. (dask#2732)
  * Move bokeh module to dashboard (dask#2724)
  * Close clusters at exit (dask#2730)
  * Add SchedulerPlugin TaskState example (dask#2622)
  * ...
I would like to have worker info when a task transitions to processing in my scheduler plugin.
Currently the processing transition doesn't have any additional info:
I want to track which nodes are occupied and which tasks are currently running, for tracking/auto-scaling. I have long-running tasks that happen very infrequently and require beefy cloud GPU machines, so I can bring them up only when required.
Assuming this is an idea you're willing to entertain, I looked at the possibilities and this is the one that stood out to me. It is consistent with the kwargs that are passed when a task is finished.
At the moment, I'm not seeing other info that would be useful to have.
Happy to do it another way as well; I thought this would be the best way to start a conversation. Thanks in advance!
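The use case described above could be sketched roughly as follows. All names here are mine, not the PR's; a real plugin would subclass `distributed.diagnostics.plugin.SchedulerPlugin`, and this sketch assumes a `worker` keyword is available on transitions into and out of `processing`.

```python
# Hypothetical sketch: count tasks currently processing on each worker,
# so an external auto-scaler can tell when expensive GPU nodes are idle.
from collections import defaultdict

class OccupancyTracker:
    def __init__(self):
        self.active = defaultdict(set)

    def transition(self, key, start, finish, *args, **kwargs):
        worker = kwargs.get("worker")
        if worker is None:
            return
        if finish == "processing":
            self.active[worker].add(key)
        elif start == "processing":
            self.active[worker].discard(key)

    def occupied_workers(self):
        # Workers with at least one task still running
        return {w for w, keys in self.active.items() if keys}
```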