[Never Merge] Prototype for scalable dataframe shuffle#8223
Conversation
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
I'd thought we'd see the line number in the dask error but apparently not
At least it fails fast now, instead of deadlocking
abb6408 to fd1b02b (force-pushed)
I am working with 450 million rows and have to set a new index for a group-by operation on that dataset. The original data is 23GiB when saved as parquet files, and doing the …
Great to hear, thanks @FredericOdermatt! I'm curious, did you ever try …
@gjoseph92, I have found that merging is much slower when using …
I tried this and got `distributed.worker - WARNING - Compute Failed` while trying to parquetize the output of the sort.
This is a duplicate of #8209, but I'm taking over from @fjetter since I may want to occasionally push small fixes here. Original message copied below, with edits.
This is a prototype implementation of a new `dask.dataframe` shuffle algorithm which runs on a `dask.distributed` cluster. Unlike the task-based shuffle algorithm, it uses an out-of-band communication and administration approach to circumvent scheduler-imposed bottlenecks.

How to try / Feedback requested
Install this branch of `dask/dask` (`pip install -U git+https://github.com/gjoseph92/dask@shuffle_service`) and run a shuffle workload (`set_index`, `groupby`, etc.), passing the keyword argument `shuffle="service"`. Until distributed 2021.10.0 or later is released, you'll also need to install `dask/distributed` from `main` (`pip install -U git+https://github.com/dask/distributed`).

With this PR, we've been able to easily run shuffles that currently crash the cluster. Additionally, since this writes intermediate data to disk, you can shuffle larger-than-memory DataFrames. Note that the data written to disk won't show up as spilled-to-disk on the dashboard. Similarly, you'll see high unmanaged memory on workers while the shuffle is working.
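As a concrete starting point, here is a minimal sketch of a shuffle workload on the synthetic `timeseries` dataset. It assumes both branches above are installed; it will not run against released dask, and the cluster setup is just an illustration.

```python
# Sketch only: requires the dask@shuffle_service and distributed@main
# branches installed as described above.
import dask.datasets
from dask.distributed import Client

client = Client()  # or connect to an existing cluster

# Any shuffle-heavy workload works; set_index is the simplest trigger.
df = dask.datasets.timeseries(start="2021-01-01", end="2021-02-01")
result = df.set_index("x", shuffle="service").persist()
```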
As a rule of thumb:
Additionally, more threads don't improve performance much (since everything is GIL-bound), so we recommend 2 threads unless other parts of your workload require more.
We would love for you to try this out and report back. This implementation is targeted at large-scale data processing, so feedback is especially welcome if you have large datasets sitting around. If you encounter any stability or performance issues, please open a dedicated ticket and link to this PR so we can keep the discussion structured.
This is experimental. We do not expect this PR to ever be merged. Instead, we'll take ideas (and feedback) from this PR into a different one that's better-designed, stable, and maintainable.
With that explained, here are things to look out for:
- Don't try `merge` yet, because that requires multiple simultaneous shuffles
- Workers joining or leaving during the main `transfer` phase are not handled:
  - if a worker leaves during the `transfer` phase, the cluster will typically deadlock for 15 minutes (distributed's 300s connect timeout * 3 retries), then the task will error
  - if a worker leaves during the `unpack` phase, the cluster will deadlock indefinitely
- Testing has mostly used `dask.datasets.timeseries`. Real data with uneven distributions and input partition sizes may behave poorly.
- Don't use `shuffle="service"` in production!

Reviews
For anyone brave enough to review this, I'd encourage only a high-level pass. There are many moving parts and many open TODOs. We're discussing breaking off some parts of the implementation to allow for easier review (or moving some parts to dask/distributed). This is still TBD, but suggestions are welcome.
High level design
The concurrency model driving this is rather complex: multiple coroutines and threads handle grouping, concatenating, sending, and receiving data. This process is kicked off by the `transfer` task, which is applied to every input partition. This allows computation and network transfer to overlap efficiently. Data is buffered so that the network overhead for small chunks of data (shards) is minimal.

The receiving end of these submissions is a small extension of the Worker which accepts incoming data and caches it (on disk, see below) for later processing. The task graph currently employs a barrier task for synchronization and buffer flushing. The output partitions are then picked up by the `unpack` task, which collects the data stored on the given worker and extracts it into a runnable task. From there on, everything is business as usual.

To enable shuffles of datasets larger than (cluster) memory, there is an efficient spill-to-disk implementation which caches all received shards on disk while the shuffle is running. This is currently not optional. There is no persistence hierarchy implemented, as is usual for a Worker holding data.
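To make the transfer → barrier → unpack flow and the spill-to-disk behavior more concrete, here is a self-contained, single-process sketch using only the standard library. All names here (`split_into_shards`, `ShardStore`, the dict-based rows) are hypothetical illustrations, not the PR's actual API; real shards are serialized DataFrame pieces moved between workers over the network.

```python
# Minimal single-process sketch of the three shuffle phases described above.
# Hypothetical names; not the PR's actual API.
import os
import pickle
import tempfile
from collections import defaultdict

N_OUT = 4  # number of output partitions

def split_into_shards(partition):
    """'Transfer' phase: group an input partition's rows by their
    destination output partition (here: hash of the key column)."""
    shards = defaultdict(list)
    for row in partition:
        shards[hash(row["key"]) % N_OUT].append(row)
    return shards

class ShardStore:
    """Receiving end: buffer incoming shards and spill them to disk,
    mimicking the Worker extension that caches shards during the shuffle."""
    def __init__(self):
        self.dir = tempfile.mkdtemp()

    def receive(self, out_id, shard):
        # Append-pickle each shard to a per-output-partition file on disk.
        with open(os.path.join(self.dir, f"out-{out_id}.shards"), "ab") as f:
            pickle.dump(shard, f)

    def unpack(self, out_id):
        """'Unpack' phase: read back all spilled shards for one output
        partition and concatenate them into the final partition."""
        rows = []
        with open(os.path.join(self.dir, f"out-{out_id}.shards"), "rb") as f:
            while True:
                try:
                    rows.extend(pickle.load(f))
                except EOFError:
                    break
        return rows

# Drive the sketch: two input partitions -> N_OUT output partitions.
inputs = [
    [{"key": k, "value": k * 10} for k in range(0, 8)],
    [{"key": k, "value": k * 10} for k in range(8, 16)],
]
store = ShardStore()
for part in inputs:  # one "transfer" task per input partition
    for out_id, shard in split_into_shards(part).items():
        store.receive(out_id, shard)
# (barrier: all transfers finished, buffers flushed to disk)
outputs = [store.unpack(i) for i in range(N_OUT)]
assert sum(len(p) for p in outputs) == sum(len(p) for p in inputs)
```

In the real implementation the buffering, sending, and receiving run concurrently with computation, and the spill files hold serialized DataFrame shards rather than pickled dicts; this sketch only shows the data flow.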
References
cc @mrocklin , @gjoseph92 , @quasiben , @madsbk , ...?