
Add worker task processing on if present #2622

Merged
TomAugspurger merged 11 commits into dask:master from nicolls1:master
May 24, 2019

Conversation

@nicolls1
Contributor

I would like to have worker info when a task transitions to processing in my scheduler plugin.
Currently the processing transition doesn't have any additional info:

transition: key=test-a0abb189-4c26-41df-bdcd-6176ace3b534, start=waiting, finish=processing, *args=(), **kwargs={}

I want to track which nodes are occupied and which tasks are currently running, for tracking/auto-scaling. I have long-running tasks that happen very infrequently and require beefy cloud GPU machines, so I'd like to bring those machines up only when required.

Assuming this is an idea you're willing to entertain: I looked at the possibilities and this is the one that stood out to me. It is consistent with the kwargs passed when a task is finished.

At the moment I'm not seeing other info that would be useful to have.

Happy to do it another way too; I thought this would be the best way to start a conversation. Thanks in advance!

    for plugin in list(self.plugins):
        try:
            if self.tasks[key].processing_on is not None:
                kwargs['worker'] = self.tasks[key].processing_on.address
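To illustrate the kind of plugin this proposal would enable, here is a minimal standalone sketch. The `WorkerTracker` class and its names are hypothetical stand-ins, not part of distributed; it only shows a transition hook consuming the proposed `worker` kwarg:

```python
class WorkerTracker:
    """Hypothetical stand-in for a distributed SchedulerPlugin."""

    def __init__(self):
        self.task_workers = {}  # key -> worker address

    def transition(self, key, start, finish, *args, **kwargs):
        # Record where each task started processing, if the scheduler told us.
        if finish == "processing" and "worker" in kwargs:
            self.task_workers[key] = kwargs["worker"]


plugin = WorkerTracker()
plugin.transition("my-task", "waiting", "processing", worker="tcp://10.0.0.5:8786")
```

A plugin like this could drive the auto-scaling decision described above by checking which addresses currently appear in `task_workers`.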
Member

Thanks @nicolls1 !

Perhaps instead we might just place in the entire task state object, and let the plugin do what it wants with it. Thoughts?

Member

To be clear, I'm suggesting something like kwargs['task_state'] = ts

Contributor Author

Thanks for the quick response. That sounds much cleaner and more future-proof, will do!

    if self.tasks[key].processing_on is not None:
        kwargs['worker'] = self.tasks[key].processing_on.address
    plugin.transition(key, start, finish2, *args, **kwargs)
    plugin.transition(key, start, finish2, task_state=ts, *args, **kwargs)
Member

I suspect that Python will yell at you here for placing *args after a named keyword argument. You'll probably have to insert this into kwargs yourself.
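The hazard can be demonstrated in plain Python: `*args` unpacked in a call is bound to positional parameters before explicit keywords are applied, so the call only works while `args` happens to be empty. A standalone sketch (`f` here is illustrative, not the real `plugin.transition`):

```python
def f(key, start, finish, *args, **kwargs):
    return args, kwargs

# With an empty *args this call is fine: the unpacked tuple contributes no
# positionals, and the extra keyword lands in **kwargs.
args_, kwargs_ = f("k", "waiting", "processing", task_state="ts", *())

# But if the unpacked positionals fill a parameter that is also passed by
# keyword, Python raises TypeError at call time.
error = ""
try:
    f("k", "waiting", finish="processing", *("processing",))
except TypeError as exc:
    error = str(exc)  # "...got multiple values for argument 'finish'"
```

Inserting the value into `kwargs` before the call, as suggested, sidesteps the issue entirely.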

Contributor Author

Good point. It wasn't giving errors, but looking at it now, args was always empty, which is probably why. Sorry, GitHub didn't auto-refresh the second post.

    self.tasks[ts.key] = ts
    for plugin in list(self.plugins):
        try:
            kwargs['task_state'] = ts
Member

Can we move this outside of the for loop?

Member

(Sorry for the nitpick)

Member

Gentle ping

Contributor Author

Of course! Thanks for the feedback; sorry, I've been away over the holiday weekend.

@nicolls1
Contributor Author

It seems the test failures are unrelated, no? Happy to make updates to them if I broke something.

@mrocklin
Member

It looks like some of the failures are genuine:

    @gen_cluster(client=True)
    def test_eventstream_remote(c, s, a, b):
        base_plugins = len(s.plugins)
        comm = yield eventstream(s.address, interval=0.010)
    
        start = time()
        while len(s.plugins) == base_plugins:
            yield gen.sleep(0.01)
            assert time() < start + 5
    
        futures = c.map(div, [1] * 10, range(10))
    
        start = time()
        total = []
        while len(total) < 10:
            msgs = yield comm.read()
>           assert isinstance(msgs, tuple)
E           assert False
E            +  where False = isinstance({'exception': TypeError("can not serialize 'TaskState' object"), 'status': 'uncaught-error', 'text': "can not serialize 'TaskState' object", 'traceback': <traceback object at 0x7fe417971d08>}, tuple)

It looks like some existing plugins that we rely on send all of the information we give to them across a wire (I suspect that this one is for the progress bars). It looks like sending TaskState objects is difficult, probably because they include some non-serializable object.

Looking at the state for TaskState objects it becomes clear that they include references to pretty much the entire graph, so probably the solution is to not send these things around. If EventStream is the only problem, then you can probably fix that here:

    def transition(self, key, start, finish, *args, **kwargs):
        if start == "processing":
            kwargs["key"] = key
            if finish == "memory" or finish == "erred":
                self.buffer.append(kwargs)

We definitely don't want to include all of kwargs. Probably we just want to exclude the ts keyword.
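One possible shape for that fix, sketched against a minimal stand-in for the plugin (not the real EventStream class, and the `task_state` key name is the one proposed in this PR): drop the TaskState entry before the message is buffered for the wire.

```python
class EventStream:
    """Minimal stand-in for distributed's EventStream plugin."""

    def __init__(self):
        self.buffer = []

    def transition(self, key, start, finish, *args, **kwargs):
        # TaskState objects reference much of the graph and don't serialize,
        # so drop them before the message is buffered to send over the wire.
        kwargs.pop("task_state", None)
        if start == "processing":
            kwargs["key"] = key
            if finish == "memory" or finish == "erred":
                self.buffer.append(kwargs)


es = EventStream()
es.transition("k", "processing", "memory", task_state=object())
```

With the pop in place, the buffered message contains only serializable entries.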

@mrocklin
Member

Also, regarding linting, we started using the black code formatter. You might want to run the following:

    pip install black
    black distributed
    git commit -a -m "black linting"

@nicolls1
Contributor Author

Okay, so it seems like I need to get the tests running locally to do this effectively. I got dask to test with only 5 errors, but distributed is failing on test_simple and pretty much everything else, and is running very slowly. Hopefully I'll have time to look into it later this week.

@mrocklin
Member

You can also run

    py.test distributed -k test_eventstream_remote

@nicolls1
Contributor Author

When looking through the other plugins I found this:
https://github.com/dask/distributed/blob/master/distributed/stealing.py#L77

Just going to use that! I added a bit to the docs in case it's useful; otherwise this can be closed.
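The pattern referenced from stealing.py can be sketched as follows: instead of changing the transition signature, the plugin holds a reference to the scheduler and looks the task up itself. `Scheduler`, `TaskState`, `WorkerState`, and `OccupancyPlugin` here are minimal illustrative stand-ins, not the real distributed classes:

```python
class WorkerState:
    def __init__(self, address):
        self.address = address


class TaskState:
    def __init__(self, key, processing_on=None):
        self.key = key
        self.processing_on = processing_on  # WorkerState or None


class Scheduler:
    def __init__(self):
        self.tasks = {}  # key -> TaskState


class OccupancyPlugin:
    """Plugin that resolves the worker from scheduler state on its own."""

    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.running = {}  # key -> worker address

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == "processing":
            ts = self.scheduler.tasks[key]
            if ts.processing_on is not None:
                self.running[key] = ts.processing_on.address


s = Scheduler()
s.tasks["t1"] = TaskState("t1", processing_on=WorkerState("tcp://10.0.0.5:8786"))
plugin = OccupancyPlugin(s)
plugin.transition("t1", "waiting", "processing")
```

Because the lookup happens inside the plugin, nothing extra needs to cross the wire and the TaskState serialization problem never arises.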

@TomAugspurger TomAugspurger merged commit 6339d81 into dask:master May 24, 2019
@TomAugspurger
Member

Thanks @nicolls1!

calebho pushed a commit to calebho/distributed that referenced this pull request May 29, 2019
muammar added a commit to muammar/distributed that referenced this pull request Jun 12, 2019
* upstream/master: (58 commits)
  Add unknown pytest markers (dask#2764)
  Delay lookup of allowed failures. (dask#2761)
  Change address -> worker in ColumnDataSource for nbytes plot (dask#2755)
  Remove module state in Prometheus Handlers (dask#2760)
  Add stress test for UCX (dask#2759)
  Add nanny logs (dask#2744)
  Move some of the adaptive logic into the scheduler (dask#2735)
  Add SpecCluster.new_worker_spec method (dask#2751)
  Worker dashboard fixes (dask#2747)
  Add async context managers to scheduler/worker classes (dask#2745)
  Fix the resource key representation before sending graphs (dask#2716) (dask#2733)
  Allow user to configure whether workers are daemon. (dask#2739)
  Pin pytest >=4 with pip in appveyor and python 3.5 (dask#2737)
  Add Experimental UCX Comm (dask#2591)
  Close nannies gracefully (dask#2731)
  add kwargs to progressbars (dask#2638)
  Add back LocalCluster.__repr__. (dask#2732)
  Move bokeh module to dashboard (dask#2724)
  Close clusters at exit (dask#2730)
  Add SchedulerPlugin TaskState example (dask#2622)
  ...
muammar added a commit to muammar/distributed that referenced this pull request Jul 18, 2019