
WIP Prototype for scalable dataframe shuffle#8209

Closed
fjetter wants to merge 1 commit into dask:main from fjetter:shuffle_service

Conversation

@fjetter fjetter commented Oct 1, 2021

This is a prototype implementation of a new dask.dataframe shuffle algorithm which runs on a dask.distributed cluster. Unlike the task-based shuffle algorithm, it uses an out-of-band communication and administration approach to circumvent scheduler-imposed bottlenecks.

How to try / Feedback requested

This implementation currently runs on all recent distributed versions, since it automatically registers the required worker extensions on the fly. Just install this branch of dask/dask and run a shuffle workload (set_index, groupby, etc.).

We would love for you to try this out and report back. This implementation targets large-scale data processing, so feedback is especially valuable if you have large datasets sitting around.
If you encounter any stability or performance issues, please open a dedicated ticket and link to this PR so that we can keep the discussion structured.

Restrictions and Limitations

There are currently a few limitations to be aware of when trying this out. This list is likely not exhaustive:

  • All workers must have >= 2 threads
  • All workers must have disk
  • Cluster must not scale / adapt

Reviews

For anyone brave enough to review this, I would encourage a high-level pass first. There are many moving parts and many open TODOs. We're discussing breaking off some parts of the implementation to allow for easier review (or moving some parts to dask/distributed). This is still TBD, but suggestions are welcome.

High level design

The concurrency model driving this is rather complex: multiple coroutines and threads cooperate to group, concatenate, send, and receive data. The process is kicked off in the transfer task, which is applied to every input partition, allowing computation and network transfer to overlap efficiently. Data is buffered so that the network overhead for small data chunks ("shards") is minimal.
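To illustrate the idea (not the actual implementation; all names and the size threshold are invented for this sketch), a transfer step might split one input partition into shards keyed by output partition and buffer them per destination until a threshold triggers a send:

```python
# Hypothetical sketch: split an input partition into shards by output
# partition, buffering small shards until a send threshold is reached.
from collections import defaultdict

N_OUT = 4           # number of output partitions (assumption)
SEND_THRESHOLD = 3  # flush a destination buffer once it holds this many rows

def split_into_shards(rows):
    """Group rows of one input partition by a deterministic partition key."""
    shards = defaultdict(list)
    for row in rows:
        shards[sum(map(ord, row)) % N_OUT].append(row)
    return shards

class TransferBuffer:
    """Buffer shards per output partition; flush when a buffer grows large."""
    def __init__(self, send):
        self.send = send  # callable standing in for the network layer
        self.buffers = defaultdict(list)

    def put(self, shards):
        for out_part, rows in shards.items():
            self.buffers[out_part].extend(rows)
            if len(self.buffers[out_part]) >= SEND_THRESHOLD:
                self.flush(out_part)

    def flush(self, out_part):
        if self.buffers[out_part]:
            self.send(out_part, self.buffers.pop(out_part))

sent = []
buf = TransferBuffer(lambda p, rows: sent.append((p, rows)))
buf.put(split_into_shards(["a", "b", "c", "d", "e", "f"]))
for p in list(buf.buffers):
    buf.flush(p)  # final flush, as the barrier task would trigger
```

The buffering is what keeps per-message overhead low: many tiny shards bound for the same destination travel as one larger payload.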

The receiving end of these submissions is a small extension on the Worker which accepts incoming data and caches it on disk (see below) for later processing. The task graph currently employs a barrier task for synchronization and buffer flushing. The output partitions are then picked up by the unpack task, which collects the data stored on the given worker and extracts it into a runnable task. From there on, everything is business as usual.
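The shape of this graph can be simulated in-process with plain functions (a sketch only; function names and the shared `store` dict are stand-ins for the worker extension and its on-disk cache):

```python
# Hypothetical sketch of the graph structure: transfer -> barrier -> unpack.
store = {}  # stands in for the per-worker shard cache

def transfer(partition_id, rows, n_out):
    # Route each row of one input partition to its output partition.
    for row in rows:
        dest = sum(map(ord, row)) % n_out
        store.setdefault(dest, []).append(row)
    return partition_id  # token consumed by the barrier

def barrier(*tokens):
    # In the real implementation this also flushes all send buffers.
    return len(tokens)

def unpack(out_part, _barrier_token):
    # Collect everything that arrived for one output partition.
    return sorted(store.get(out_part, []))

inputs = {0: ["aa", "ab"], 1: ["ba", "bb"]}
tokens = [transfer(i, rows, n_out=2) for i, rows in inputs.items()]
token = barrier(*tokens)
outputs = [unpack(p, token) for p in range(2)]
```

The single barrier is the synchronization point: no unpack task can run until every transfer has completed and flushed.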

To enable shuffles of datasets larger than (cluster) memory, there is an efficient spill-to-disk implementation which caches all received shards on disk while the shuffle is still running. This is currently not optional. There is currently no persistence hierarchy implemented, as is usual for a Worker holding data.
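A minimal sketch of the spill idea, assuming a simple append-only file per output partition (the class name and pickle framing are illustrative choices, not the PR's actual format):

```python
# Hypothetical sketch: spill received shards straight to per-partition files
# while the shuffle runs, then read them back for the unpack step.
import os
import pickle
import tempfile

class DiskShardBuffer:
    def __init__(self, directory):
        self.directory = directory

    def _path(self, out_part):
        return os.path.join(self.directory, f"shard-{out_part}.bin")

    def append(self, out_part, rows):
        # Appending keeps memory flat: shards never accumulate in RAM.
        with open(self._path(out_part), "ab") as f:
            pickle.dump(rows, f)

    def read(self, out_part):
        # Replay all pickled frames appended for this partition.
        rows = []
        with open(self._path(out_part), "rb") as f:
            while True:
                try:
                    rows.extend(pickle.load(f))
                except EOFError:
                    break
        return rows

tmpdir = tempfile.mkdtemp()
buf = DiskShardBuffer(tmpdir)
buf.append(0, [1, 2])
buf.append(0, [3])
restored = buf.read(0)
```

Because every received shard goes to disk immediately, peak memory stays bounded by the buffers in flight rather than by the dataset size.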

References

cc @mrocklin , @gjoseph92 , @quasiben , @madsbk , ...?

Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
@gjoseph92

Closing in favor of #8223 (so I can push commits as necessary)

