
Large Dataframe shuffle operations #6314

@mrocklin


Some dataframe operations require something like a full sort of the data. This can be expensive for a variety of reasons, and it limits Dask DataFrame's ability to scale to very large datasets. These operations include:

  1. set_index
  2. groupby-apply
  3. join on a non-sorted column

Our current approach is to split every input partition into several pieces, move those pieces to their final destinations, and then merge them into output partitions. In an unsorted input dataset every input partition has something to contribute to most output partitions, so this is an n**2 operation. As the number of partitions increases this can become debilitating due to scheduler overhead.
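The split/move/merge pattern can be sketched in pure Python (a toy model on lists of dict rows, not Dask's actual implementation; the function names are made up for illustration):

```python
# Toy model of the full-shuffle approach: every input partition is
# split into one piece per output partition, and every output
# partition merges one piece from every input -- n_in * n_out tasks.

def split(partition, n_out, key):
    """Split one input partition into n_out pieces by hashing the key."""
    pieces = [[] for _ in range(n_out)]
    for row in partition:
        pieces[hash(row[key]) % n_out].append(row)
    return pieces

def full_shuffle(partitions, key):
    n_out = len(partitions)
    # n_in * n_out split pieces -- this is the n**2 task explosion
    split_pieces = [split(p, n_out, key) for p in partitions]
    # merge: output partition i gathers piece i from every input
    return [sum((pieces[i] for pieces in split_pieces), [])
            for i in range(n_out)]

# toy data: 4 partitions of 5 rows each, shuffled on column "x"
parts = [[{"x": i * j} for j in range(5)] for i in range(4)]
out = full_shuffle(parts, "x")
```

After the shuffle every row lands in the partition determined by its hash, which is what makes subsequent set_index, groupby-apply, or join work partition-locally.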

So for example, if we have 1000 partitions then this might create 1,000,000 tasks. If overhead is something like 1ms per task (an overestimate) then this would add 1000s of overhead (nearly 17 minutes), regardless of how large our cluster is.
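The back-of-envelope arithmetic above, written out:

```python
# Full-shuffle overhead for n partitions: ~n**2 tasks, and at ~1 ms of
# scheduler overhead per task the cost is independent of cluster size.

n = 1000                        # input partitions
tasks = n ** 2                  # 1,000,000 split/transfer tasks
overhead_s = tasks * 0.001      # at 1 ms per task: 1000 s
overhead_min = overhead_s / 60  # roughly 17 minutes
```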

Staging: Today's approach

Today we avoid this by staging the communication. Rather than splitting each partition into 1000 little chunks, we split it into about 30, and we do this a couple of times. This results in n * log_n tasks and log_n full communications. In our example above we would incur something like 30,000 + 30,000 tasks, or about a minute of overhead at 1ms per task, but we would have to move our dataset around twice (which can be troublesome on a slow network).
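The trade-off can be quantified with a small task-count model (a sketch under the assumptions above; `shuffle_tasks` is a hypothetical helper, not Dask's API). With branching factor k, the shuffle needs ceil(log_k(n)) stages, and each stage splits every one of the n partitions k ways and re-merges them:

```python
import math

def shuffle_tasks(n, k):
    """Stages and approximate task count for a staged shuffle of n
    partitions with branching factor k (k = n is the one-stage,
    full n**2 shuffle)."""
    stages = max(1, math.ceil(math.log(n, k)))
    # each stage: n partitions split k ways, plus n merge tasks
    tasks = stages * (n * k + n)
    return stages, tasks

print(shuffle_tasks(1000, 1000))  # one stage, ~10**6 tasks
print(shuffle_tasks(1000, 32))    # two stages, ~66,000 tasks
```

Shrinking k reduces task count but adds stages, and every stage moves the full dataset over the network once, which is the trade-off described above.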

What are some other approaches that we could take here?

