Skip to content

Add ShuffleStage HLG Layer#6650

Merged
jrbourbeau merged 121 commits intodask:masterfrom
rjzamora:shuffle-hlg
Oct 16, 2020
Merged

Add ShuffleStage HLG Layer#6650
jrbourbeau merged 121 commits intodask:masterfrom
rjzamora:shuffle-hlg

Conversation

@rjzamora
Copy link
Member

@rjzamora rjzamora commented Sep 17, 2020

Depends on #6510

Implements ShuffleStage - A simple HighLevelGraph (HLG) Layer for a single stage of a task-based shuffle in dask.dataframe. To enable culling (without the need to materialize the full graph), the shuffling algorithm was revised to only include tasks that are needed to produce a specific set of output keys.

TODO:

  • Validate behavior and add tests
  • Benchmark the new approach

cc @madsbk @quasiben @mrocklin

Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
@jakirkham
Copy link
Member

@TomAugspurger, do you have any thoughts on this? 🙂

Copy link
Member

@mrocklin mrocklin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. I had a bunch of comments queued. My apologies for the delay.

The optimization pass is now significantly faster...

Note that we're caching things, so the timeit call here may not be representative

@rjzamora
Copy link
Member Author

Thanks for the review @mrocklin ! Hopefully I addressed your comments.

Note that we're caching things, so the timeit call here may not be representative

Good point - I updated the "benchmark" results above. We still get a nice performance bump (albeit a slightly smaller one).

@rjzamora
Copy link
Member Author

rjzamora commented Oct 1, 2020

Update: I realized this morning that this PR was leaving out the common case of the "simple" shuffle (where the number of output partitions is less than max_branch). I revised the class structure to include both a SimpleShuffleLayer and a shuffleLayer (which inherits from SimpleShuffleLayer).

all input partitions. This method does not require graph
materialization.
"""
deps = defaultdict(set)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@madsbk - I just wanted to get your thoughts on something here...

It seems that the HLG cull operation breaks when this deps dictionary is a vanilla dict (rather than a defaultdict(set)). The problem is that we are not actually returning any intra-layer dependencies when the shuffle layer is culled (because we are not actually materializing the graph). This means, we will get a keyerror during this culled_deps[k] access in HighLevelGraph.cull (because we are not adding dependencies for all keys).

Does this seem like it could be a problem?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fixed in #6699, in which cull() is only required to return external dependencies.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @madsbk !

@mrocklin
Copy link
Member

mrocklin commented Oct 9, 2020

@jrbourbeau I'd like to put this on your queue as well

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work here @rjzamora!

Please correct me if I'm wrong, but from what I can tell this PR moves existing DataFrame shuffling code into two new SimpleShuffleLayer and ShuffleLayer layer classes. The corresponding low-level task graphs aren't materialized until we try to inspect the underlying task graph (e.g. __getitem__) which will eventually help us send a smaller object to the scheduler (once we're directly sending HighLevelGraphs).

One piece of follow-up work that comes to mind is adding a custom serialization method for these new shuffle layers. The implementation over in #6693 would cause us to fully materialize the task graph for shuffle layers before sending them to the scheduler. Though that's certainly future work we don't need to worry about for this PR : )

Overall I think the changes here look good. The fact that tests are passing gives me confidence. I've left a few small comments, but otherwise this appears to be good to go. Would you recommend we merge this in?

Comment on lines +1030 to +1031
if name.startswith("shuffle-"):
assert isinstance(layer, dd.shuffle.ShuffleLayer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to switch these to instead check that if a layer is a ShuffleLayer, then its name begins with "shuffle-"

Suggested change
if name.startswith("shuffle-"):
assert isinstance(layer, dd.shuffle.ShuffleLayer)
if isinstance(layer, dd.shuffle.ShuffleLayer):
assert name.startswith("shuffle-")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this check to make sure we are actually using ShuffleLayer layers. So, I worry that if we aren't adding them, then this change will miss it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha! I has the concern that if we change our ShuffleLayer naming scheme then we would start to miss this check. I just pushed a small commit which asserts that there are ShuffleLayers in the HLG and that the names of the ShuffleLayers are as expected (i.e. they start with "shuffle-"). That should, I think, handle both of our concerns

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rjzamora! Will merge once CI passes

@rjzamora
Copy link
Member Author

Thanks for your help @jrbourbeau !

Sounds good. My only thought: I'm not sure I understand the purpose of f5dfd60, since we will no longer catch the case that the ShuffleLayer layers are missing.

@jrbourbeau
Copy link
Member

The

    assert any(
        isinstance(layer, dd.shuffle.ShuffleLayer) for layer in dsk.layers.values()
    )

check should ensure that ShuffleLayer layers aren't missing

@rjzamora
Copy link
Member Author

rjzamora commented Oct 16, 2020

Ah - sorry! I was looking at the wrong change and didn't see that. In that case, I don't think the assert name.startswith("shuffle-") check is really necessary (since we "could" use a different name), but it doesn't hurt... So, I think we are good. Thanks again!

@jrbourbeau jrbourbeau merged commit fbe5174 into dask:master Oct 16, 2020
@rjzamora rjzamora deleted the shuffle-hlg branch October 17, 2020 00:23
kumarprabhu1988 pushed a commit to kumarprabhu1988/dask that referenced this pull request Oct 29, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants