Distributed Partd #6164

@mrocklin

Problem

I was chatting with some RAPIDS devs today (@kkraus14 @quasiben @randerzander @jakirkham @madsbk @pentschev and others) and they were generally concerned about the scheduling overhead of dataframe merges/set_index operations (anything that uses shuffle). There is an effort to improve scheduling performance generally, but there is also an option to try to rethink how we handle shuffling in dataframes, and maybe other similar operations in the future as well (like array rechunking in extreme situations).

The current "tasks" based shuffling method uses O(n log(n)) tasks and log(n) full-dataset communications. This is fine for ~100 partitions, but becomes challenging once you get to 1000-partition datasets. We also have a separate solution around partd, which does things in O(n) tasks, by buffering small shards of the data and using disk as a global communication store. Partd as written doesn't work in a distributed fashion, but we might think about how to take a similar approach across a cluster.

DHT

In principle, building some sort of distributed hash table (DHT) like thing out of Dask parts isn't hard (actors, pubsub, etc, are all good examples of similar systems that took a couple of days to build). What is hard is making it resilient. For this we might look to a hybrid of some peer-to-peer system and a centralized SchedulerPlugin. Here is a rough design.

We run tasks on each partition of the to-be-shuffled dataframe that split that partition into pieces, and put them into some local splitting service. The first task on a machine probably creates the service (we use a local thread-based lock to coordinate between multiple tasks on the same machine). That service splits things apart, stores them in local buffers, and periodically sends them in batches to peers.
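To make the idea above concrete, here is a minimal sketch of such a splitting service. Everything in it is hypothetical: `SplittingService`, `get_service`, the `send` callback, and the `batch_size` threshold are names invented for illustration, and records are plain `(key, value)` tuples rather than dataframe pieces. It only shows the shape of the thing: one service per process created under a thread lock, hash-based splitting, and buffers that are flushed in batches.

```python
import threading
from collections import defaultdict

_service = None                    # one service per process (hypothetical)
_service_lock = threading.Lock()   # coordinates tasks running in local threads


class SplittingService:
    """Hypothetical local service that buffers shards per output partition."""

    def __init__(self, npartitions, send, batch_size=4):
        self.npartitions = npartitions
        self.send = send            # callable(output_partition, batch); assumed
        self.batch_size = batch_size
        self.buffers = defaultdict(list)
        self.lock = threading.Lock()

    def put(self, records):
        """Split (key, value) records by hash and send any full buffers."""
        ready = []
        with self.lock:
            for key, value in records:
                out = hash(key) % self.npartitions
                self.buffers[out].append((key, value))
                if len(self.buffers[out]) >= self.batch_size:
                    ready.append((out, self.buffers.pop(out)))
        # Send outside the lock so slow peers do not block other tasks.
        for out, batch in ready:
            self.send(out, batch)

    def flush(self):
        """Send whatever is still buffered, e.g. before the barrier task."""
        with self.lock:
            pending = list(self.buffers.items())
            self.buffers.clear()
        for out, batch in pending:
            self.send(out, batch)


def get_service(npartitions, send):
    """Return the process-wide service, creating it on first use."""
    global _service
    with _service_lock:
        if _service is None:
            _service = SplittingService(npartitions, send)
        return _service
```

A real version would presumably flush on a timer as well as on size, and `send` would do actual network communication to whichever peer owns the output partition.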

How does it learn what peers it should send things to? Probably it has to check in with the scheduler, which will still act as centralized control here, but won't track every individual piece of data. Maybe these things check in with the scheduler, the scheduler alerts everyone else about what is going on, and the things push data around in a way similar to a distributed hash table (DHT). There is lots of prior art here and algorithms to steal.

Once all of those tasks are done we run some barrier task, and then there are a bunch of other tasks which pull output partitions from the DHT. There is some excess communication here because we aren't thinking about locality for the Dask-DHT nodes, but for now let's just accept that as an allowable cost (my apologies).

So great, we get by with 2*n tasks, and probably two full communications. Not great, but it's probably better than what we have now for large shuffle computations.
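For a rough sense of scale, the two task counts can be compared directly. This is only back-of-the-envelope arithmetic: it takes the O(n log(n)) figure literally with a base-2 logarithm and a constant factor of one, both of which are assumptions, and the function name `task_counts` is made up for this example.

```python
import math


def task_counts(n):
    """Return (staged, dht) task counts for n partitions.

    staged: n * log2(n), taking the O(n log n) estimate literally (assumed
            base 2 and constant factor 1).
    dht:    2 * n, one splitting task and one collecting task per partition.
    """
    staged = round(n * math.log2(n))
    dht = 2 * n
    return staged, dht


staged, dht = task_counts(1024)
print(staged, dht)  # 10240 2048
```

Under these assumptions the gap grows with log(n), so the benefit is mostly felt at high partition counts.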

What can go wrong

So what happens if a worker goes down during execution? We haven't replicated data, and the task that created that chunk of data is gone, so we've lost some things. We can still recreate and rerun that task though, we just need to tell the scheduler to do so. So in order to get resilience, we register a SchedulerPlugin that calls some custom code whenever a worker goes down. We know which workers processed which tasks, so we rerun those tasks. The other DHT nodes will get some duplicate data unfortunately, so they'll probably have to track which tasks they've seen, and know to discard any message that comes from the same task twice.
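The duplicate-discarding side of this can be sketched as follows. This is an illustration under assumptions, not a design: `ShardReceiver` is a hypothetical name, and it assumes every message is tagged with the key of the task that produced it plus a per-task sequence number, and that a rerun task produces the same sequence of messages. The scheduler side (a SchedulerPlugin reacting to a lost worker and rerunning tasks) is not shown.

```python
class ShardReceiver:
    """Hypothetical DHT node that ignores messages it has already seen."""

    def __init__(self):
        self.seen = set()    # (task_key, seq) pairs already accepted
        self.shards = []     # accepted payloads, in arrival order

    def receive(self, task_key, seq, payload):
        """Store payload unless this (task, seq) message was seen before.

        Returns True if the payload was stored, False if it was a duplicate.
        Assumes a rerun task emits identical messages with identical tags.
        """
        ident = (task_key, seq)
        if ident in self.seen:
            return False
        self.seen.add(ident)
        self.shards.append(payload)
        return True


node = ShardReceiver()
node.receive("split-0", 0, ["a", "b"])
node.receive("split-1", 0, ["c"])
# "split-0" is rerun after a worker failure and resends its first message.
node.receive("split-0", 0, ["a", "b"])
print(node.shards)  # [['a', 'b'], ['c']]
```

Tracking by task key alone would also drop a legitimate second batch from the same task, which is why the sketch adds a sequence number.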

Nice things

So we get a nice combination here of resilience provided by the centralized scheduler, as well as low overhead provided by decentralized execution.

If we can create this as a more general-purpose DHT solution then I suspect that people will find uses for it beyond dataframe all-to-all shuffling.
