
Revive #2740: Allow tasks with restrictions to be stolen #3069

Merged
mrocklin merged 12 commits into dask:master from seibert:revive_2740
Mar 3, 2020

Conversation

@seibert
Contributor

@seibert seibert commented Sep 19, 2019

I recently stumbled over an issue that is fixed by #2740, but noticed that review on that PR seems to have stalled because a bunch of extraneous commits were accidentally merged, and the original author hasn't opened a new PR or cleaned up the existing one.

This PR attempts to recreate that PR by cherry-picking those commits (so the original author still gets credit), and adds a minor fix. If there are additional changes needed to finish out code review on this, I'm happy to do those.

calebho and others added 6 commits September 19, 2019 13:02
Addresses stealing tasks with resource restrictions, as mentioned in dask#1851.
If a task has hard restrictions, do not just give up on stealing.
Instead, use the restrictions to determine which workers can steal it
before attempting to execute a steal operation.

A follow up PR will be needed to address the issue of long-running tasks
not being stolen because the scheduler has no information about their
runtime.
Co-Authored-By: Matthew Rocklin <mrocklin@gmail.com>
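The approach described in the commit message can be sketched roughly as follows. This is a simplified stand-in, not the actual scheduler code: `TaskState`/`WorkerState` here are minimal illustrative classes, and the real `_can_steal`/`_has_restrictions` in the PR differ in detail.

```python
# Simplified sketch of restriction-aware stealing. These classes are
# hypothetical stand-ins for the scheduler's TaskState/WorkerState.

class TaskState:
    def __init__(self, loose_restrictions=False, host_restrictions=None,
                 worker_restrictions=None, resource_restrictions=None):
        self.loose_restrictions = loose_restrictions
        self.host_restrictions = host_restrictions or set()
        self.worker_restrictions = worker_restrictions or set()
        self.resource_restrictions = resource_restrictions or {}

class WorkerState:
    def __init__(self, address, host, resources=None):
        self.address = address
        self.host = host
        self.resources = resources or {}

def has_restrictions(ts):
    """True if the task has hard (non-loose) placement restrictions."""
    return not ts.loose_restrictions and bool(
        ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
    )

def can_steal(thief, ts):
    """Rather than giving up on restricted tasks, check whether the
    prospective thief actually satisfies the restrictions."""
    if not has_restrictions(ts):
        return True
    if ts.host_restrictions and thief.host not in ts.host_restrictions:
        return False
    if ts.worker_restrictions and thief.address not in ts.worker_restrictions:
        return False
    return all(thief.resources.get(r, 0) >= v
               for r, v in ts.resource_restrictions.items())
```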
@seibert
Contributor Author

seibert commented Sep 19, 2019

Ping original author @calebho, and original reviewers @mrocklin, @martindurant, @TomAugspurger.

@guillaumeeb
Member

👍 Just to say that it would be wonderful if this issue could be solved; this has been awaited for several months in our HPC center!

Member

@TomAugspurger TomAugspurger left a comment


Overall, this looks good. I think the concern last time was slowing performance of the no-restriction case. I can look into writing a benchmark for these.

return _has_resources(thief, victim.resources)


def _has_resources(ws, required_resources):
Member


This could likely be inlined in _can_steal

Contributor Author


OK, I've inlined the function.

@calebho
Contributor

calebho commented Sep 19, 2019

Thanks for reviving this. I haven't had the bandwidth to continue pushing for this, so please feel free to take over from here. Happy to provide review input.

``victim``.
"""
if not _has_restrictions(ts):
return True
Member


This check makes sense to have semantically, but it's also unnecessary given where _can_steal is called.

"""
return not ts.loose_restrictions and (
ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
)
Member


I wonder if maybe we can cache this on the TaskState object itself in order to avoid recomputation.

if ts.restricted:
    ...

Member


It seems that sometimes these are updated

# in scheduler.py L1707
                    ts = self.tasks[k]
                    ts.loose_restrictions = True

If that's the case, I'd prefer to not cache, so that we don't have to worry about invalidating the cache (though perhaps this is the only case?).

Member


That's in update_graph, which is the time when we construct most tasks. My guess is that this isn't an issue. We won't check ts.restricted before calling those lines.
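For illustration, the caching idea discussed above, together with the invalidation concern, could look roughly like this. This is a hypothetical sketch, not the actual TaskState implementation; the class and attribute names here are made up for the example.

```python
# Hypothetical sketch: cache a `restricted` flag on the task, invalidating
# the cache whenever loose_restrictions is mutated (the concern raised
# above about scheduler.py updating it after construction).

class CachedTaskState:
    def __init__(self, host_restrictions=None, worker_restrictions=None,
                 resource_restrictions=None):
        self.host_restrictions = host_restrictions or set()
        self.worker_restrictions = worker_restrictions or set()
        self.resource_restrictions = resource_restrictions or {}
        self._loose = False
        self._restricted = None  # cached result of the restriction check

    @property
    def loose_restrictions(self):
        return self._loose

    @loose_restrictions.setter
    def loose_restrictions(self, value):
        self._loose = value
        self._restricted = None  # invalidate the cache on mutation

    @property
    def restricted(self):
        if self._restricted is None:  # recompute lazily on first access
            self._restricted = not self._loose and bool(
                self.host_restrictions
                or self.worker_restrictions
                or self.resource_restrictions
            )
        return self._restricted
```

The setter is what makes this safe against the `ts.loose_restrictions = True` mutation quoted above; skipping it is exactly the stale-cache risk being debated.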

@mrocklin
Member

In general this seems fine. I am concerned about performance. Anecdotally I'll say that work stealing accounts for something like 20-30% of scheduler time when under heavy load. At some point I went through and micro-optimized things a bit. It would be good to be mindful of costs here.

The scheduler is definitely a bottleneck in lots of larger important user workflows (Pangeo cares about this a bunch) so we need to be performance conscious. I think that we can achieve that fairly easily here though with modest work.

@mrocklin
Member

@seibert checking in. Is this still something that you're likely to pursue?

@seibert
Contributor Author

seibert commented Sep 26, 2019

Yes, I had to switch to doing some conference prep / speaking this week, but I am still interested in solving this issue, as it reduces throughput in the Dask-based testing system we have. I'll need some guidance how to benchmark the impact of any changes, though.

@mrocklin
Member

I'll need some guidance how to benchmark the impact of any changes, though.

Unfortunately we don't have good benchmarks for the Dask scheduler (or any part of Dask, really), although this would be great work for someone in the future.

I've made a few concrete suggestions above that I think might be helpful. Also, if anything pops out at you it might be good to bring it up (I trust your performance intuition over pretty much anyone's).

TomAugspurger added a commit to TomAugspurger/dask-benchmarks that referenced this pull request Nov 6, 2019
@TomAugspurger
Member

TomAugspurger commented Nov 6, 2019

Added a benchmark for the improvement in dask/dask-benchmarks#22. Things look good for the new one. Things are being stolen by the new worker with the resource.

I think we'll want to add a test using resources here. Right now it's just testing worker restrictions. I'll do that tomorrow.

Finally, I'll need to verify that we haven't regressed on performance for the common case of no restrictions. I think we have a benchmark for that, but will verify.

cc @mattilyra.

Member

@TomAugspurger TomAugspurger left a comment


I think we'll want to add a test using resources here. Right now it's just testing worker restrictions. I'll do that tomorrow.

We apparently already have a test for that: test_steal_resource_restrictions.

verify that we haven't regressed on performance for the common case of no restrictions

Things seem OK over in the dask benchmarks repo.

# master
[ 75.00%] ··· client.ClientSuite.time_trivial_tasks                            220±5ms
[100.00%] ··· client.WorkerRestrictionsSuite.time_trivial_tasks_restrictions   1.06±0s

# this PR
[ 75.00%] ··· client.ClientSuite.time_trivial_tasks                            224±6ms
[100.00%] ··· client.WorkerRestrictionsSuite.time_trivial_tasks_restrictions   629±20ms

"""
return not ts.loose_restrictions and (
ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that sometimes these are updated

# in scheduler.py L1707
                    ts = self.tasks[k]
                    ts.loose_restrictions = True

If that's the case, I'd prefer to not cache, so that we don't have to worry about invalidating the cache (though perhaps this is the only case?).

@mrocklin
Member

mrocklin commented Nov 7, 2019

Things seem OK over in the dask benchmarks repo.

I added a few comments there. I don't think that that benchmark is likely to be sensitive to the performance impacts here. I would recommend cranking the stealing interval up super high.
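The thread doesn't name the exact knob for doing this. Assuming the stealing interval is exposed through dask's configuration (the key below is an assumption and may not exist in the release you are running; check the config schema first), a benchmark run could adjust it before the scheduler starts:

```python
import dask  # config must be set before the scheduler is created

# Assumed key name -- the stealing interval may not be configurable in
# every distributed version; verify against your version's schema.
dask.config.set({"distributed.scheduler.work-stealing-interval": "10ms"})
```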

@mrocklin
Member

mrocklin commented Nov 7, 2019

To be more explicit, let's say that we're aiming for 200us of overhead per task. How much are we willing to increase that overhead to support stealing restricted tasks? 10us? 20us? This isn't a very common case, so I think that even those numbers might be fairly high.

This isn't a long term solution, but I encourage people to run a workload locally on your laptop and then look through the /profile-server route on the scheduler's dashboard. You'll find stealing there as taking up 10-15% of the time (or at least this was the case last time I checked, about a year ago). I recommend diving through this just to get a sense of what things cost today.
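To put the per-task budget above in perspective, a crude micro-benchmark of the restriction check itself is easy to run locally. The `TS` class here is a simplified stand-in for the scheduler's task object, not the real thing:

```python
import timeit

# Rough micro-benchmark sketch: estimate the per-call cost of the
# restriction check, since stealing sits in the scheduler's hot path.
# TS is a hypothetical stand-in for TaskState.

class TS:
    loose_restrictions = False
    host_restrictions = set()
    worker_restrictions = set()
    resource_restrictions = {}

def has_restrictions(ts):
    return not ts.loose_restrictions and bool(
        ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions
    )

ts = TS()
n = 100_000
per_call_us = timeit.timeit(lambda: has_restrictions(ts), number=n) / n * 1e6
print(f"~{per_call_us:.2f}us per restriction check")
```

The number this prints can then be compared directly against the 10-20us budget discussed above.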

@mrocklin
Member

mrocklin commented Nov 7, 2019

I just did this myself to verify my previous experience. I was surprised to learn that we seem to be spending almost all of our time sending and receiving things from sockets that are supposed to be non-blocking. This is probably something that we should look into in the near future.

@mberglundmx

I would really benefit from this PR being merged - unless there is a work-around...?

My workload consists of short running (5-20s) GPU tasks, followed by longer running (2-10 minutes) CPU tasks. I set resource CPU or GPU on my workers to make sure tasks are run on the right hosts.

I launch the GPU tasks (.submit), followed by the CPU tasks. If I e.g. restart (or add) a CPU-worker, that worker will not steal any of the tasks from the other workers.
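For context, the resource-tagging setup described above looks roughly like this (the scheduler address is a placeholder):

```shell
# Placeholder scheduler address; tag each worker with the resource it serves
dask-worker tcp://scheduler:8786 --resources "GPU=1"
dask-worker tcp://scheduler:8786 --resources "CPU=1"
```

Tasks are then pinned with e.g. `client.submit(fn, resources={"GPU": 1})`; without this PR, a newly added worker advertising the right resource will never steal such tasks.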

@TomAugspurger
Member

unless there is a work-around...?

For now, a workaround is to disable work stealing, and add a worker plugin with the changes from this PR.

But we're hoping to get this merged in the next day or so. Just need to verify a few performance things.
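The first half of that workaround can be expressed in configuration. The key below exists in distributed's config schema, though it is worth double-checking against the version in use:

```python
import dask  # config must be set before the scheduler is created

# Disable the scheduler's work-stealing extension entirely
dask.config.set({"distributed.scheduler.work-stealing": False})
```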

Member

@TomAugspurger TomAugspurger left a comment


@leej3 did some nice work expanding our benchmarks in this area in dask/dask-benchmarks#35. From the timings there, we have

On master:

========== ========= ========= ========= =========
--                      steal_interval
---------- ---------------------------------------
resource     0.01      0.1        1        100
========== ========= ========= ========= =========
  1        1.08±0s   1.06±0s   1.07±0s   1.07±0s
 None      436±0ms   430±0ms   436±0ms   436±0ms
========== ========= ========= ========= =========

On #3069:

========== ========= ========= ========= =========
--                      steal_interval
---------- ---------------------------------------
resource     0.01      0.1        1        100
========== ========= ========= ========= =========
  1        625±0ms   542±0ms   547±0ms   644±0ms
 None      428±0ms   425±0ms   431±0ms   438±0ms
========== ========= ========= ========= =========

I'm reasonably confident that this benchmark is hitting the relevant code (proved by the with-resource restriction case being faster, since we have stealing). And it looks like it isn't slowing down the common case of no resource restrictions.

@TomAugspurger
Member

@mrocklin do the benchmarks above sufficiently satisfy your concerns about performance?

@mrocklin
Member

mrocklin commented Mar 3, 2020

@mrocklin do the benchmarks above sufficiently satisfy your concerns about performance?

Not entirely. We're still iterating through every worker for every stealable task. I think that this will get more interesting when this starts being run in production. But, I'm at least satisfied that it doesn't seem to be slowing down the common case, and folks seem pretty excited about trying this out. So let's merge and see what happens.

@mrocklin mrocklin merged commit d8d0d4e into dask:master Mar 3, 2020
@mrocklin
Member

mrocklin commented Mar 3, 2020

Thank you @seibert for doing this work, and to @leej3 and friends for benchmarking things.

@mberglundmx

Thanks a lot, everyone, for getting this in; I will try this release ASAP.
