Problem
The goal of this idea is to make Dask dataframe operations more scalable by removing bottlenecks in centralized components (client & server). This text is separated into two parts: the first is about removing the problem in the client, the second focuses on the server.
Let's start with the client. The main problem preventing scalability of dataframe operations on the client is that the complexity of the current implementation of pandas operations depends on the number of partitions. Therefore even an operation like "column + 1" becomes more and more expensive on the client as the cluster grows (because the number of partitions grows with it), which is increasingly problematic. The situation gets even worse for operations like "shuffle", where the growth is superlinear.
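This is easy to observe with public Dask APIs: even for a trivial elementwise operation, the number of tasks the client materializes grows linearly with the partition count. For example:

```python
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"x": range(10_000)})
for n in (10, 100, 1000):
    ddf = dd.from_pandas(df, npartitions=n)
    # The client builds one task per partition even for "column + 1",
    # so the graph size (and client-side work) grows linearly with n.
    print(n, len((ddf.x + 1).__dask_graph__()))
```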
If all DataFrame graphs could be created with more abstract operations, without referring to individual tasks, we could remove a lot of the burden that materializing such tasks places on the client. The number of partitions should be just a numeric parameter for the client; it should not affect how much work the client has to do to construct the dataframe graph.
Phase 1
The goal of this phase is to make the client’s code independent of the number of partitions. Hence operations like "column + 1" or "shuffle" should have constant complexity on the client.
The approach is quite simple: we move the expansion of the graph to the server. Instead of constructing individual tasks on the client and sending a big graph to the server, we just send a declarative description of how to build the task graph (a sketch of such a description follows the list of benefits below).
It has the following benefits:
- We avoid materializing and serializing a large number of tasks on the client (and one deserialization on the server).
- Tasks constructed on the server can be fed directly into the scheduler even before the whole task graph is fully constructed, so the computation can start as soon as possible.
- In the case of rsds, constructing the task graph in Rust will probably be much faster than in Python.
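To make this concrete, here is a minimal sketch of what such a declarative description could look like. Everything here (the `elementwise` op name, the field names, the key scheme) is hypothetical and just for illustration; it is not an existing Dask or rsds format:

```python
import pickle

def add_const(part, c):
    return part + c

def elementwise_description(func, input_name, output_name, n_parts, *extra):
    # Instead of n_parts materialized tasks, the client sends one
    # constant-size record; the server expands it into concrete tasks
    # (one per partition, with keys like f"{output_name}-{i}").
    return {
        "op": "elementwise",          # selects a graph expander on the server
        "function": pickle.dumps(func),
        "input": input_name,
        "output": output_name,
        "n_parts": n_parts,           # a plain number, not n_parts tasks
        "extra_args": list(extra),
    }

desc = elementwise_description(add_const, "col-x", "add-1", 1000, 1)
```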
The big question is how to actually describe the task graph. There are many options, e.g.:
- Python code -- This would be easy but has many disadvantages: it is not language independent, we lose some of the performance gains, and it rules out Phase 2.
- A fixed set of graph operations supported by the server (e.g. one for a map operation, one for a shuffle) -- This would be ideal if we could find a small fixed set of task graph expanders and support just those (a sketch of such an expander follows this list). It is probably doable for DataFrame; however, the flexibility of this solution is poor for generic user code.
- Presburger arithmetic (PA) -- PA is a fragment of arithmetic without multiplication. I strongly believe that all real-world dependencies between tasks can be efficiently described in PA. PA is decidable, and many operations can be implemented quite efficiently for real-world formulas. I have implemented a PA solver via automatic structures (https://github.com/spirali/pas) and the first experiments with task-like graphs seem promising.
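Continuing the hypothetical `elementwise` example from above, a server-side expander for the second option could be as small as this (a sketch with a simplified task-tuple layout, not actual rsds code):

```python
def expand_elementwise(desc):
    # The per-partition loop that used to run on the client now runs on
    # the server; each task can be streamed into the scheduler as soon
    # as it is produced, before the whole graph exists.
    for i in range(desc["n_parts"]):
        key = f"{desc['output']}-{i}"
        dep = f"{desc['input']}-{i}"
        # desc["function"] stays an opaque (pickled) blob; the server
        # never needs to understand it, only to place it in the task.
        yield key, (desc["function"], dep, *desc["extra_args"])

desc = {"op": "elementwise", "function": b"<pickled fn>", "input": "col-x",
        "output": "add-1", "n_parts": 1000, "extra_args": [1]}
tasks = dict(expand_elementwise(desc))   # 1000 tasks, built server-side
```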
This phase needs the following modifications:
- The HighLevelGraph that contains specific tasks has to be replaced by an "abstract graph" that contains only a description of the tasks. Although HighLevelGraph appears nearly everywhere, this is not as bad as it sounds, because the graph is constructed in only a relatively small number of places.
- The server needs to know how to expand the graph.
- The server needs to know how to compose the arguments for a function. It does not have to understand or construct the arguments themselves; it just needs to compose the wrapping list.
This will induce a small change in the protocol (and therefore also in the worker): right now task arguments are serialized as one pickled list, and this has to be changed to a msgpack list with pickled elements.
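A minimal illustration of that serialization difference, using only the standard pickle and msgpack packages (the surrounding message format is omitted):

```python
import pickle
import msgpack

args = [42, "some-key", {"option": True}]

# Current protocol: the whole argument list is one opaque pickle blob,
# so only a component that can unpickle it can manipulate the arguments.
current = pickle.dumps(args)

# Proposed protocol: the wrapping list is msgpack, each element is
# pickled separately. The server can then compose or extend the list
# (e.g. insert task keys) without understanding the pickled elements.
proposed = msgpack.packb([pickle.dumps(a) for a in args])
```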
If you know of any examples of dataframe graphs that could not be constructed from such abstract operations and that need to work with specific individual tasks, please let us know.
Phase 2
When Phase 1 is finished and the right description is chosen, we can also avoid expanding the task graph in the scheduler. This change is much more complex, as the scheduler has to be able to perform some nontrivial operations on the task graph while it remains in abstract form. I believe this can be done for some reasonable set of graph expanders, and I have some (weak) evidence and ideas for how to do it in PA while maintaining reasonable complexity for practical cases.
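To give a flavor of what operating on the graph in abstract form could mean: for many expanders, the dependencies of a task are a simple function of its partition index, so the scheduler could answer dependency queries from a constant-size rule instead of stored edge lists. The following is a hypothetical Python sketch (the `DEPS` table and `dependencies` helper are illustrations, not part of any existing scheduler):

```python
# Hypothetical: dependencies as index formulas instead of edge lists.
# Each rule has constant size no matter how many partitions exist, and
# the scheduler evaluates it only for the task it currently handles.
DEPS = {
    # output partition i of an elementwise op needs input partition i
    "elementwise": lambda i, n_in: [i],
    # output partition i of a shuffle needs every input partition;
    # in PA this is just the constant-size formula 0 <= j < n_in
    "shuffle": lambda i, n_in: range(n_in),
}

def dependencies(op, i, n_in):
    return [("input", j) for j in DEPS[op](i, n_in)]

assert dependencies("elementwise", 7, 100) == [("input", 7)]
```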
The main downside is that it would require a big change in the scheduler and also in the workers (though one transparent to the client if Phase 1 is finished).
Our roadmap
- Introduce "CompactGraph" as an abstract replacement for "HighlevelGraph", in the first prototype with a fixed set of graph expanders suitable for pandas-like operations.
- Implement expanders in the server (probably into rsds as we are more familiar with the code).
- Evaluate results. If a simple declarative description is not enough then try PA solution.
- If it works and elimination of client overhead is not enough, then go to Phase 2.
This is a very generic overview of our plan, we would be very glad for any comments, if it makes sense to you and/or if there is already some work like this going on.
Relevant issue: #3783