(Some context of this is in #2602)
Summary
Workers should start taking memory generation into account in their local scheduling policies. This affects both task prioritization and pausing.
Background on Prioritization
Currently task prioritization is based on four factors, in order:
- User-defined priority, such as defined by the `with dask.annotate(priority=...)` context manager
- Client submission time (first in, first out) for different computations (scheduler generation)
- Graph structure, using `dask.order`
- The time at which a task appeared on the worker (worker generation)
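These factors combine into a tuple that Python compares lexicographically, with smaller values sorting first. A minimal sketch with hypothetical priority values (note that a user priority of 10 is stored negated, so that it pops off the heap earlier):

```python
import heapq

# Hypothetical priority tuples in the order above:
# (user, scheduler_generation, graph, worker_generation)
tasks = [
    ("task-a", (0, 1, 5, 3)),    # same computation, later in graph order
    ("task-b", (0, 1, 2, 4)),    # same computation, earlier in graph order
    ("task-c", (-10, 2, 9, 5)),  # user raised priority via dask.annotate(priority=10)
]
heap = [(priority, name) for name, priority in tasks]
heapq.heapify(heap)
order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
print(order)  # ['task-c', 'task-b', 'task-a']
```

The user-defined level dominates; within the same computation, graph order breaks the tie.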
In common practice, 1 and 2 don't fluctuate much (users rarely use priorities, and most comparative decisions are between tasks in the same `.compute` call), so 3, graph structure, is the most prominent. 4 only matters today when the user makes lots of `submit` calls without graph structure.
However, sometimes we learn more about the computation at runtime. One critical piece of information that we learn is the size and type of each result. This can tell us useful information like "`read_parquet` tasks tend to allocate more memory than they free", which, assuming that we're in a memory-constrained environment (common), can be more important than graph structure.
Graph ordering is usually great. Sometimes it isn't, mostly because of information about memory. When should we override graph ordering prioritization?
Background on pausing
Workers sometimes choose to pause execution. This mostly happens when they are out of memory and hope that some other process in the cluster is going to save them. That hope might have a few different causes:
- The cluster is planning to scale up
- Other workers are collecting data from this worker to free it up
- Other tasks are running which will allow this worker to free some of its own tasks
But oftentimes pausing is also bad, and results in a deadlocked cluster. Sometimes we actually need this worker to run certain tasks so that we can get ourselves out of a jam. In these cases we don't so much want to pause all work as to pause only the work that will lead to more medium-term memory use.
Background: current data collection
We currently track memory use on a per-TaskState basis on both the scheduler and worker (both have a TaskState class) and on a per-TaskGroup basis on the scheduler.
Some decisions we have to make
So, we should probably look at storing more aggregate information (aggregations are good for generalizing and making scheduling decisions). We should also look both at the use of any particular group/prefix but also the consumption of any particular group/prefix.
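A sketch of what per-prefix aggregation might look like, tracking both production and consumption; the `key_split` below is a simplified stand-in for dask's actual helper, and the observations are hypothetical:

```python
from collections import defaultdict

def key_split(key):
    # Simplified stand-in for dask's key_split: drop the trailing token,
    # e.g. "read_parquet-abc123" -> "read_parquet"
    return key.rsplit("-", 1)[0]

produced = defaultdict(int)  # total output bytes per prefix
consumed = defaultdict(int)  # total input bytes per prefix

# Hypothetical runtime observations: (key, input_bytes, output_bytes)
observations = [
    ("read_parquet-1", 0, 100_000_000),
    ("read_parquet-2", 0, 120_000_000),
    ("sum-1", 200_000_000, 8),
]
for key, nbytes_in, nbytes_out in observations:
    prefix = key_split(key)
    consumed[prefix] += nbytes_in
    produced[prefix] += nbytes_out

print(produced["read_parquet"])  # 220000000: a net producer
print(consumed["sum"])           # 200000000: a net consumer
```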
Scheduler/Worker
We also need to determine whether to make this decision on the scheduler or the worker side.
Pro-scheduler:
- The scheduler is more able to generalize
- We get to see nice diagnostics on a dashboard
- The scheduler actually has dependency information, and so can handle more complex graph situations where production may not be bad or consumption may not be good. The scheduler can implement more global metrics.
But then the information is farther away from where the decision is implemented. This can be hard for two reasons:
Pro-worker:
- Sometimes these decisions are time-sensitive
- It's complicated to coordinate
When do we override graph structure priorities?
Do we want to always do this? Only when we find tasks that are particularly memory-consuming/producing? Only when we're low on memory?
Proposal: prioritization
To start things off, I'll suggest a relatively simple approach.
We add a `worker.py:TaskPrefix` class that looks like the following:

```python
class TaskPrefix:
    def __init__(self, key):
        self.key = key
        self.bytes_consumed = 0
        self.bytes_produced = 0
        self.n_processed = 0
```

It tracks total bytes of all inputs and total bytes of all outputs. Then, when computing worker priorities we do the following:
```python
def add_task(self, key, ..., priority=...):
    ...
    tp = self.prefixes[key_split(key)]
    user, scheduler_generation, graph = priority
    if tp.bytes_consumed > 5 * tp.bytes_produced:
        memory = -1
    elif tp.bytes_produced > 5 * tp.bytes_consumed:
        memory = 1
    else:
        memory = 0
    priority = (user, scheduler_generation, memory, graph, self.generation)
```

If the prefix associated with a new task produces/consumes five times as much data as it consumes/produces then we override the graph priority. Otherwise this level of the prioritization stays at zero, and we defer to subsequent prioritizations. This strikes a balance between belief in graph prioritization and taking control with memory prioritization. In most of the cases where this is an issue, production is high and consumption is zero (like `read_parquet`), or consumption is high and production is orders of magnitude lower (like `x.sum()`).
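The 5× heuristic can be pulled out into a self-contained function and checked against those two extremes; the function name and `factor` parameter are illustrative, not part of the proposal:

```python
def memory_term(bytes_consumed, bytes_produced, factor=5):
    # -1: net consumer, schedule sooner; +1: net producer, schedule later;
    # 0: neutral, defer to graph ordering
    if bytes_consumed > factor * bytes_produced:
        return -1
    elif bytes_produced > factor * bytes_consumed:
        return 1
    return 0

print(memory_term(200_000_000, 8))   # -1: reductions like x.sum()
print(memory_term(0, 100_000_000))   # 1: producers like read_parquet
print(memory_term(1_000, 2_000))     # 0: neither dominates by 5x
```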
Unfortunately, for root-ish tasks we'll have lots of tasks assigned at once before we know their memory priority. To resolve this we could check the priority as we pop things off of the ready heap. If the priority coming off of the heap is obviously stale then we could re-prioritize the entire heap. This would happen infrequently and is pretty cheap to do anyway, assuming we have fewer than millions of tasks per worker.
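Re-heapifying is linear in the number of queued tasks, which supports the "pretty cheap" claim. A sketch with hypothetical keys and priorities, assuming each heap entry carries the task key so its priority can be recomputed:

```python
import heapq

def reprioritize(ready_heap, current_priority):
    # Recompute every entry's priority and restore the heap invariant.
    # heapq.heapify is O(n), cheap even for hundreds of thousands of tasks.
    refreshed = [(current_priority(key), key) for _, key in ready_heap]
    heapq.heapify(refreshed)
    return refreshed

# Hypothetical: both tasks were queued with memory term 0, before we learned
# that "load" prefixes produce memory and "sum" prefixes free it.
fresh_priorities = {"load-1": (0, 1, 1, 2), "sum-1": (0, 1, -1, 3)}
heap = [((0, 1, 0, 2), "load-1"), ((0, 1, 0, 3), "sum-1")]
heapq.heapify(heap)
heap = reprioritize(heap, fresh_priorities.__getitem__)
print(heap[0][1])  # sum-1: the memory-freeing task now pops first
```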
Proposal: pausing
Currently we pause with a periodic callback once memory use gets to something like 80%. I propose that we also pause when pulling a task off of the ready heap if that task is memory-producing and our memory use is above something like 50%. This should be more responsive. We'll want to unpause immediately if we get a non-memory-producing task.
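A sketch of that check; the function name and thresholds are hypothetical, and a real worker would consult its memory manager rather than take a float:

```python
def should_pause(memory_fraction, memory_term, hard_limit=0.8, producer_limit=0.5):
    # Always pause above the hard limit (today's periodic-callback behavior);
    # pause earlier only for memory-producing tasks (memory_term == 1).
    if memory_fraction >= hard_limit:
        return True
    return memory_term == 1 and memory_fraction >= producer_limit

print(should_pause(0.6, 1))    # True: producer task at 60% memory
print(should_pause(0.6, -1))   # False: a consumer may free memory, run it
print(should_pause(0.85, -1))  # True: over the hard limit regardless
```

Checking at task-pop time, rather than only in a periodic callback, is what makes this more responsive.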