WIP Prototype for scalable dataframe shuffle #8209
Closed
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
Closing in favor of #8223 (so I can push commits as necessary)
This is a prototype implementation of a new `dask.dataframe` shuffle algorithm which runs on a `dask.distributed` cluster. Unlike the task-based shuffle algorithm, it uses an out-of-band communication and administration approach to circumvent scheduler-imposed bottlenecks.

How to try / Feedback requested
This implementation currently runs on all recent distributed versions, since it automatically registers the required worker extensions on the fly. Just install this branch of dask/dask and run a shuffle workload (set_index, groupby, etc.).

We would love for you to try this out and report back to us. This implementation is targeted at large-scale data processing, and we would appreciate people trying it out and giving us feedback, especially if you have large datasets sitting around.
If you encounter any stability or performance issues, please open a dedicated ticket and link to this PR so that we can structure discussions a bit.
Restrictions and Limitations
There are currently a few limitations to be aware of when trying this out. This list is likely not exhaustive.
Reviews
For all who are brave enough to review this, I would encourage a high-level pass first. There are many moving parts and many open TODOs. We're discussing breaking off some parts of the implementation to allow for easier review (or moving some parts to dask/distributed). This is still TBD, but suggestions are welcome.
High level design
The concurrency model driving this is rather complex and is made up of multiple coroutines and threads that deal with grouping, concatenating, sending, and receiving data. This process is kicked off in the `transfer` task, which is applied to every input partition. This allows computation and network transfer to overlap efficiently. Data is buffered so that the network overhead for small chunks of data, called shards, is minimal.

The receiving end of these submissions is a small extension on the Worker which accepts incoming data and caches it (on disk, see below) for later processing. The task graph currently employs a barrier task for synchronization and buffer flushing. The output partitions are then picked up by the `unpack` task, which collects the data stored on the given worker and extracts it into a runnable task. From there on, everything is business as usual.

To enable shuffles of datasets larger than (cluster) memory, there is an efficient spill-to-disk implementation which caches all received shards on disk while the shuffle is still running. This is currently not optional. There is currently no persistence hierarchy implemented, as is usual for a Worker holding data.
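To make the flow concrete, here is a rough, heavily simplified sketch of two of the ideas above: splitting an input partition into per-destination shards (what a `transfer` task does conceptually) and appending received shards to disk until `unpack` reads them back. All names and helpers here are hypothetical; the actual extension batches, serializes, and communicates shards very differently.

```python
import os
import pickle
import tempfile

import pandas as pd


def split_into_shards(partition: pd.DataFrame, column: str, npartitions: int):
    """Assign each row of one input partition to an output partition by
    hashing the shuffle column, returning one shard per destination."""
    destinations = (
        pd.util.hash_pandas_object(partition[column], index=False) % npartitions
    )
    return {int(dest): shard for dest, shard in partition.groupby(destinations)}


class DiskShardBuffer:
    """Append received shards to one file per output partition, then read
    them all back when that output partition is unpacked."""

    def __init__(self, directory=None):
        self.directory = directory or tempfile.mkdtemp()

    def _path(self, partition_id):
        return os.path.join(self.directory, f"shards-{partition_id}.pkl")

    def put(self, partition_id, shard):
        # Appending to disk keeps memory use flat while the shuffle runs.
        with open(self._path(partition_id), "ab") as f:
            pickle.dump(shard, f)

    def unpack(self, partition_id):
        # Read every shard appended for this partition and concatenate
        # them into the final output partition.
        shards = []
        with open(self._path(partition_id), "rb") as f:
            while True:
                try:
                    shards.append(pickle.load(f))
                except EOFError:
                    break
        return pd.concat(shards)
```

The hash-modulo assignment mirrors how shuffle destinations are typically chosen, and the append-only files stand in for the spill-to-disk cache; the real implementation adds buffering, batching, and a far more efficient serialization format.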
References
cc @mrocklin , @gjoseph92 , @quasiben , @madsbk , ...?