Co-authored-by: Mads R. B. Kristensen <madsbk@gmail.com>
@TomAugspurger, do you have any thoughts on this? 🙂
mrocklin
left a comment
Oops. I had a bunch of comments queued. My apologies for the delay.
The optimization pass is now significantly faster...
Note that we're caching things, so the timeit call here may not be representative
Update: I realized this morning that this PR was leaving out the common case of the "simple" shuffle (where the number of output partitions is less than |
| all input partitions. This method does not require graph | ||
| materialization. | ||
| """ | ||
| deps = defaultdict(set) |
@madsbk - I just wanted to get your thoughts on something here...
It seems that the HLG cull operation breaks when this deps dictionary is a vanilla dict (rather than a defaultdict(set)). The problem is that we are not actually returning any intra-layer dependencies when the shuffle layer is culled (because we are not actually materializing the graph). This means we will get a KeyError during the culled_deps[k] access in HighLevelGraph.cull (because we are not adding dependencies for all keys).
Does this seem like it could be a problem?
This should be fixed in #6699, in which cull() is only required to return external dependencies.
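For readers following the thread, the failure mode described above can be reproduced without dask. The snippet below is only a sketch: the key names and the collect helper are hypothetical, and the loop merely imitates the kind of deps[k] lookup mentioned in the comment, not the actual HighLevelGraph.cull code.

```python
from collections import defaultdict

# Hypothetical keys: the layer reports dependencies for only one of two keys,
# as an un-materialized layer might when it skips intra-layer dependencies.
requested_keys = [("shuffle-abc", 0), ("shuffle-abc", 1)]
reported = {("shuffle-abc", 0): {("input-xyz", 0)}}

plain_deps = dict(reported)
default_deps = defaultdict(set, reported)


def collect(deps, keys):
    """Union the dependencies of every key (imitates a deps[k] lookup loop)."""
    found = set()
    for k in keys:
        found |= deps[k]
    return found


# A vanilla dict raises KeyError for the key with no reported dependencies.
try:
    collect(plain_deps, requested_keys)
    plain_failed = False
except KeyError:
    plain_failed = True

# A defaultdict(set) silently yields an empty set for the missing key.
default_result = collect(default_deps, requested_keys)
```

Note that a defaultdict also inserts the missing key on access, so it hides the gap rather than reporting it.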
@jrbourbeau I'd like to put this on your queue as well
Thanks for all your work here @rjzamora!
Please correct me if I'm wrong, but from what I can tell this PR moves existing DataFrame shuffling code into two new SimpleShuffleLayer and ShuffleLayer layer classes. The corresponding low-level task graphs aren't materialized until we try to inspect the underlying task graph (e.g. __getitem__) which will eventually help us send a smaller object to the scheduler (once we're directly sending HighLevelGraphs).
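The "materialize only on inspection" behaviour summarized above can be pictured with a small stand-alone Mapping. This is a sketch under stated assumptions: LazyLayer, its "noop" tasks, and the is_materialized property are hypothetical illustrations, not the SimpleShuffleLayer or ShuffleLayer implementation.

```python
from collections.abc import Mapping


class LazyLayer(Mapping):
    """Hypothetical layer that builds its task dict only on first access."""

    def __init__(self, name, npartitions):
        self.name = name
        self.npartitions = npartitions
        self._cached = None  # low-level tasks, built lazily

    def _materialize(self):
        # Build the (placeholder) low-level tasks once and cache them.
        if self._cached is None:
            self._cached = {
                (self.name, i): ("noop", i) for i in range(self.npartitions)
            }
        return self._cached

    @property
    def is_materialized(self):
        return self._cached is not None

    def __getitem__(self, key):
        return self._materialize()[key]

    def __iter__(self):
        return iter(self._materialize())

    def __len__(self):
        return len(self._materialize())


layer = LazyLayer("shuffle-abc", 4)
before = layer.is_materialized      # nothing built yet
task = layer[("shuffle-abc", 2)]    # __getitem__ triggers materialization
after = layer.is_materialized
```

Until something indexes or iterates the layer, only the name and partition count need to be stored or sent.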
One piece of follow-up work that comes to mind is adding a custom serialization method for these new shuffle layers. The implementation over in #6693 would cause us to fully materialize the task graph for shuffle layers before sending them to the scheduler. Though that's certainly future work we don't need to worry about for this PR : )
Overall I think the changes here look good. The fact that tests are passing gives me confidence. I've left a few small comments, but otherwise this appears to be good to go. Would you recommend we merge this in?
dask/dataframe/tests/test_shuffle.py
| if name.startswith("shuffle-"): | ||
| assert isinstance(layer, dd.shuffle.ShuffleLayer) |
I think we want to switch these to instead check that if a layer is a ShuffleLayer, then its name begins with "shuffle-"
| if name.startswith("shuffle-"): | |
| assert isinstance(layer, dd.shuffle.ShuffleLayer) | |
| if isinstance(layer, dd.shuffle.ShuffleLayer): | |
| assert name.startswith("shuffle-") |
I added this check to make sure we are actually using ShuffleLayer layers. So, I worry that if we aren't adding them, then this change will miss it?
Gotcha! I had the concern that if we change our ShuffleLayer naming scheme then we would start to miss this check. I just pushed a small commit which asserts that there are ShuffleLayers in the HLG and that the names of the ShuffleLayers are as expected (i.e. they start with "shuffle-"). That should, I think, handle both of our concerns.
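A minimal sketch of the combined check, with both concerns covered, might look like the following. The Layer and ShuffleLayer classes and the layers dict are stand-ins defined here so the snippet runs without dask; the actual test operates on dsk.layers and dd.shuffle.ShuffleLayer.

```python
class Layer:
    """Stand-in for a generic HLG layer (hypothetical, not dask's class)."""


class ShuffleLayer(Layer):
    """Stand-in for the shuffle layer class discussed in this thread."""


def check_shuffle_layers(layers):
    # Concern 1: at least one ShuffleLayer is actually present in the graph.
    assert any(isinstance(layer, ShuffleLayer) for layer in layers.values())
    # Concern 2: every ShuffleLayer follows the expected naming scheme.
    for name, layer in layers.items():
        if isinstance(layer, ShuffleLayer):
            assert name.startswith("shuffle-")
    return True


# Hypothetical layer names for illustration.
layers = {
    "from_pandas-123": Layer(),
    "shuffle-abc": ShuffleLayer(),
    "shuffle-def": ShuffleLayer(),
}
ok = check_shuffle_layers(layers)
```

The first assertion fails if no ShuffleLayer is produced at all, and the loop fails if the naming scheme changes.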
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
jrbourbeau
left a comment
Thanks @rjzamora! Will merge once CI passes
Thanks for your help @jrbourbeau ! Sounds good. My only thought: I'm not sure I understand the purpose of |
The assert any(isinstance(layer, dd.shuffle.ShuffleLayer) for layer in dsk.layers.values()) check should ensure that |
Ah - sorry! I was looking at the wrong change and didn't see that. In that case, I don't think the |
Depends on #6510
Implements
ShuffleStage - A simple HighLevelGraph (HLG) Layer for a single stage of a task-based shuffle in dask.dataframe. To enable culling (without the need to materialize the full graph), the shuffling algorithm was revised to only include tasks that are needed to produce a specific set of output keys.
TODO:
cc @madsbk @quasiben @mrocklin
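As a rough illustration of culling without materialization, the sketch below keeps only the requested output partitions and reports their external dependencies directly from the layer's parameters. Everything here is an assumption made for illustration: the SimpleShuffleSketch class, its attribute names, and the rule that each output partition depends on every input partition are not taken from the PR's code.

```python
class SimpleShuffleSketch:
    """Hypothetical single-stage shuffle layer described only by parameters."""

    def __init__(self, name, name_input, npartitions_input, parts_out):
        self.name = name
        self.name_input = name_input
        self.npartitions_input = npartitions_input
        self.parts_out = list(parts_out)  # output partitions this layer produces

    def cull(self, keys):
        # Keep only the output partitions that were actually requested.
        wanted = sorted(
            k[1] for k in keys if k[0] == self.name and k[1] in self.parts_out
        )
        culled = SimpleShuffleSketch(
            self.name, self.name_input, self.npartitions_input, wanted
        )
        # Assumption: every output partition reads from all input partitions,
        # so external dependencies follow from the parameters alone.
        all_inputs = {(self.name_input, i) for i in range(self.npartitions_input)}
        deps = {(self.name, p): set(all_inputs) for p in wanted}
        return culled, deps


layer = SimpleShuffleSketch("shuffle-abc", "df", npartitions_input=3, parts_out=range(4))
culled, deps = layer.cull({("shuffle-abc", 1)})
```

No task dictionary is built at any point; the culled layer is just a smaller parameter set.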