-
-
Notifications
You must be signed in to change notification settings - Fork 756
Description
Graphs like this are not currently scheduled well:
. . . . . . . . . . . . . . . .
|\|\|\|\|/|/|/| |\|\|\|\|/|/|/|
| | | | a | | | | | | | b | | |
* * * * * * * * * * * * * * * *
The . tasks should definitely take into account the location of the * data when scheduling. But if we have 5 workers, every worker will have * data on it, but only 2 workers will have an a or b. In scheduling the first few .s, there's a tug-of-war between the a and the *—which do we want to schedule near? We want a way to disregard the a.
Say (*, 0) completes first, and a is already complete, on a different worker. Each * is the same size (or smaller than) a. We now schedule (., 0). If we choose to go to a, we might have a short-term gain, but we've taken a spot that could have gone to better use in the near future. Say the worker holding a is already running (*, 6). Now, (., 6) may get scheduled on yet another worker, because (., 0) is already running where it should have gone, and the scheduler prioritizes "where can I start this task soonest" over "how can I minimize data transfer".
This can cascade through all the .s, until we've transferred most root tasks to different workers (on top of a, which we have to transfer everywhere no matter what).
What could have been a nearly-zero-transfer operation is instead likely to transfer every piece of input data to a different worker, greatly increasing memory usage.
This pattern will occur anytime you broadcast one thing against another in a binary operation (which can occur in arrays, dataframes, bags, etc.).
import dask.array as da
a = da.random.random(100, chunks=10)
x = da.random.random(1)
r = (a[1:] * x) # `[1:]` slicing prevents blockwise fusion
r.visualize(optimize_graph=True, collapse_outputs=True)In the above case, the mul tasks will tend to "dogpile" onto the one worker that holds the middle random_sample task (x).
@crusaderky has also observed cases where this "dogpile" effect can cause what should be an embarrassingly-parallell operation to all get scheduled on one worker, overwhelming it.
#5325 was a heuristic attempt to fix this, but there are probably better ways to approach it.
