Revive #2740: Allow tasks with restrictions to be stolen #3069
mrocklin merged 12 commits into dask:master from
Conversation
Addresses stealing tasks with resource restrictions, as mentioned in dask#1851. If a task has hard restrictions, do not just give up on stealing. Instead, use the restrictions to determine which workers can steal it before attempting to execute a steal operation. A follow up PR will be needed to address the issue of long-running tasks not being stolen because the scheduler has no information about their runtime.
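The predicate described above might look something like the following standalone sketch. The names (`can_steal`, and the fields on the simplified `TaskState`/`WorkerState` classes) are stand-ins mirroring the scheduler's real classes, not the actual implementation in this PR:

```python
from dataclasses import dataclass, field

@dataclass
class TaskState:
    # Simplified stand-in for the scheduler's TaskState.
    loose_restrictions: bool = False
    host_restrictions: set = field(default_factory=set)
    worker_restrictions: set = field(default_factory=set)
    resource_restrictions: dict = field(default_factory=dict)

@dataclass
class WorkerState:
    # Simplified stand-in for the scheduler's WorkerState.
    address: str = ""
    host: str = ""
    resources: dict = field(default_factory=dict)

def can_steal(thief: WorkerState, ts: TaskState) -> bool:
    """Return True if `thief` satisfies the task's hard restrictions."""
    if ts.loose_restrictions:
        return True  # loose restrictions are preferences, not requirements
    if ts.host_restrictions and thief.host not in ts.host_restrictions:
        return False
    if ts.worker_restrictions and thief.address not in ts.worker_restrictions:
        return False
    # Resource check: the thief must have at least the required amount
    # of every restricted resource.
    for resource, amount in ts.resource_restrictions.items():
        if thief.resources.get(resource, 0) < amount:
            return False
    return True
```

Only workers passing this check would be considered as steal targets, rather than giving up on restricted tasks entirely.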
Co-Authored-By: Matthew Rocklin <mrocklin@gmail.com>
Ping original author @calebho, and original reviewers @mrocklin, @martindurant, @TomAugspurger.

👍 Just to say it would be wonderful if this issue could be solved; this has been awaited for several months in our HPC center!
TomAugspurger
left a comment
Overall, this looks good. I think the concern last time was slowing performance of the no-restriction case. I can look into writing a benchmark for these.
distributed/stealing.py
Outdated
    return _has_resources(thief, victim.resources)

def _has_resources(ws, required_resources):
This could likely be inlined in _can_steal
ok, I've inlined the function
Thanks for reviving this. I haven't had the bandwidth to continue pushing for this, so please feel free to take over from here. Happy to provide review input.
distributed/stealing.py
Outdated
    ``victim``.
    """
    if not _has_restrictions(ts):
        return True
This check makes sense to have semantically, but it's also unnecessary given where _can_steal is called.
| """ | ||
| return not ts.loose_restrictions and ( | ||
| ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions | ||
| ) |
I wonder if maybe we can cache this on the TaskState object itself in order to avoid recomputation.
if ts.restricted:
    ...

It seems that sometimes these are updated:

# in scheduler.py L1707
ts = self.tasks[k]
ts.loose_restrictions = True

If that's the case, I'd prefer to not cache, so that we don't have to worry about invalidating the cache (though perhaps this is the only case?).
That's in update_graph, which is when we construct most tasks. My guess is that this isn't an issue. We won't check ts.restricted before calling those lines.
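To illustrate the trade-off discussed above, here is a hypothetical sketch of what caching the flag on the task would entail. The class and field names are simplified stand-ins; the setter shows the invalidation burden the reviewers preferred to avoid:

```python
import functools

class TaskState:
    """Simplified stand-in: cache the restriction flag, invalidate on mutation."""

    def __init__(self, loose=False, hosts=None, workers=None, resources=None):
        self._loose_restrictions = loose
        self.host_restrictions = hosts or set()
        self.worker_restrictions = workers or set()
        self.resource_restrictions = resources or {}

    @functools.cached_property
    def restricted(self):
        # Computed once, then served from the instance __dict__.
        return not self._loose_restrictions and bool(
            self.host_restrictions
            or self.worker_restrictions
            or self.resource_restrictions
        )

    @property
    def loose_restrictions(self):
        return self._loose_restrictions

    @loose_restrictions.setter
    def loose_restrictions(self, value):
        self._loose_restrictions = value
        # Every mutation site must drop the cached value, or `restricted`
        # goes stale -- this is the maintenance cost of caching.
        self.__dict__.pop("restricted", None)
```

Recomputing the cheap boolean expression on each call avoids this bookkeeping entirely, which is why the uncached version won out here.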
In general this seems fine. I am concerned about performance. Anecdotally I'll say that work stealing accounts for something like 20-30% of scheduler time when under heavy load. At some point I went through and micro-optimized things a bit. It would be good to be mindful of costs here. The scheduler is definitely a bottleneck in lots of larger important user workflows (Pangeo cares about this a bunch), so we need to be performance conscious. I think that we can achieve that fairly easily here, though, with modest work.

@seibert checking in. Is this still something that you're likely to pursue?

Yes, I had to switch to doing some conference prep / speaking this week, but I am still interested in solving this issue, as it reduces throughput in the Dask-based testing system we have. I'll need some guidance on how to benchmark the impact of any changes, though.

Unfortunately we don't have good benchmarks for the Dask scheduler (or any part of Dask, really), although this would be great work for someone in the future. I've made a few concrete suggestions above that I think might be helpful. Also, if anything pops out at you it might be good to bring it up (I trust your performance intuition over pretty much anyone's).
benchmarks the improvement on dask/distributed#3069
Added a benchmark for the improvement in dask/dask-benchmarks#22. Things look good for the new one: tasks are being stolen by the new worker with the resource. I think we'll want to add a test using resources here; right now it's just testing worker restrictions. I'll do that tomorrow. Finally, I'll need to verify that we haven't regressed on performance for the common case of no restrictions. I think we have a benchmark for that, but will verify. cc @mattilyra.
TomAugspurger
left a comment
I think we'll want to add a test using resources here. Right now it's just testing worker restrictions. I'll do that tomorrow.
We apparently already have a test for that: test_steal_resource_restrictions.
verify that we haven't regressed on performance for the common case of no restrictions
Things seem OK over in the dask benchmarks repo.
# master
[ 75.00%] ··· client.ClientSuite.time_trivial_tasks 220±5ms
[100.00%] ··· client.WorkerRestrictionsSuite.time_trivial_tasks_restrictions 1.06±0s
# this PR
[ 75.00%] ··· client.ClientSuite.time_trivial_tasks 224±6ms
[100.00%] ··· client.WorkerRestrictionsSuite.time_trivial_tasks_restrictions 629±20ms
| """ | ||
| return not ts.loose_restrictions and ( | ||
| ts.host_restrictions or ts.worker_restrictions or ts.resource_restrictions | ||
| ) |
There was a problem hiding this comment.
It seems that sometimes these are updated
# in scheduler.py L1707
ts = self.tasks[k]
ts.loose_restrictions = TrueIf that's the case, I'd prefer to not cache, so that we don't have to worry about invalidating the cache (though perhaps this is the only case?).
I added a few comments there. I don't think that benchmark is likely to be sensitive to the performance impacts here. I would recommend cranking the stealing interval up super high.
To be more explicit, let's say that we're aiming for a 200us overhead per task. How much are we willing to increase that overhead to support stealing restricted tasks? 10us? 20us? This isn't a very common case, so I think that even those numbers might be fairly high. This isn't a long term solution, but I encourage people to run a workload locally on your laptop and then look through the
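To put a per-task budget like 10-20us in perspective, a quick micro-benchmark of a pure-Python restriction check can be run with `timeit`. The function below is an illustrative stand-in, not the scheduler's actual code:

```python
import timeit

def has_restrictions(loose, hosts, workers, resources):
    # Pure-Python stand-in for the per-task restriction check; the real
    # fields live on the scheduler's TaskState.
    return not loose and bool(hosts or workers or resources)

# Time the common fast path (no restrictions), the case the budget protects.
n = 100_000
per_call = min(timeit.repeat(
    lambda: has_restrictions(False, set(), set(), {}),
    number=n, repeat=3,
)) / n
print(f"~{per_call * 1e6:.2f} us per check")
```

On typical hardware a check like this costs well under a microsecond, which is why the boolean test itself is unlikely to be the bottleneck compared to iterating over workers.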
I just did this myself to verify my previous experience. I was surprised to learn that we seem to be spending almost all of our time sending and receiving things from sockets that are supposed to be non-blocking. This is probably something that we should look into in the near future.
I would really benefit from this PR being merged, unless there is a workaround...? My workload consists of short-running (5-20s) GPU tasks, followed by longer-running (2-10 minute) CPU tasks. I set a CPU or GPU resource on my workers to make sure tasks are run on the right hosts. I launch the GPU tasks (.submit), followed by the CPU tasks. If I e.g. restart (or add) a CPU worker, that worker will not steal any of the tasks from the other workers.
For now, a workaround is to disable work stealing, and add a worker plugin with the changes from this PR. But we're hoping to get this merged in the next day or so. Just need to verify a few performance things.
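For reference, work stealing can be disabled through Dask's configuration system; if I recall the key correctly, it is `distributed.scheduler.work-stealing`, set in a config file like the sketch below (or via the corresponding `DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING` environment variable):

```yaml
# e.g. ~/.config/dask/distributed.yaml, read before the scheduler starts
distributed:
  scheduler:
    work-stealing: false
```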
TomAugspurger
left a comment
@leej3 did some nice work expanding our benchmarks in this area in dask/dask-benchmarks#35. From the timings there, we have
On master:
========== ========= ========= ========= =========
-- steal_interval
---------- ---------------------------------------
resource 0.01 0.1 1 100
========== ========= ========= ========= =========
1 1.08±0s 1.06±0s 1.07±0s 1.07±0s
None 436±0ms 430±0ms 436±0ms 436±0ms
========== ========= ========= ========= =========
On #3069:
========== ========= ========= ========= =========
-- steal_interval
---------- ---------------------------------------
resource 0.01 0.1 1 100
========== ========= ========= ========= =========
1 625±0ms 542±0ms 547±0ms 644±0ms
None 428±0ms 425±0ms 431±0ms 438±0ms
========== ========= ========= ========= =========
I'm reasonably confident that this benchmark is hitting the relevant code (proved by the with-resource restriction case being faster, since we have stealing). And it looks like it isn't slowing down the common case of no resource restrictions.
@mrocklin do the benchmarks above sufficiently satisfy your concerns about performance?
Not entirely. We're still iterating through every worker for every stealable task. I think that this will get more interesting when this starts being run in production. But I'm at least satisfied that it doesn't seem to be slowing down the common case, and folks seem pretty excited about trying this out. So let's merge and see what happens.
Thanks a lot everyone for getting this in, will try this release ASAP |
I recently stumbled over an issue that is fixed by #2740, but noticed that review on that PR seems to have stalled because a bunch of extraneous commits were accidentally merged, and the original author hasn't opened a new PR or cleaned up the existing one.
This PR attempts to recreate that PR by cherry-picking those commits (so the original author still gets credit), and adds a minor fix. If there are additional changes needed to finish out code review on this, I'm happy to do those.