[WIP] Dynamic Tasks: Tasks inserting new tasks#3879
Conversation
Thanks @madsbk. This is fun to see. Two questions:
Sorry, I should have made that more clear. Using
This is the regular shuffle implementation without staging where
Can you expand on this? What I think you're saying here is that it's hard for a task to create new tasks and then return. That the creating task needs to remain active until its sub-tasks finish. Is that right? If so, does
Ah, that makes sense. Thanks for the extra context.
I am comparing it here because the new shuffle also doesn't use staging at the moment. I am working on a staging version.
Hrm, but then you still have
What I am doing in this PR is like
True, but using
Sorry to keep asking questions, but I'm not sure what this means. It might help if you were to provide some very simple usage examples of what kinds of problems this solves. Right now I understand that you're focused on dataframe shuffles, but if we want to change the scheduler we will probably need to write more general documentation around a feature like this. This would also help reviewers to understand the motivations of the work more generally.

I enjoyed reading your docstring (thanks for including it in the header comment). I think that the applications mentioned there would be good examples to bring up, both in docs and tests. I am especially curious where the current tasks-from-tasks approach falls short and how this approach can make things cleaner. Once I know more about what you're doing here we can bring in the right other people who use the tasks-from-tasks approach to get their feedback.
OK, it seems like there are two simultaneous conversations happening here.
In the interest of other readers I suggest that we move the dataframe shuffles conversation to another issue (I suspect that such an issue exists somewhere already) and let's focus on the tasks-inserting-tasks feature here.
Hrm, after a brief search I couldn't find a good general "dataframe shuffles are slow at scale" issue. I've written one up here. Hopefully we can centralize conversation there: dask/dask#6314
In order to explain this PR, I will walk through a shuffle example that hopefully makes it clear how it works.

Initially, we create a task graph that, after fusion, looks like (

Now, when one of the

Similarly, when the other

At this point, the graph is identical to the original shuffle approach, which makes it possible to re-compute the whole calculation and thus supports resilience.

I am still working on getting some performance numbers on large datasets, which is where this PR will hopefully make a significant difference :)
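For readers who haven't seen the static version: the conventional task-based shuffle materializes the full all-to-all exchange up front. A minimal plain-Python sketch (hash partitioning assumed; this is illustrative code, not the PR's implementation) shows where the n·n graph pieces come from:

```python
# Static all-to-all shuffle sketch (NOT this PR's code). With n input
# and n output partitions, the exchange creates n*n "split" pieces --
# the graph-size cost that the dynamic-tasks approach targets.

def split(part, n):
    # Route each record to an output partition by hash of its key.
    out = [[] for _ in range(n)]
    for key, value in part:
        out[hash(key) % n].append((key, value))
    return out

def shuffle(parts):
    n = len(parts)
    splits = [split(p, n) for p in parts]        # n split tasks
    # n output partitions, each gathering one piece from every input,
    # i.e. n*n edges in the task graph:
    return [[kv for s in splits for kv in s[i]]
            for i in range(n)]

parts = [[(1, "a"), (2, "b")], [(1, "c"), (3, "d")]]
out = shuffle(parts)
# Every record with the same key lands in the same output partition.
```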
With regards to accelerating dataframe shuffle operations, I think that we should consider how this would be used to batch together many local partitions when splitting. If there is a clean way of doing that which allows for orders-of-magnitude scale increases and roughly O(n) tasks, then I think that would be very exciting.
I often come back to this issue when thinking about how to track dependencies for dynamic tasks. I don't exactly need resilience; I am more interested in propagating information about all tasks executed during a computation. The most ergonomic way for that to happen would be a hook triggered on each `.compute()` call. Then I would be able to maintain metadata on `TaskState` tracking these dynamic internal dependencies. Internal is the key word here: by task dependencies Dask means inputs, while for my use case I need to consider keys for which compute was called as dependencies.

`insert_tasks` wouldn't suit me, since I do want to modify the current task by adding instances of internal dependencies, i.e. dependencies on compute calls. There is a possible issue with resilience here, since re-execution may result in a simplified graph, or in resource depletion as all internal dependencies are computed at once. This doesn't apply in my use case, since resilience is not a concern.

I hope this can fit here, as I am interested in (and using) tasks within tasks. I will make a separate issue for my use case once I have something readable.



This PR implements #3811 by introducing a new scheduler function `insert_tasks()`, which enables tasks to extend themselves while running. The idea is that a task, while running on a worker, can call `insert_tasks()` to insert new tasks.

`insert_tasks()` makes scheduling extremely flexible. Like `client.submit()`, it can create any kind of task, but instead of appending to the existing scheduler graph, the new tasks are inserted after the currently running task. This makes it possible to reduce scheduling overhead, minimize memory use, overlap computation and communication, and support graph features such as while loops and if-else statements (#3811 (comment)).

Note that the original idea in #3811 was to modify the running task, but I realized that in order to support full resilience we must be able to recompute any previous task. Thus, in this PR we insert new tasks instead of overwriting/modifying them. This also means that after the call to `insert_tasks()`, the state of the scheduler is exactly as if the inserted tasks had been there from the beginning.

The `insert_tasks` doc
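To make the mechanism concrete, here is a toy, self-contained sketch of the idea (not the API added by this PR): a miniature scheduler over a dask-style graph dict, in which a running task can splice new tasks into the graph, after which scheduling behaves as if those tasks had been there from the start. `toy_get`, `expand`, and the way tasks receive the graph are all invented for illustration.

```python
# Toy illustration of tasks-inserting-tasks (NOT this PR's API).
# Tasks are dask-style tuples (func, *dep_keys); a running task is
# handed the mutable graph and may splice new tasks into it.

def toy_get(graph, key):
    """Repeatedly run any task whose inputs are ready, so tasks
    inserted mid-run are scheduled as if present from the start."""
    graph = dict(graph)
    results = {}
    while key not in results:
        progress = False
        for k, task in list(graph.items()):
            if k in results:
                continue
            if isinstance(task, tuple) and callable(task[0]):
                func, *deps = task
                if all(d in results for d in deps):
                    results[k] = func(graph, *[results[d] for d in deps])
                    progress = True
            else:                      # a literal value
                results[k] = task
                progress = True
        if not progress:
            raise RuntimeError(f"unsatisfiable dependency for {key!r}")
    return results[key]

def expand(graph, x):
    # While "running", insert a new task that later tasks depend on.
    graph["y"] = (lambda g, v: v * 2, "x")

graph = {
    "x": 10,
    "expand": (expand, "x"),               # inserts "y" when it runs
    "result": (lambda g, y: y + 1, "y"),   # "y" exists only after that
}
print(toy_get(graph, "result"))  # 21
```

Note the design choice mirrored from the PR description: `expand` returns immediately after inserting; the inserted tasks run afterwards rather than the creating task remaining active until its sub-tasks finish.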
Preliminary results
To demonstrate its use, I have implemented a shuffle that makes use of `insert_tasks()` to create the all-to-all communication: `dynamic_tasks.py`. Because of this, the graph creation and optimization overhead is reduced significantly. (Warning: the code is an early prototype and needs some cleaning up.)

The following is running `shuffle` on a `timeseries` with 1331 partitions (similar to the test in dask/dask#6163 (comment)).

I am working on implementing the new shuffle with staging and doing some benchmarking of complete workflows.
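For a sense of scale (back-of-envelope arithmetic, not measured numbers from this PR): a single-stage all-to-all on n partitions produces on the order of n·n split pieces, while a two-stage staged shuffle that first exchanges within groups of roughly √n partitions needs only about 2·n·√n pieces:

```python
# Rough piece counts for n = 1331 partitions (illustrative only).
import math

n = 1331
single_stage = n * n          # every input splits to every output
b = math.isqrt(n)             # group/branching factor ~ sqrt(n) -> 36
two_stage = 2 * n * b         # two rounds of n partitions x b pieces
print(single_stage, two_stage)  # 1771561 95832
```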
The next steps
- Explore the possibilities of `insert_tasks()`. What operations could benefit from using `insert_tasks()`? What are the limitations?
- Use `insert_tasks()` and existing primitives to make it more user-friendly. What should the high-level interface look like?
- More benchmarks -- both small and large datasets.
- Support the `threaded` and `multiprocessing` schedulers. The idea here is to wrap `insert_tasks()` in a Dask API that supports the local schedulers.

cc. @mrocklin, @jacobtomlinson, @sjperkins, @beckernick, @spirali