
[REVIEW] Prioritize getitem tasks in shuffle #7826

Closed
madsbk wants to merge 1 commit into dask:main from madsbk:priority_shuffle_split

Conversation

@madsbk
Contributor

@madsbk madsbk commented Jun 23, 2021

This PR implements custom task prioritization of getitem() tasks in shuffle operations. See #6051 for a detailed discussion of the performance benefits.

    @staticmethod
    def __dask_distributed_annotations_unpack__(
        annotations: MutableMapping[str, Any],
        new_annotations: Optional[Mapping[str, Any]],
        keys: Iterable[Hashable],
    ) -> None:
        """Implement custom task prioritization of `getitem()` tasks.

        Dask's scheduling policy is, generally speaking, depth-first, which works
        well in most cases. However, in the case of a shuffle it increases memory
        usage significantly, because depth-first scheduling delays the freeing of
        `shuffle_group()` results until the end of the shuffle.

        We address this by forcing breadth-first scheduling of the `getitem()` tasks
        that follow a `shuffle_group()` task. This way all the `getitem()` tasks
        finish immediately and the output of `shuffle_group()` can be freed before
        continuing to the next phase of the operation.

        See https://github.com/dask/dask/pull/6051 for a detailed discussion
        """

@madsbk madsbk marked this pull request as ready for review June 23, 2021 09:59
@madsbk madsbk changed the title Prioritize getitem tasks in shuffle [REVIEW] Prioritize getitem tasks in shuffle Jun 23, 2021
@mrocklin
Member

Is there something we can do here that would be a good test? The implementation here with "split-" seems somewhat brittle to me (future devs could change that name and this functionality would stop without us knowing).

Maybe we look at the Scheduler.transition_log and see that some set of tasks occurs before some other set of tasks?
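One way such a test could look, sketched here against a hand-written stand-in for the scheduler's transition log (a real test would run a shuffle on a live scheduler and inspect `Scheduler.transition_log`; the key names and log shape below are assumptions for illustration):

```python
# Hypothetical excerpt of a transition log: (key, start_state, finish_state).
transition_log = [
    ("shuffle-group-0", "processing", "memory"),
    ("split-0", "processing", "memory"),
    ("split-1", "processing", "memory"),
    ("shuffle-group-1", "processing", "memory"),
    ("split-2", "processing", "memory"),
]


def completion_order(log):
    """Return task keys in the order they transitioned to 'memory'."""
    return [key for key, _, finish in log if finish == "memory"]


order = completion_order(transition_log)
# The property we want: split tasks are interleaved with shuffle-group tasks
# rather than all deferred to the end of the shuffle.
first_split = min(i for i, k in enumerate(order) if k.startswith("split-"))
last_group = max(i for i, k in enumerate(order) if k.startswith("shuffle-group-"))
assert first_split < last_group
```

Such a test would also catch a silent rename of the `"split-"` prefix, since the ordering property would stop holding.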

@mrocklin
Member

Also, I think that doing this in unpack is fine, but I'm also curious if it would make sense to put these annotations on the Layer itself and then trust the general machinery to move those annotations up to the scheduler. This feels somewhat less efficient, but also somewhat more general.

@madsbk
Contributor Author

madsbk commented Jun 23, 2021

Is there something we can do here that would be a good test? The implementation here with "split-" seems somewhat brittle to me (future devs could change that name and this functionality would stop without us knowing).

Maybe we look at the Scheduler.transition_log and see that some set of tasks occurs before some other set of tasks?

I have been thinking of a more general solution, but that will require some API redesign, I think.
One idea is to make order() understand annotations and prioritize fast tasks such as getitem(). But I think it makes sense to start with this specialized approach and get some experience before redesigning the task-ordering API.

@madsbk
Contributor Author

madsbk commented Jun 23, 2021

Also, I think that doing this in unpack is fine, but I'm also curious if it would make sense to put these annotations on the Layer itself and then trust the general machinery to move those annotations up to the scheduler. This feels somewhat less efficient, but also somewhat more general.

Note that before layer unpack we don't know the task keys. We could do this in __dask_distributed_unpack__ instead of __dask_distributed_annotations_unpack__, but I like to keep annotation specializations in the annotation method.

@mrocklin
Member

There is a middle ground. We can put annotations on the layer instance. This would be equivalent to what happens when we call

with dask.annotate(priority=...):
    ... create some layers ...

We can just populate the layer instance with an annotations = {"priority": ...} dict. Again, this isn't as efficient (we have to pack and unpack annotations) but it does reduce the number of code paths that we use for this sort of thing. While layers and packing are in flux it might be wise to stick to client-side / blessed API for a while.
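The "pack and unpack" step referred to above can be sketched with a minimal stand-in for a layer object (the real `Layer` class in dask carries an `annotations` attribute of the same shape; the `pack_annotations` helper here is hypothetical):

```python
class Layer:
    """Minimal stand-in for a dask HighLevelGraph layer."""

    def __init__(self, annotations=None):
        self.annotations = annotations or {}


def pack_annotations(layer, keys):
    """Expand per-layer annotations into per-key form, as the client does
    before shipping them to the scheduler (the extra cost mentioned above)."""
    packed = {}
    for name, value in layer.annotations.items():
        packed[name] = {k: value for k in keys}
    return packed


# Equivalent to creating the layer inside `with dask.annotate(priority=1):`
layer = Layer(annotations={"priority": 1})
packed = pack_annotations(layer, keys=[("x", 0), ("x", 1)])
```

The trade-off is exactly as stated: one extra expansion pass per layer, in exchange for a single client-side code path for all annotations.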

@madsbk
Contributor Author

madsbk commented Jun 23, 2021

We can just populate the layer instance with an annotations = {"priority": ...} dict. Again, this isn't as efficient (we have to pack and unpack annotations) but it does reduce the number of code paths that we use for this sort of thing. While layers and packing are in flux it might be wise to stick to client-side / blessed API for a while.

We only want to prioritize getitem() tasks; the rest of the shuffle tasks should keep their original priority. Basically, it is the Shuffle.Layer that tells the scheduler that some of its tasks need priority.

@mrocklin
Member

We only want to prioritize getitem() tasks; the rest of the shuffle tasks should keep their original priority. Basically, it is the Shuffle.Layer that tells the scheduler that some of its tasks need priority.

Agreed. I believe that you can put the information you placed in the unpack method directly on the Layer and that the Client will send that information up.

@madsbk
Contributor Author

madsbk commented Jun 23, 2021

Agreed. I believe that you can put the information you placed in the unpack method directly on the Layer and that the Client will send that information up.

I am not sure I follow, do you suggest that we set annotations when creating SimpleShuffleLayer and ShuffleLayer like here: https://github.com/dask/dask/blob/main/dask/dataframe/shuffle.py#L602 ?

In that case we need to use a callable annotation in order to avoid graph materialization, which could also work.
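A callable annotation defers evaluation to per-key pack time, so the layer never needs to materialize its full task graph up front. A sketch of the idea, with an assumed `"split-"` key convention and an illustrative pack step:

```python
def split_priority(key):
    """Hypothetical per-key priority: boost the shuffle's "split-" tasks."""
    name = key[0] if isinstance(key, tuple) else key
    return 1 if str(name).startswith("split-") else 0


# Instead of a plain value, the annotation is a callable evaluated key by key.
annotations = {"priority": split_priority}

# At annotation-pack time only the key names are needed, not the task graph:
keys = [("split-0", 0), ("shuffle-group-0", 0)]
packed = {k: annotations["priority"](k) for k in keys}
```

Only the `"split-"` task gets the boosted priority; every other task evaluates to the neutral default.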

@mrocklin
Member

Yes there, or even within the layer's constructor

In that case we need to use a callable annotation in order to avoid graph materialization, which could also work.

My hope is that we could figure out the relevant keys without generating the entire graph.

What is in this PR is fine too. I just think that if we can keep this logic on the client side then it's less likely to need maintenance going forward.

@madsbk
Contributor Author

madsbk commented Jun 23, 2021

My hope is that we could figure out the relevant keys without generating the entire graph.

I will have to think about this :)

@madsbk madsbk closed this Jun 23, 2021
@madsbk madsbk reopened this Jun 23, 2021
@mrocklin
Member

@rjzamora I'm not sure if @madsbk mentioned this to you or not, but if it were easy to generate a set of keys for shuffle layers without generating the full graph, then that would make it easier for us to specify nicer priorities for shuffles at the layer level. I would love to get this PR in, one way or another, by next release.

My hope is that you understand the shuffle system well enough to be able to quickly generate a def keys(self) method. I think that if we add that then we can move the callable here directly onto the Layer instance, rather than push it into the unpack method.

If we can't do that this week then let's punt and find a way to merge this in.

@rjzamora
Member

My hope is that you understand the shuffle system well enough to be able to quickly generate a def keys(self) method.

Makes sense - I’ll look through this PR and give you my thoughts tomorrow. There is no doubt that we can generate a set of keys without “materializing” the graph, but the algorithm will have the same complexity as _construct_graph.
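If a shuffle layer's output keys follow the usual (name, partition) convention, enumerating them is cheap; a toy sketch of a `keys()` method under that assumption (the class, attributes, and key layout here are illustrative, and, per the point above, enumerating *all* intermediate keys may still cost as much as `_construct_graph`):

```python
class SimpleShuffleLayerSketch:
    """Toy stand-in: a shuffle layer that knows its name and partition count."""

    def __init__(self, name, npartitions):
        self.name = name
        self.npartitions = npartitions

    def keys(self):
        # Output keys follow the (name, partition) convention, so they can be
        # enumerated in O(npartitions) without building the task graph itself.
        return [(self.name, i) for i in range(self.npartitions)]


layer = SimpleShuffleLayerSketch("split-abc", 3)
```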

@mrocklin
Member

Oh interesting. I would have expected it to be fairly cheap to generate just the keys.

@rjzamora
Member

Oh interesting. I would have expected it to be fairly cheap to generate just the keys.

I should say may have - I'll need to think about it :).

@mrocklin
Member

Also, we only need to generate the keys that need to have a set priority here.

mrocklin pushed a commit that referenced this pull request Jul 1, 2021
@madsbk madsbk deleted the priority_shuffle_split branch August 19, 2021 07:08