Add support for "grouped" adaptive scaling and adaptive behavior overrides. #1632
mrocklin merged 3 commits into dask:master
Conversation
Given the current updates being made to the scheduler, I will hold off on merging.
It seems to me that this PR has two separable parts.
I'm entirely in favor of part 1. In general creating escape hatches so that others can implement custom logic outside of Dask is almost always good. I'm hesitant about part 2. At first glance this seems like logic for a specific case entering the mainline codebase. I would want to see this show up in a few different use cases before it became part of the main code. That being said, you've done a nice job of making this change minimal, and it doesn't seem to conflict with the existing flow of that operation. I'm not saying no here, just expressing some concern.
I completely agree, this is logically separable and needs a little thought before merging. From my perspective (1) is a must-have, and I can work from there to develop a working implementation for my use case. On (2), I agree with you that this feels a bit too specialized to live in the scheduler. My options to begin with were to (a) modify this function or (b) duplicate the implementation with modifications at a higher level. My gut instinct was that (b) was an ugly solution, but your assessment is spot on. I see the implementation options as either (A) keeping this grouped logic in the Scheduler's workers_to_close, as in this PR, or (B) lifting it out to a higher level.
My preference would be to go with (B) and lift this logic into the Adaptive layer.
OK, short term I'm fine with (A) if you want this to go directly into the Scheduler, perhaps with a note in the docstring that it is experimental. It seems innocuous enough to add and may help others. However, given the flux around this part of the codebase, if you all are building long-term systems I do recommend pulling some of your logic out into custom Adaptive subclasses if you need stability. For the API of the grouping function itself we might just expect a new keyword argument.
My preference for (a) rather than (b) is to avoid churn in the short term. I agree, though, that we might consider deprecating workers_to_close or factoring it out of the Scheduler; this can happen later.
I'm happy to be argued away from this approach, though.
I'm moving information into WorkerState objects here: #1661. I suggest that after this is done we change the signature as follows:
- group_key: Callable(worker_address, worker_info)
+ group_key: Callable(WorkerState)
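For concreteness, here is a minimal sketch of what a grouping callable could look like under each signature; the function names and the host-parsing logic are illustrative assumptions, not taken from the PR:

```python
# Illustrative sketch only; names and parsing are assumptions, not from the PR.

# Current proposal: the callable receives the worker address and its info dict.
def group_key(worker_address, worker_info):
    # Group workers by host, e.g. "tcp://10.0.0.5:8786" -> "tcp://10.0.0.5"
    return worker_address.rsplit(":", 1)[0]


# Suggested shape once #1661 lands: the callable receives the WorkerState.
def group_key_from_state(ws):
    # Assumes WorkerState exposes the worker's address as ws.address
    return ws.address.rsplit(":", 1)[0]
```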
Force-pushed: 5bd3c8b to a79e564
I've rebased and updated to use WorkerState.
Force-pushed: b021fbc to bd60cee
Adds support for adaptive scaling over groups of "related" workers, as may occur when multiple workers are present on a single logical node. Modifies distributed.scheduler's workers_to_close to perform scale-down analysis over worker groups using a user-provided grouping key, while defaulting to the previous per-worker logic. Adds support for subclass-specific overrides of workers_to_close in `dask.adaptive.Adaptive`, allowing hooks that specify environment-specific grouping keys and/or customize scale-down and scale-up logic. Adds a baseline test of the grouped workers_to_close logic and a baseline test of the adaptive behavior overrides.
Force-pushed: bbd4ee1 to 4479889
I plan to merge this soon if there are no further comments.
I've had a chance to work with the implementation in a real deployment and now agree with you that this implementation is likely the easiest solution for the time being. I've had to make significant changes to the adaptive layer for my deployment and don't see any reason to conflate this feature with those issues. @mrocklin Don't worry about the force-push; I'll be able to continue working from my original commits for the time being and move to this implementation on the next release. Is there an expected timeline for the next release?
It sounds like I'm starting down the same path. My current work is here: pangeo-data/pangeo#56. I'd be curious to hear about any additional issues that you run across. My apologies for the slow uptake on our end recently.
My current guess is in 1-2 weeks.
No problems on uptake; I've definitely been off-and-on for these pulls as well. What would be the easiest way for me to document this for you? I can lift the dask-related work out of a private repo within a few days, which may be a good starting point for discussion. I'm still ramping up on dask and understanding how I can stably integrate this into our current software stack, but I have a reasonably stable autoscaled deployment running on Compute Engine.
This would be above and beyond, so I'd be happy to accept anything. My ideal form would be a public-facing blogpost. Personal e-mail or whatnot would be fine too. An open-and-shut github issue is probably somewhere in between.
This seems to be a common theme these days. This would be, as far as I'm aware, the fourth such dask-on-cloud system I've seen in the last couple months. Here are some related issues:
This Kubernetes deployment seems to be the furthest along: https://github.com/cedadev/jasmin-dask/blob/master/docker/k8s-adaptive.py cc @mkjpryor-stfc
This is a rough strawman proposal. I'm certainly happy to hear implementation feedback from maintainers, and I admit that this conflicts with the open PR #1618, but I believe these changes may offer a minimally invasive approach to supporting custom adaptive logic without further changes to the core scheduler class.
This pull is intended to improve support for adaptive scaling on clusters in which there is not a 1-to-1 mapping between workers and underlying cluster resources (a "node"). This may occur when resource restrictions, such as node startup time, fixed per-node resource cost, or node count limitations, require that multiple workers be hosted on a single node. In these contexts adaptive scaling should logically occur on a per-node basis rather than a per-worker basis, as closing a single worker on a node does not allow removal of the underlying cluster resource.
This patch attempts to provide the minimal extension of the core scheduler's workers_to_close logic required to perform scale-down analysis in a grouped context. Worker activity and memory use are considered on a grouped basis, and groups are selected for retirement on an all-or-none basis. Identification of a worker "group" is deferred to a user-specified grouping function, as the desired behavior is challenging to anticipate. A simple example of hostname-based grouping is provided in the documentation.
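As a rough illustration of that kind of hostname-based grouping (a sketch under the assumption that the grouping keyword is spelled group_key, as in this thread; the documented example and the merged API may differ):

```python
# Illustrative sketch, not the PR's documented example.
def group_by_host(worker_address, worker_info):
    # Map "tcp://10.0.0.5:8786" -> "tcp://10.0.0.5" so workers sharing a host
    # form one group and are retired together.
    return worker_address.rsplit(":", 1)[0]


def hosts_to_retire(scheduler):
    # Hypothetical helper: ask an existing Scheduler which workers to close,
    # analyzed group-by-group. The keyword name (group_key) follows this
    # thread's discussion and may differ in the merged API.
    return scheduler.workers_to_close(group_key=group_by_host)
```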
Extends the `Adaptive` base class with an overridable hook for `workers_to_close`, allowing optional specification of a grouping key as well as post-facto analysis of the candidate retirement list. This hook may be used to provide support for minimum cluster size logic, which is demonstrated via a test. This implementation may be relevant for #1618.
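A hedged sketch of how that override hook might be used for minimum cluster size logic; the subclass, the minimum_workers parameter, and the way the current worker count is read off the scheduler are illustrative assumptions, and the hook's exact (possibly async) signature in the merged code may differ:

```python
# Illustrative sketch only; attribute names and signatures are assumptions.
from distributed.deploy.adaptive import Adaptive  # import path may vary by version


class MinimumSizeAdaptive(Adaptive):
    """Never recommend scaling below a fixed number of workers."""

    def __init__(self, *args, minimum_workers=2, **kwargs):
        self.minimum_workers = minimum_workers
        super().__init__(*args, **kwargs)

    def workers_to_close(self, **kwargs):
        # Post-facto analysis of the candidate retirement list: start from the
        # base recommendation, then trim it so the cluster never drops below
        # `minimum_workers`.
        candidates = super().workers_to_close(**kwargs)
        current = len(self.scheduler.workers)  # assumed way to count workers
        allowed = max(0, current - self.minimum_workers)
        return candidates[:allowed]
```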