dask.order over-prioritizes root tasks in some situations #9995
Description
In test_anom_mean—an example of a basic climatology workload that kicked off a long discussion about memory overproduction—it seems that an underlying problem might actually be with how dask.order prioritizes the graph.
Compare how the scheduler traverses the graph with queuing off and co-assignment off (i.e. scheduler behavior when the issue was first reported, prior to dask/distributed#4967 and dask/distributed#6614), versus with queuing on as normal:
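For reference, the "queuing on/off" comparison above can be reproduced through dask's config system: `distributed.scheduler.worker-saturation` is the knob that controls queuing (a finite factor queues excess root tasks on the scheduler; `"inf"` restores the pre-queuing behavior of submitting everything to workers up front). Co-assignment behavior depends on the scheduler version rather than a public config key.

```python
import dask

# Queuing is controlled by worker-saturation: the default (1.1) holds excess
# root tasks back on the scheduler, while "inf" disables queuing entirely and
# submits all runnable tasks to workers immediately.
dask.config.set({"distributed.scheduler.worker-saturation": "inf"})  # queuing off

print(dask.config.get("distributed.scheduler.worker-saturation"))
```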
Videos of the graphs executing
Queuing on as normal:
anom-mean-queue-normal.mp4
Queuing off and co-assignment off:
anom-mean-no-queue-no-coassign.mp4
Basically, the graph is prioritized such that even the lowest-priority data-loading task still has a higher priority than nearly all of the data-consuming tasks.
Here's a mini version of the graph, with priorities:
Code to reproduce

```python
import dask.array as da
import numpy as np
import xarray as xr

from dask.utils import parse_bytes

data = da.random.random(
    (26, 1310720),
    chunks=(1, parse_bytes("10MiB") // 8),
)
ngroups = data.shape[0] // 4
arr = xr.DataArray(
    data,
    dims=["time", "x"],
    coords={"day": ("time", np.arange(data.shape[0]) % ngroups)},
)
clim = arr.groupby("day").mean(dim="time")
anom = arr.groupby("day") - clim
anom_mean = anom.mean(dim="time")
anom_mean.data.visualize(
    "anom-mean-order.png", color="order", optimize_graph=True, collapse_outputs=True
)
```

The last data-loading task is priority 106 (lower-left, second circle up). Looking at the data-reducing tasks, I only count 7 / 26 of them as being < 106. So if we follow graph order exactly, ~75% of the initial data will stay in memory.
So before queueing, workers had both root tasks and reduction tasks assigned. They could have run the reduction tasks—all the dependencies were in memory—they just chose not to, because they were lower priority.
To prove this, here are the tasks & priorities assigned to a worker with queuing off:
dashboard-priority.mp4
Queuing effectively added a new graph-prioritization mechanism saying "I don't care what your dask.order priority is; if you look like a root task, you always run last". It seems that for test_anom_mean, this re-prioritization of the graph might have been the main thing that fixed memory usage, more so than the scheduler<->worker race conditions that queuing also addressed.
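To make the "if you look like a root task" part concrete, here is a minimal sketch (not the actual distributed code—the real `Scheduler.is_rootish` operates on task-group state and its exact thresholds vary by version) of the shape of the heuristic: a task "looks rootish" when its task group is wide relative to the cluster and has few distinct dependencies.

```python
def looks_rootish(group_size: int, n_group_dependencies: int, total_nthreads: int) -> bool:
    """Sketch of an is_rootish-style check.

    A task group is treated as root-like when it is wide relative to the
    cluster's total thread count and depends on few other groups. These
    thresholds mirror the spirit of the heuristic, not its exact values.
    """
    return group_size >= 2 * total_nthreads and n_group_dependencies < 5


# A wide, dependency-free group (e.g. the data-loading layer) gets queued:
print(looks_rootish(group_size=1000, n_group_dependencies=0, total_nthreads=40))  # True

# A narrow reduction layer is scheduled by dask.order priority as usual:
print(looks_rootish(group_size=8, n_group_dependencies=4, total_nthreads=40))  # False
```

Tasks classified this way are held back regardless of their dask.order number, which is exactly the priority-breaking behavior described above.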
This is the same thing @fjetter said in dask/distributed#7526 (comment):

> this lets me think that a major contributing factor to the success of queuing is in fact that we are breaking priority ordering in a very specific way
The `dask.order` code is quite complicated, so I don't know how we'd address this there. Maybe the `is_rootish` heuristic is good enough and it's not worth trying? It'd be nice to understand the underlying cause, though. Maybe @eriknw has ideas?

