Description
Some dataframe operations require something like a full sort of the data. This can be expensive for a variety of reasons, and it limits the scalability of Dask DataFrame to very large datasets. These operations include:
- set_index
- groupby-apply
- join on a non-sorted column
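To make the problem concrete, here is a minimal pure-Python sketch (not Dask's actual implementation) of the core step behind these operations: splitting each unsorted input partition into pieces, routing each row by a hash of its key to an output partition, and merging the pieces at their destination. The `split_partition` helper and the dict-of-lists representation are illustrative inventions, not Dask APIs.

```python
from collections import defaultdict

def split_partition(rows, n_out, key):
    """Split one input partition into one piece per output partition,
    routing each row by the hash of its key column."""
    pieces = defaultdict(list)
    for row in rows:
        pieces[hash(row[key]) % n_out].append(row)
    return pieces

# Two unsorted input partitions; with many partitions, nearly every
# input contributes a piece to nearly every output.
parts = [
    [{"x": 1}, {"x": 4}, {"x": 2}],
    [{"x": 3}, {"x": 1}, {"x": 4}],
]
n_out = 2
outputs = defaultdict(list)
for rows in parts:
    for dest, piece in split_partition(rows, n_out, "x").items():
        outputs[dest].extend(piece)  # the "move and merge" step

# Every row lands in exactly one output partition.
assert sum(len(v) for v in outputs.values()) == 6
```

Each (input partition, output partition) pair that exchanges a piece corresponds to one task, which is where the quadratic task count below comes from.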
Our current approach is to split every input partition into several pieces, move those pieces around to their final destination, and then merge them into output partitions. In an unsorted input dataset every input partition will have something to contribute to most output partitions, so this is an n**2 operation. As the number of partitions increases this can become debilitating due to scheduler overhead.
So, for example, if we have 1,000 partitions then this might create 1,000,000 tasks. If overhead is something like 1 ms per task (an overestimate) then this would add 1000 s of overhead (about 15 minutes) regardless of how large our cluster is.
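The arithmetic above can be checked directly (the 1 ms/task figure is the assumption stated in the text, not a measured number):

```python
n = 1_000                 # input (and output) partitions
tasks = n * n             # one transfer task per (input, output) pair
overhead_s = tasks / 1000 # assumed 1 ms of scheduler overhead per task

assert tasks == 1_000_000
assert overhead_s == 1000.0  # ~16 minutes, independent of cluster size
```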
Staging: Today's approach
Today we avoid this by staging communication. We split things up not into 1,000 little chunks, but into about 30, and we do this a couple of times. For a branching factor b this results in roughly n · b · log_b(n) tasks and log_b(n) full rounds of communication. In our example above we would incur something like 30,000 + 30,000 tasks, or about a minute of overhead at the same 1 ms per task, but we would have to move our dataset around twice (which can be troublesome on a slow network).
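A simplified sketch of this staging idea (again pure Python, not Dask's implementation): treat each partition index as a base-b number and fix one digit per round, so every partition fans out to at most b destinations per round instead of n. The example uses n = 900 = 30² partitions so that exactly two rounds suffice; `staged_shuffle` and the `(dest, payload)` record format are illustrative assumptions.

```python
import math

def staged_shuffle(parts, b):
    """Deliver each record (dest, payload) to partition `dest`,
    fixing one base-b digit of the partition index per round.
    Assumes len(parts) is a power of b."""
    n = len(parts)
    rounds = round(math.log(n, b))
    moves = 0
    for r in range(rounds):
        new_parts = [[] for _ in range(n)]
        for i, records in enumerate(parts):
            pieces = set()
            for dest, payload in records:
                digit = (dest // b**r) % b                 # r-th digit of dest
                j = i + (digit - (i // b**r) % b) * b**r   # swap it into i
                new_parts[j].append((dest, payload))
                pieces.add(j)
            moves += len(pieces)  # at most b pieces leave a partition per round
        parts = new_parts
    return parts, rounds, moves

n, b = 900, 30                                   # 900 = 30**2: two rounds
parts = [[(n - 1 - i, f"row{i}")] for i in range(n)]  # reversed destinations
final, rounds, moves = staged_shuffle(parts, b)

assert rounds == 2
assert all(d == j for j, recs in enumerate(final) for d, _ in recs)
assert moves <= rounds * n * b  # ~n*b tasks per round, vs n**2 in one shot
```

The trade-off is exactly the one described above: the task count drops from n² to about n · b per round, but the full dataset crosses the network once per round.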
What are some other approaches that we could take here?
See also
- A similar issue for Dask Array: Very large dask arrays (#3514)
- A plan for a distributed shuffling service: Distributed Partd (#6164)
- An approach to create tasks from tasks: [WIP] Dynamic Tasks: Tasks inserting new tasks (distributed#3879)