
[Never Merge] Prototype for scalable dataframe shuffle #8223

Draft

gjoseph92 wants to merge 7 commits into dask:main from gjoseph92:shuffle_service

Conversation

@gjoseph92 (Collaborator) commented Oct 5, 2021

This is a duplicate of #8209, but I'm taking over from @fjetter since I may want to occasionally push small fixes here. Original message copied below, with edits.

This is a prototype implementation of a new dask.dataframe shuffle algorithm that runs on a dask.distributed cluster. Unlike the task-based shuffle algorithm, it uses an out-of-band communication and administration approach to circumvent scheduler-imposed bottlenecks.

How to try / Feedback requested

Install this branch of dask/dask (pip install -U git+https://github.com/gjoseph92/dask@shuffle_service) and run a shuffle workload (set_index, groupby, etc.), passing the keyword argument shuffle="service". Until distributed 2021.10.0 or later is released, you'll also need to install dask/distributed from main (pip install -U git+https://github.com/dask/distributed).
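For reference, the intended call pattern looks like this (an illustrative sketch, not runnable without this branch and a distributed cluster; the example DataFrame is arbitrary):

```python
import dask
import dask.dataframe as dd
from distributed import Client

client = Client()  # or connect to an existing cluster

df = dask.datasets.timeseries()  # any dask DataFrame
shuffled = df.set_index("id", shuffle="service")  # opt in to the prototype
```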

With this PR, we've been able to easily run shuffles that currently crash the cluster. Additionally, since this writes intermediate data to disk, you can shuffle larger-than-memory DataFrames. Note that the data written to disk won't show up as spilled-to-disk on the dashboard. Similarly, you'll see high unmanaged memory on workers while the shuffle is running.

As a rule of thumb:

  • total cluster disk space needs to fit the full dataset
  • worker RAM needs to fit ~2GiB + partition_size * nthreads * ~5x safety factor?

Additionally, more threads don't improve performance much (since everything is GIL-bound), so we recommend 2 threads unless other parts of your workload require more.
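As a worked example of the RAM rule of thumb above (the partition size here is an assumption for illustration; the ~5x factor is our rough safety margin):

```python
# Back-of-envelope worker RAM estimate for the shuffle.
GiB = 2**30
MiB = 2**20

base_overhead = 2 * GiB        # ~2 GiB fixed overhead per worker
partition_size = 256 * MiB     # assumed input partition size
nthreads = 2                   # the recommended thread count
safety_factor = 5              # ~5x safety factor

ram_needed = base_overhead + partition_size * nthreads * safety_factor
print(f"~{ram_needed / GiB:.1f} GiB of worker RAM")  # ~4.5 GiB
```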

We would love for you to try this out and report back to us, especially if you have large datasets sitting around. This implementation targets large-scale data processing, and feedback from real workloads is what we need most. If you encounter any stability or performance issues, please open a dedicated ticket and link to this PR so we can keep discussions organized.

⚠️ Warnings ⚠️

This is experimental. We do not expect this PR to ever be merged. Instead, we'll take ideas (and feedback) from this PR into a different one that's better-designed, stable, and maintainable.

With that explained, here are things to look out for:

  • Doesn't work for merge yet, because that requires multiple simultaneous shuffles
  • Requires distributed>=2021.10.0 which doesn't exist yet, so until it does, you need to install distributed from main
  • All workers must have >= 2 threads
  • The cluster must have enough total disk space to hold the entire dataset (but can have much less RAM than that)
  • If a worker runs out of disk space, the whole shuffle will error
  • Workers sometimes run out of memory and die randomly during the transfer phase
  • If a worker dies during the transfer phase, the cluster will typically deadlock for 15 minutes (distributed's 300 s connect timeout × 3 retries), then the task will error
  • If a worker dies during the unpack phase, the cluster will deadlock indefinitely
  • Multiple shuffles at the same time will fail in strange ways. Running a shuffle more than once on a cluster without restarting it could possibly behave oddly too.
  • Mostly tested on synthetic data from dask.datasets.timeseries. Real data with uneven distributions and input partition sizes may behave poorly.
  • It's slower than it should be and mostly GIL-bound (though probably still faster than a standard task-based shuffle); see the discussions in distributed#5258 ("Testing network performance") and pandas-dev/pandas#43155 ("Dask shuffle performance help")
  • Remember to pass shuffle="service"!

Reviews

For all who are brave enough to review this, I would encourage only a high-level pass. There are many moving parts and many open TODOs. We're discussing breaking off some parts of the implementation to allow for easier review (or moving some parts to dask/distributed). This is still TBD, but suggestions are welcome.

High level design

The concurrency model driving this is rather complex: it is made of multiple coroutines and threads that deal with grouping, concatenating, sending, and receiving data. The process is kicked off in the transfer task, which is applied to every input partition. This allows computation and network transfer to overlap efficiently. Data is buffered so that the network overhead of sending many small chunks of data (shards) is minimal.
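To make the grouping step concrete, here is a stdlib-only sketch of how the transfer task might split one input partition into per-destination shards. The function name and the row representation are hypothetical, not the actual implementation (which operates on pandas DataFrames):

```python
from collections import defaultdict

def split_into_shards(rows, key, npartitions):
    """Group the rows of one input partition by their output partition."""
    shards = defaultdict(list)
    for row in rows:
        dest = hash(row[key]) % npartitions  # output partition owning this row
        shards[dest].append(row)
    # Each shard is then buffered and sent out-of-band to the worker that
    # owns its output partition, bypassing the scheduler.
    return dict(shards)

rows = [{"id": i, "x": i * 10} for i in range(6)]
shards = split_into_shards(rows, "id", 2)
assert sorted(shards) == [0, 1]
```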

The receiving end of these submissions is a small extension of the Worker that accepts incoming data and caches it (on disk, see below) for later processing. The task graph currently employs a barrier task for synchronization and buffer flushing. The output partitions are then picked up by the unpack task, which collects the data stored on the given worker and extracts it into a runnable task. From there on, everything is business as usual.

To enable shuffles of datasets larger than (cluster) memory, there is an efficient spill-to-disk implementation that caches all received shards on disk while the shuffle is running. This is currently not optional. There is currently no persistence hierarchy implemented, as is usual for a Worker holding data.
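A minimal stdlib-only sketch of such an append-only on-disk shard cache, in the spirit of (not copied from) the prototype's spill implementation. Class and method names here are hypothetical:

```python
import os
import pickle
import tempfile

class DiskShardCache:
    """Append received shards to one file per output partition."""

    def __init__(self, directory):
        self.directory = directory

    def _path(self, partition_id):
        return os.path.join(self.directory, f"shards-{partition_id}.pkl")

    def append(self, partition_id, shard):
        # Appending keeps received shards out of RAM until the unpack phase.
        with open(self._path(partition_id), "ab") as f:
            pickle.dump(shard, f)

    def read_all(self, partition_id):
        # Read back every shard appended for one output partition.
        shards = []
        with open(self._path(partition_id), "rb") as f:
            while True:
                try:
                    shards.append(pickle.load(f))
                except EOFError:
                    return shards

with tempfile.TemporaryDirectory() as tmp:
    cache = DiskShardCache(tmp)
    cache.append(0, [1, 2])
    cache.append(0, [3])
    assert cache.read_all(0) == [[1, 2], [3]]
```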

References

cc @mrocklin , @gjoseph92 , @quasiben , @madsbk , ...?

@FredericOdermatt (Contributor) commented:
I am working with 450 million rows and have to set a new index for a group-by operation on that dataset. The original data is 23 GiB saved as parquet files, and the .set_index() operation was eating all 390 GiB of available RAM on the local multi-CPU cluster I am using. After switching to this PR and running with shuffle="service", the shuffle of the 450 million rows (41 million unique values in the new index) completed in 30 minutes on 30 process workers while never using more than 50 GiB of RAM. My review of this PR is therefore very positive: I just saw it, applied it, and it worked great for me.

@gjoseph92 (Collaborator, Author) replied:
Great to hear, thanks @FredericOdermatt! I'm curious, did you ever try shuffle="disk"? This PR is primarily designed for multi-machine clusters, so on a single machine, the disk-based shuffle should work similarly to this one. I'm curious if one works better than the other.

@bsesar commented Dec 1, 2021:

> Great to hear, thanks @FredericOdermatt! I'm curious, did you ever try shuffle="disk"? This PR is primarily designed for multi-machine clusters, so on a single machine, the disk-based shuffle should work similarly to this one. I'm curious if one works better than the other.

@gjoseph92, I have found that merging is much slower when using shuffle='disk' (#5554).

@terramars commented:
I tried this while trying to parquetize the output of the sort, and got:

```
distributed.worker - WARNING - Compute Failed
Function: unpack
args: (<dask.dataframe.shuffle_service.ShuffleService object at 0x7fb5bf082dd0>, 0, None)
kwargs: {}
Exception: 'AttributeError("'ShuffleService' object has no attribute 'retrieve_futures'")'
```
