
[WIP] Dynamic Tasks: Tasks inserting new tasks #3879

Closed
madsbk wants to merge 4 commits into dask:main from madsbk:scheduler_insert_tasks

Conversation

@madsbk
Contributor

@madsbk madsbk commented Jun 10, 2020

This PR implements #3811 by introducing a new scheduler function insert_tasks(), which enables tasks to extend themselves while running. The idea is that a task, while running on a worker, can call insert_tasks() to insert new tasks.

insert_tasks() makes scheduling extremely flexible. Like client.submit(), it can create any kind of task, but instead of appending to the existing scheduler graph, the task is inserted after the currently running task. This makes it possible to reduce scheduling overhead, minimize memory use, overlap computation and communication, and support graph features such as while loops and if-else statements (#3811 (comment)).
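As a toy illustration of the while-loop idea (a pure-Python model written for this discussion, not the scheduler API: a worklist stands in for the scheduler, and `insert` stands in for insert_tasks()):

```python
# While-loop via dynamic task insertion: each iteration task checks the loop
# condition and, if it holds, inserts the next iteration task itself.
def make_iteration(x):
    def run(insert):
        y = x / 2          # loop body: halve x
        if y > 1:          # loop condition: keep iterating while > 1
            insert(make_iteration(y))
        return y
    return run

worklist = [make_iteration(40.0)]
results = []
while worklist:
    task = worklist.pop()
    results.append(task(worklist.append))
print(results)  # [20.0, 10.0, 5.0, 2.5, 1.25, 0.625]
```

The number of iterations is decided at run time, so the full graph never has to exist up front.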

Notice that the original idea in #3811 was to modify the running task, but I realized that in order to support full resilience we must be able to recompute any previous task. Thus, in this PR we insert new tasks instead of overwriting/modifying existing ones. This also means that after the call to insert_tasks(), the state of the scheduler is exactly as if the inserted tasks had been there from the beginning.

The insert_tasks docstring:

def insert_tasks(
    self,
    comm=None,
    cur_key=None,
    new_tasks=None,
    rearguard_key=None,
    rearguard_input=None,
):
    """
    Insert new tasks immediately after a running task.

    `new_tasks` specifies the new tasks and must, for each task, include a list
    of dependencies and/or dependents. Since a dependency might not exist when
    calling this function (e.g. if the dependency is created by a later call to
    `insert_tasks()`), a task can also specify its dependents.

    To tie the output of the new tasks back into the graph, we use a rearguard
    task. The rearguard should be a dummy operation immediately after the callee.

    Parameters
    ----------
    comm:
        Ignored, needed by the RPC call
    cur_key : str
        The key of the currently running task that the new tasks will be
        inserted after.
    new_tasks : list[dict]
        Dictionaries of tasks, which should contain the following information:
            - "key": the key of the new task
            - "dependencies": list of dependencies of the task
            - "dependents": list of dependents of the task
            - "task": the Dask task (serialized)
            - "priority": the priority of the task (optional)
    rearguard_key : str
        The key of the rearguard
    rearguard_input : str
        The input key for the rearguard
    """
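To make the payload concrete, here is a minimal sketch of a `new_tasks` argument a running task might construct. The keys follow the docstring schema above; the task names and the serialized-task placeholder strings are illustrative, not taken from the PR:

```python
# Hypothetical new_tasks payload following the documented schema.
# "task" would normally hold a serialized Dask task; strings stand in here.
new_tasks = [
    {
        "key": "getitem-0",
        "dependencies": ["dynshuffle-0"],  # consume the running task's output
        "dependents": ["concat-0"],        # feed the not-yet-inserted concat
        "task": "<serialized getitem task>",
        "priority": 0,                     # optional: run before sibling tasks
    },
    {
        "key": "concat-0",
        "dependencies": ["getitem-0"],
        "dependents": [],                  # tied back in via the rearguard
        "task": "<serialized concat task>",
    },
]

# Basic schema check mirroring the docstring's required fields.
required = {"key", "dependencies", "dependents", "task"}
assert all(required <= task.keys() for task in new_tasks)
```

Note how "concat-0" can be listed as a dependent of "getitem-0" even though it does not exist yet, which is exactly why the schema allows specifying dependents as well as dependencies.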

Preliminary results

To demonstrate its use, I have implemented a shuffle that makes use of insert_tasks() to create the all-to-all communication: dynamic_tasks.py. Because of this, the graph creation and optimization overhead is reduced significantly. (Warning: the code is an early prototype and needs some cleanup.)
The following is running shuffle on a timeseries with 1331 partitions (similar to the test in dask/dask#6163 (comment)).

App                              Time     Speedup
New shuffle without staging      0.5 s    1
Regular shuffle with staging     3.1 s    6.2
Regular shuffle without staging  238.7 s  477.4
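For readers checking the numbers: the Speedup column is each configuration's runtime divided by the new shuffle's 0.5 s baseline (all figures taken from the table above):

```python
# Speedup = runtime relative to the new shuffle's 0.5 s baseline.
baseline = 0.5
times = {
    "New shuffle without staging": 0.5,
    "Regular shuffle with staging": 3.1,
    "Regular shuffle without staging": 238.7,
}
speedups = {app: t / baseline for app, t in times.items()}
assert speedups["Regular shuffle with staging"] == 6.2
assert speedups["Regular shuffle without staging"] == 477.4
```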

I am working on implementing the new shuffle with staging and on benchmarking complete workflows.

The next steps

  • Explore the possibilities of insert_tasks(). What operations could benefit from using insert_tasks()? What are the limitations?

  • Combine insert_tasks() with existing primitives to make it more user-friendly. What should the high-level interface look like?

  • More benchmarks -- both small and large datasets.

  • Support the threaded and multiprocessing schedulers. The idea here is to wrap insert_tasks() in a Dask API that supports the local schedulers.


cc. @mrocklin, @jacobtomlinson, @sjperkins, @beckernick, @spirali

@beckernick
Member

cc @randerzander @ayushdg @VibhuJawa

@madsbk madsbk changed the title [WIP] Dynamic Tasks: Tasks insertiong new tasks [WIP] Dynamic Tasks: Tasks inserting new tasks Jun 10, 2020
@mrocklin
Member

Thanks @madsbk . This is fun to see.

Two questions:

  1. Can I ask you to compare against https://docs.dask.org/en/latest/futures.html#submit-tasks-from-tasks ? When would we want to use the approach here instead?
  2. It looks like you have found a 400x speedup on some workload. Can I ask you to share more about that workload, what was slow before, and how this helps so much?

@madsbk
Contributor Author

madsbk commented Jun 11, 2020

  1. Can I ask you to compare against https://docs.dask.org/en/latest/futures.html#submit-tasks-from-tasks ? When would we want to use the approach here instead?

Sorry, I should have made that clearer. Using get_client().submit() and secede() works great in many cases, but in the case of shuffle this approach requires extra memory because the inputs to the shuffle jobs must stay in memory until the jobs complete. We handle this problem in the regular shuffle implementation by prioritizing the shuffle-split tasks, and in this PR by setting the "priority" of the newly inserted tasks.

  1. It looks like you have found a 400x speedup on some workload. Can I ask you to share more about that workload, what was slow before, and how this helps so much?

This is the regular shuffle implementation without staging, where max_branch == npartitions and it creates n**2 tasks!
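The quadratic blow-up is easy to see: with max_branch == npartitions, every one of the n input partitions produces one split piece per output partition, so the split stage alone is n**2 tasks:

```python
# Each of the n input partitions is split into one piece per output
# partition, so the non-staged shuffle's split stage has n**2 tasks.
def split_task_count(npartitions):
    return npartitions ** 2

print(split_task_count(1331))  # 1771561 -- ~1.8M tasks for the 1331-partition benchmark above
```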

@mrocklin
Member

this approach requires extra memory because the inputs to the shuffle jobs must stay in memory until the jobs complete

Can you expand on this? What I think you're saying here is that it's hard for a task to create new tasks and then return. That the creating task needs to remain active until its sub-tasks finish. Is that right? If so, does fire_and_forget get you past this?

@mrocklin
Member

This is the regular shuffle implementation without staging, where max_branch == npartitions and it creates n**2 tasks!

Ah, that makes sense. Thanks for the extra context.

@madsbk
Contributor Author

madsbk commented Jun 11, 2020

I am comparing it here because the new shuffle also doesn't use staging at the moment. I am working on a staging version.

@mrocklin
Member

Hrm, but then you still have n**2 tasks, right? It's nice that you can create those tasks on the fly, but fundamentally this still seems like it'll quickly become a bottleneck as you increase the number of partitions. I may not understand how your new shuffle algorithm works though.

@madsbk
Contributor Author

madsbk commented Jun 11, 2020

Can you expand on this? What I think you're saying here is that it's hard for a task to create new tasks and then return. That the creating task needs to remain active until its sub-tasks finish. Is that right? If so, does fire_and_forget get you past this?

What I am doing in this PR is like fire_and_forget() but with dependency support.

@madsbk
Contributor Author

madsbk commented Jun 11, 2020

Hrm, but then you still have n**2 tasks, right? It's nice that you can create those tasks on the fly, but fundamentally this still seems like it'll quickly become a bottleneck as you increase the number of partitions. I may not understand how your new shuffle algorithm works though.

True, but using insert_tasks() we don't have to create all the tasks at once; we can organize the communication as we see fit.
Staging is probably preferable when partition sizes are small, since it limits the splitting to max_branch, but with large enough partitions I expect non-staging to be preferable.
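A rough task-count model makes the trade-off concrete. Assume (this model is an illustration for the discussion, not taken from the PR) that the staged shuffle runs ceil(log_b(n)) stages of about n * b split tasks each, while the non-staged shuffle does a single stage of n**2 splits:

```python
import math

# Illustrative model: staged shuffle -> ceil(log_b(n)) stages of ~n*b splits;
# non-staged shuffle -> a single stage of n**2 splits.
def staged_splits(n, max_branch=32):
    stages = math.ceil(math.log(n) / math.log(max_branch))
    return stages * n * max_branch

def nonstaged_splits(n):
    return n ** 2

for n in (100, 1331, 10_000):
    print(n, staged_splits(n), nonstaged_splits(n))
# Staging keeps the task count near-linear in n at the cost of extra stages
# (and extra data movement); with few, large partitions a single n**2 stage
# can still come out ahead.
```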

@mrocklin
Member

What I am doing in this PR is like fire_and_forget() but with dependency support.

Sorry to keep asking questions, but I'm not sure what this means.

It might help if you were to provide some very simple usage examples of what kinds of problems this solves. Right now I understand that you're focused on dataframe shuffles, but if we want to change the scheduler we will probably need to write more general documentation around a feature like this. This would also help reviewers to understand the motivations of the work more generally.

I enjoyed reading your docstring (thanks for including it in the header comment). I think that the applications mentioned there would be good examples to bring up, both in docs and tests. I am especially curious where the current tasks-from-tasks approach falls short and how this approach can make things cleaner.

Once I know more about what you're doing here we can bring in the right other people who use the tasks-from-tasks approach to get their feedback.

@mrocklin
Member

True, but using insert_tasks() we don't have to create all the tasks at once; we can organize the communication as we see fit.
Staging is probably preferable when partition sizes are small, since it limits the splitting to max_branch, but with large enough partitions I expect non-staging to be preferable.

OK, it seems like there are two simultaneous conversations happening here.

  1. Dataframe shuffles
  2. Tasks inserting new tasks.

In the interest of other readers I suggest that we move the dataframe shuffles conversation to another issue (I suspect that such an issue exists somewhere already) and lets focus on the tasks-inserting-tasks feature here.

@mrocklin
Member

Hrm, after a brief search I couldn't find a good general "dataframe shuffles are slow at scale" issue. I've written one up here. Hopefully we can centralize conversation there: dask/dask#6314

@madsbk force-pushed the scheduler_insert_tasks branch from 7b99cbc to 3147d81 on June 17, 2020 18:07
@madsbk force-pushed the scheduler_insert_tasks branch from 3147d81 to 7423ca5 on June 19, 2020 07:18
@madsbk
Contributor Author

madsbk commented Jun 23, 2020

In order to explain this PR, I will walk through a shuffle example that hopefully makes it clear how insert_tasks() works.

Initially, we create a task graph that, after fusion, looks like (npartitions=2):

Instead of having the getitem and concat tasks in the graph from the start, we have rearguard_dyshuffle and dynshuffle.

Now, when one of the dynshuffle tasks is executed, it will first call shuffle_group() and then call insert_tasks() to create the getitem and concat tasks:

Similarly, when the other dynshuffle task is executed, the graph becomes:

At this point, the graph is identical to the one from the original shuffle approach, which makes it possible to recompute the whole calculation and thus supports resilience.
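The walk-through can be sketched with plain dependency dicts (made-up keys and a toy stand-in for the scheduler's actual state, written for illustration): each dynshuffle task starts wired directly to its rearguard, and the insertion splices the getitem/concat chain in between:

```python
# Toy graph as {task_key: set of dependency keys}; npartitions = 2.
graph = {
    "dynshuffle-0": set(),
    "dynshuffle-1": set(),
    "rearguard-0": {"dynshuffle-0"},
    "rearguard-1": {"dynshuffle-1"},
}

def insert_tasks_sketch(graph, cur_key, new_tasks, rearguard_key, rearguard_input):
    """Splice new_tasks after cur_key and repoint the rearguard at rearguard_input."""
    for key, deps in new_tasks.items():
        graph[key] = set(deps)
    graph[rearguard_key] = (graph[rearguard_key] - {cur_key}) | {rearguard_input}

# Running dynshuffle-0 inserts its getitem tasks and the concat:
insert_tasks_sketch(
    graph,
    cur_key="dynshuffle-0",
    new_tasks={
        "getitem-0-0": {"dynshuffle-0"},
        "getitem-1-0": {"dynshuffle-1"},
        "concat-0": {"getitem-0-0", "getitem-1-0"},
    },
    rearguard_key="rearguard-0",
    rearguard_input="concat-0",
)
assert graph["rearguard-0"] == {"concat-0"}
```

After the other dynshuffle task performs its symmetric insertion, the toy graph matches the fully materialized shuffle graph, which is the property that makes recomputation, and thus resilience, possible.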


I am still working on getting some performance numbers on large datasets, which is where this PR will hopefully make a significant difference :)

@mrocklin
Member

With regards to accelerating dataframe shuffle operations, I think that we should consider how this would be used to batch together many local partitions when splitting. If there is a clean way of doing that which allows for orders-of-magnitude scale increases and roughly O(n) tasks, then I think that would be very exciting.

@LunarLanding

I often come back to this issue when thinking about how to track dependencies for dynamic tasks. I don't exactly need resilience; I am more interested in propagating information about all tasks executed during a computation.

The most ergonomic way for that to happen would be a hook triggered on each `.compute()` call. I would then be able to maintain metadata on TaskState tracking these dynamic internal dependencies.

Internal is the key word here; by task dependencies Dask means inputs, while for my use case I need to consider keys for which compute was called as dependencies.

insert_tasks wouldn't suit me, since I do want to modify the current task by adding instances of internal dependencies, i.e. dependencies on compute calls.

There is a possible issue with resilience here, since re-execution may result in a simplified graph, or in resource depletion as all internal dependencies are computed at once. This doesn't apply in my use case, since resilience is not a concern.

I hope this fits here, as I am interested in (and using) tasks within tasks. I will open a separate issue for my use case once I have something readable.

Base automatically changed from master to main March 8, 2021 19:04
@madsbk madsbk closed this Aug 17, 2022