
Add support for "grouped" adaptive scaling and adaptive behavior overrides. #1632

Merged
mrocklin merged 3 commits into dask:master from asford:grouped_adaptive
Dec 29, 2017

Conversation

@asford (Contributor) commented Dec 14, 2017

This is a rough strawman proposal. I'm certainly happy to hear implementation feedback from the maintainers, and I admit that this conflicts with the open PR #1618, but I believe these changes may offer a minimally invasive way to support custom adaptive logic without further changes to the core scheduler class.

  • Determine if additional test coverage is required.
  • Maybe extend the out-of-the-box implementation with "min size" and "max size" logic and hostname-based grouping via a command-line flag.
  • Update documentation.

This pull request is intended to improve support for adaptive scaling on clusters in which there is not a 1-to-1 mapping between workers and the underlying cluster resources (a "node"). This can occur when a resource restriction, such as node startup time, fixed per-node resource cost, or a limit on node count, requires that multiple workers be hosted on a single node. In these contexts adaptive scaling should logically occur on a per-node basis rather than a per-worker basis, since closing a single worker on a node does not allow the underlying cluster resource to be removed.

This patch attempts to provide the minimal extension of the core scheduler's workers_to_close logic needed to perform scale-down analysis in a grouped context. Worker activity and memory use are considered on a grouped basis, and groups are selected for retirement on an all-or-none basis. Identification of a worker "group" is deferred to a user-specified grouping function, as the desired behavior is difficult to anticipate. A simple example of hostname-based grouping is provided in the documentation.
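For concreteness, here is a minimal sketch of what such a hostname-based grouping function could look like, assuming the grouping function receives the worker address and info dict (as in the docstring snippet quoted later in this thread); the function name is purely illustrative:

```python
from urllib.parse import urlparse

def group_by_host(worker_address, worker_info=None):
    """Group workers by the host portion of their address.

    For example, "tcp://10.0.0.5:40123" and "tcp://10.0.0.5:40124" both map
    to "10.0.0.5", so the two workers on that node are retired (or kept)
    together.
    """
    return urlparse(worker_address).hostname
```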

This patch also extends the Adaptive base class with an overridable hook for workers_to_close, allowing optional specification of a grouping key as well as post-facto analysis of the candidate retirement list. This hook may be used to support minimum-cluster-size logic, which is demonstrated via a test. This implementation may be relevant for #1618.
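As an illustration of the override hook described above, here is a hedged sketch of an Adaptive subclass that enforces a minimum cluster size by trimming the scheduler's retirement suggestions. The class name, the `minimum` parameter, and the use of `self.scheduler.workers` are assumptions for illustration, not the exact API added in this PR:

```python
from distributed.deploy.adaptive import Adaptive

class MinimumSizeAdaptive(Adaptive):
    """Adaptive variant that never shrinks the cluster below ``minimum`` workers."""

    def __init__(self, *args, minimum=2, **kwargs):
        self.minimum = minimum
        super().__init__(*args, **kwargs)

    def workers_to_close(self, **kwargs):
        # Start from the scheduler's normal retirement suggestions ...
        candidates = self.scheduler.workers_to_close(**kwargs)
        # ... then trim the list so that at least ``minimum`` workers survive.
        n_closable = max(0, len(self.scheduler.workers) - self.minimum)
        return candidates[:n_closable]
```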

@asford (Contributor, Author) commented Dec 14, 2017

Given the current updates being made to the scheduler, I will hold off on merging master into this pull until I've received feedback and a +1 from the maintainers on the implementation. I will respond to implementation feedback working from the current release, 1.20.2, so that I can continue testing from a stable base in my environment.

@mrocklin (Member)

It seems to me that this PR has two separable parts.

  1. Making the workers_to_close call in Adaptive configurable so that others can subclass Adaptive and provide their own logic
  2. Changing Scheduler.workers_to_close to support particular grouping logic

I'm entirely in favor of part 1. In general creating escape hatches so that others can implement custom logic outside of Dask is almost always good.

I'm hesitant about part 2. At first glance this seems like logic for a specific case entering the mainline codebase. I would want to see this show up in a few different use cases before it became part of the main code. That being said, you've done a nice job of making this change minimal, and it doesn't seem to conflict with the existing flow of that operation. I'm not saying no here, just expressing some concern.

@asford (Contributor, Author) commented Dec 14, 2017

I completely agree; this is logically separable and needs a little thought before merging.

From my perspective, (1) is a must-have; from there I can work toward a working implementation for my use case.

On (2), I agree with you that this feels a bit too specialized to live in the scheduler. My options to begin with were to (a) modify this function or (b) duplicate the implementation, with modifications, at a higher level. My gut instinct was that (b) was an ugly solution, but your assessment is spot on. I see the implementation options as:

  • (A) Stay in the scheduler. Non-ideal, as you don't want to add application-specific logic to the class, and there is no reason to say that this feature should be privileged over any other.

  • (B) Lift into Adaptive. This would provide the feature "out of the box" with distributed, but would require duplicating the basic workers_to_close logic. I think it would be reasonable to extend Adaptive to provide generic, mutually compatible support for more scaling features, and I could envision further development occurring there.

  • (C) Extract into an Adaptive subclass. I would prefer that the generic framing of this logic live somewhere within distributed so that I can target a reasonable API for my deployment-specific plugin and provide testing for the feature within your CI. This might be an "incubate/contrib"-level feature, implying that you wouldn't guarantee forward compatibility, but I don't know if you have a process in place to support that.

My preference would be to go with (B) and lift this logic into the Adaptive class. Would that be an acceptable solution? Would it then make sense to "feature freeze" Scheduler.workers_to_close and/or deprecate it?

@mrocklin (Member)

OK, short term I'm fine with (A) if you want this to go directly into the Scheduler, perhaps with a note in the docstring that it is experimental. It seems innocuous enough to add and may help others.

However, given the flux around this part of the codebase, if you all are building long-term systems I do recommend pulling out some of your logic into custom Adaptive subclasses if you need stability.

For the API of the grouping function itself we might just expect a new WorkerState object. Currently this doesn't include the worker_info, but this might change: #1641

@mrocklin (Member)

> My preference would be to go with (B) and lift this logic into the Adaptive class. Would that be an acceptable solution? Would it then make sense to "feature freeze" Scheduler.workers_to_close and/or deprecate it?

My short-term preference for (A) rather than (B) is to avoid churn. That said, I do agree that we might consider deprecating or factoring workers_to_close out of the Scheduler; that can happen later, though.

@mrocklin (Member)

I'm happy to be argued away from this approach though

@mrocklin (Member)

I'm moving information into WorkerState objects here: #1661

I suggest that after this is done we change the signature as follows:

-        group_key: Callable(worker_address, worker_info)
+        group_key: Callable(WorkerState)
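Under that proposed signature, a grouping function might look roughly like this sketch; the `host` attribute is an assumption about what WorkerState would expose after #1661, and any hashable value identifying the node would do:

```python
def group_key(ws):
    """Map a WorkerState to the node it runs on, e.g. its host or IP."""
    return ws.host  # assumed attribute; could also be derived from ws.address
```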

@mrocklin (Member)

I've rebased, updated to use WorkerState objects, and force pushed. @asford I apologize for abusing your fork here. I hope you don't mind. If you have a chance to review I would appreciate it.

@mrocklin force-pushed the grouped_adaptive branch 2 times, most recently from b021fbc to bd60cee on December 28, 2017 at 01:51
Alex Ford and others added 3 commits on December 27, 2017 at 20:08:
Adds support for adaptive scaling over groups of "related" workers, as
may occur when multiple workers are present on a single logical node.
Modifies distributed.scheduler workers_to_close to perform scale-down
analysis over worker groups using a user-provided grouping key, while
defaulting to the previous "per-worker" logic.

Adds support for subclass-specific overrides of workers_to_close in
`dask.adaptive.Adaptive` in order to allow hooks that specify
environment-specific grouping keys and/or customize scale-down and scale-up logic.

Adds a baseline test of grouped workers_to_close logic and a baseline test of
adaptive behavior overrides.
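A usage sketch tying these commits together, run against a small in-process cluster. The `group_key=` keyword is taken from the docstring snippet quoted earlier and the WorkerState `host` attribute is an assumption; both may differ in the merged version:

```python
from distributed import LocalCluster

# Small in-process cluster, so the Scheduler object can be called directly.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
scheduler = cluster.scheduler

# Grouped scale-down analysis: workers sharing a host are suggested for
# retirement on an all-or-none basis.  Omitting the grouping key falls back
# to the previous per-worker logic.
suggestions = scheduler.workers_to_close(group_key=lambda ws: ws.host)
print(suggestions)

cluster.close()
```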
@mrocklin (Member)

I plan to merge this soon if there are no further comments.

@asford (Contributor, Author) commented Dec 28, 2017

I've had a chance to work with the implementation in a real deployment and now agree with you that this implementation is likely the easiest solution for the time being. I've had to make significant changes to the adaptive layer for my deployment and don't see any reason to conflate this feature with those issues.

@mrocklin Don't worry about the force-push; I'll be able to continue working from my original commits for the time being and move to this implementation on the next release. Is there an expected timeline for the next release?

@mrocklin (Member)

It sounds like I'm starting down the same path. My current work is here: pangeo-data/pangeo#56

I'd be curious to hear about any additional issues that you run across. My apologies for the slow uptake on our end recently.

> Is there an expected timeline for the next release?

My current guess is in 1-2 weeks.

@asford (Contributor, Author) commented Dec 28, 2017

No problem on uptake; I've definitely been off-and-on with these pulls as well.

What would be the easiest way for me to document this for you? I can lift the dask-related work out of a private repo within a few days, which may be a good starting point for discussion. I'm still ramping up on dask and understanding how I can stably integrate this into our current software stack, but I have a reasonably stable autoscaled deployment running on Compute Engine.

@mrocklin (Member)

> What would be the easiest way for me to document this for you?

This would be above and beyond, so I'd be happy to accept anything. My ideal form would be a public-facing blog post. Personal e-mail or whatnot would be fine too. An open-and-shut GitHub issue is probably somewhere in between.

> but I have a reasonably stable autoscaled deployment running on Compute Engine.

This seems to be a common theme these days. This would be, as far as I'm aware, the fourth such dask-on-cloud system I've seen in the last couple months. Here are some related issues:

This Kubernetes deployment seems to be the furthest along: https://github.com/cedadev/jasmin-dask/blob/master/docker/k8s-adaptive.py cc @mkjpryor-stfc

@mrocklin merged commit 61ed8a5 into dask:master on Dec 29, 2017
