[data] Add more shuffle fusion rules #59985
alexeykudinkin merged 9 commits into ray-project:master
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Code Review
This pull request introduces new fusion rules to optimize redundant shuffles, specifically for Repartition -> Aggregate, StreamingRepartition -> Repartition, and Sort -> Sort operator pairs. The changes involve renaming CombineRepartitions to CombineShuffles and adding the logic for the new fusion rules.
My review found a critical issue in the implementation of the new fusion rules. The use of cp.copy(op) does not correctly modify the operator graph to remove the redundant shuffle. I've provided a detailed comment with a suggested fix to correctly implement the operator fusion by creating new operators with updated input dependencies. Addressing this is crucial for the feature to work as intended.
python/ray/data/_internal/logical/rules/combine_repartitions.py
        elif isinstance(input_op, Repartition) and isinstance(op, Aggregate):
            return Aggregate(
                input_op=input_op.input_dependencies[0],
                key=op._key,
                aggs=op._aggs,
                num_partitions=op._num_partitions,
                batch_format=op._batch_format,
            )
        elif isinstance(input_op, StreamingRepartition) and isinstance(
            op, Repartition
        ):
            return Repartition(
                input_op.input_dependencies[0],
                num_outputs=op._num_outputs,
                shuffle=op._shuffle,
                keys=op._keys,
                sort=op._sort,
            )
        elif isinstance(input_op, Sort) and isinstance(op, Sort):
            return Sort(
                input_op.input_dependencies[0],
                sort_key=op._sort_key,
                batch_format=op._batch_format,
            )

        return op
These were the 2 new rules that got added. I just renamed the file from combine_repartitions -> combine_shuffles
                target_num_rows_per_block=op.target_num_rows_per_block,
            )
        elif isinstance(input_op, Repartition) and isinstance(op, Aggregate):
            return Aggregate(
We can just return op, right?
Actually, no -- we'd inherit num_partitions from Repartition
> We can just return op, right?
No, that op would still contain the original input_dependency (we want to get rid of Repartition).
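To make the point of this thread concrete, here is a minimal sketch using toy stand-in classes rather than Ray's actual logical operators (`ToyOp`, `ToyRepartition`, `ToyAggregate`, `fuse`, and `plan_names` are all hypothetical names). Under those assumptions, it shows that returning op unchanged leaves the Repartition in the plan, while rebuilding the Aggregate on top of input_op.input_dependencies[0] drops it and keeps the Aggregate's own num_partitions.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ToyOp:
    """Toy stand-in for a logical operator (hypothetical, not Ray's class)."""

    name: str
    input_dependencies: List["ToyOp"] = field(default_factory=list)


@dataclass
class ToyRepartition(ToyOp):
    num_outputs: int = 1


@dataclass
class ToyAggregate(ToyOp):
    key: Optional[str] = None
    num_partitions: Optional[int] = None


def fuse(op: ToyOp) -> ToyOp:
    """Drop a Repartition that feeds directly into an Aggregate."""
    if not op.input_dependencies:
        return op
    input_op = op.input_dependencies[0]
    if isinstance(input_op, ToyRepartition) and isinstance(op, ToyAggregate):
        # Rebuild the Aggregate on the Repartition's own input, so the
        # Repartition no longer appears in the chain.
        return ToyAggregate(
            name=op.name,
            input_dependencies=[input_op.input_dependencies[0]],
            key=op.key,
            num_partitions=op.num_partitions,
        )
    return op


def plan_names(op: ToyOp) -> List[str]:
    """Walk the single-input chain from `op` back to the source."""
    names = [op.name]
    while op.input_dependencies:
        op = op.input_dependencies[0]
        names.append(op.name)
    return names


read = ToyOp("Read")
repartition = ToyRepartition("Repartition", [read], num_outputs=8)
aggregate = ToyAggregate("Aggregate", [repartition], key="id", num_partitions=4)

fused = fuse(aggregate)
print(plan_names(aggregate))  # the "return op" case: Repartition is still there
print(plan_names(fused))      # the rebuilt case: Repartition is gone
```

The real rule in the hunk above does the same thing with the actual Aggregate constructor arguments (key, aggs, num_partitions, batch_format).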
        if isinstance(input_op, Repartition) and isinstance(op, Repartition):
            shuffle = input_op._shuffle or op._shuffle
            return Repartition(
                input_op.input_dependencies[0],
                num_outputs=op._num_outputs,
                shuffle=shuffle,
                keys=op._keys,
                sort=op._sort,
            )
        elif isinstance(input_op, StreamingRepartition) and isinstance(
            op, StreamingRepartition
        ):
            return StreamingRepartition(
                input_op.input_dependencies[0],
                target_num_rows_per_block=op.target_num_rows_per_block,
            )
We should really unify the Repartition and StreamingRepartition logical ops
Hmm, yes, I somewhat agree, although I don't know the original intention behind splitting them up (different physical operators?). Anyway, I'd prefer to do that refactor later since it's unrelated to this PR.
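For reference, the Repartition -> Repartition rule in the hunk above keeps the downstream op's settings and ORs the two shuffle flags, presumably so that a shuffle requested by either op is not lost. A minimal sketch of that combination, using a toy class in place of Ray's Repartition (`ToyRepartition` and `combine` are hypothetical names, and the field set is trimmed to what the example needs):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyRepartition:
    """Toy stand-in for a Repartition logical op (hypothetical)."""

    input_op: Optional["ToyRepartition"]
    num_outputs: int
    shuffle: bool


def combine(input_op: ToyRepartition, op: ToyRepartition) -> ToyRepartition:
    """Fuse two back-to-back repartitions into one."""
    return ToyRepartition(
        # Skip the upstream repartition entirely.
        input_op=input_op.input_op,
        # The downstream op decides the final number of outputs.
        num_outputs=op.num_outputs,
        # Shuffle if either of the two ops asked for it.
        shuffle=input_op.shuffle or op.shuffle,
    )


first = ToyRepartition(None, num_outputs=16, shuffle=True)
second = ToyRepartition(first, num_outputs=4, shuffle=False)
combined = combine(first, second)
print(combined)
```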
        elif isinstance(input_op, StreamingRepartition) and isinstance(
            op, Repartition
        ):
We should also fuse the other way around (Repartition -> StreamingRepartition)
        assert "Repartition[Repartition]" in logical_plan
        assert "Aggregate[Aggregate]" in logical_plan
        # Check that in the Logical Plan (Optimized), Repartition is removed (combined)
        optimized_logical = captured.split("-------- Logical Plan (Optimized) --------")[
            1
        ].split("-------- Physical Plan --------")[0]
        assert "Repartition[Repartition]" not in optimized_logical
        assert "Aggregate[Aggregate]" in optimized_logical
These kinds of asserts are not robust:
- We want to assert not only presence but also the relationship between ops
- Ideally assertions should look like logical == Aggregate(Repartition(Any), ...), but it'd require quite a bit of work on our operators to make that possible
- For now, just assert that a (textual) sub-plan is present in the DAG
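One way to follow the last suggestion is sketched below. The two section markers are the ones used in the test hunk above; everything else is an assumption made for illustration: the helper names `optimized_logical_section` and `contains_subplan`, the `+-` tree layout, and the `Read[ReadRange]` operator name are hypothetical and may not match what Ray actually prints.

```python
from typing import List

# Section markers taken from the test hunk above.
OPTIMIZED_MARKER = "-------- Logical Plan (Optimized) --------"
PHYSICAL_MARKER = "-------- Physical Plan --------"


def optimized_logical_section(captured: str) -> str:
    """Return the text between the optimized-logical and physical markers."""
    return captured.split(OPTIMIZED_MARKER)[1].split(PHYSICAL_MARKER)[0]


def contains_subplan(section: str, subplan: List[str]) -> bool:
    """True if `subplan` appears as consecutive lines, ignoring indentation."""
    lines = [line.strip() for line in section.splitlines() if line.strip()]
    want = [line.strip() for line in subplan]
    n = len(want)
    return any(lines[i : i + n] == want for i in range(len(lines) - n + 1))


# Hypothetical captured output; the real plan layout may differ.
captured = """
-------- Logical Plan --------
Aggregate[Aggregate]
+- Repartition[Repartition]
   +- Read[ReadRange]
-------- Logical Plan (Optimized) --------
Aggregate[Aggregate]
+- Read[ReadRange]
-------- Physical Plan --------
(omitted)
"""

section = optimized_logical_section(captured)
# Asserts the relationship (Aggregate directly above Read), not just presence.
assert contains_subplan(section, ["Aggregate[Aggregate]", "+- Read[ReadRange]"])
assert not contains_subplan(
    section, ["Aggregate[Aggregate]", "+- Repartition[Repartition]"]
)
```

Matching consecutive lines is stricter than a bare substring check on a single op name, since it also fails when the ops are present but in a different parent/child arrangement.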
…/more-shuffle-fusion-rules
        elif isinstance(input_op, Repartition) and isinstance(op, StreamingRepartition):
            return StreamingRepartition(
                input_op.input_dependencies[0],
                target_num_rows_per_block=op._target_num_rows_per_block,
            )
Now that I'm thinking more about it, I think we can't do that, as we'd be getting substantially different output in that case:
- Repartition -> StreamingRepartition (SR): the Repartition will shuffle by key, so the data comes out nicely clustered, and we can't really replace the pair with an SR alone
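The concern above can be illustrated with a toy model in plain Python (this is not Ray code). It assumes a keyed repartition routes rows to output blocks by key, and models StreamingRepartition as simply re-chunking rows into blocks of a target size; both functions and their names are hypothetical simplifications.

```python
from typing import Dict, List

Row = Dict[str, int]
Block = List[Row]


def repartition_by_key(blocks: List[Block], key: str, num_outputs: int) -> List[Block]:
    """Toy keyed shuffle: rows with the same key land in the same block."""
    out: List[Block] = [[] for _ in range(num_outputs)]
    for block in blocks:
        for row in block:
            out[row[key] % num_outputs].append(row)
    return out


def rechunk(blocks: List[Block], target_num_rows_per_block: int) -> List[Block]:
    """Toy streaming repartition: only block sizes change, row order is kept."""
    rows = [row for block in blocks for row in block]
    n = target_num_rows_per_block
    return [rows[i : i + n] for i in range(0, len(rows), n)]


def keys_per_block(blocks: List[Block], key: str) -> List[List[int]]:
    """Sorted distinct key values found in each block."""
    return [sorted({row[key] for row in block}) for block in blocks]


blocks = [[{"k": 0}, {"k": 1}], [{"k": 0}, {"k": 1}]]

# Repartition -> StreamingRepartition: each block holds a single key.
clustered = rechunk(repartition_by_key(blocks, "k", 2), 2)
# StreamingRepartition alone: keys stay interleaved across blocks.
unclustered = rechunk(blocks, 2)

print(keys_per_block(clustered, "k"))
print(keys_per_block(unclustered, "k"))
```

In this toy model the two pipelines produce blocks of the same size but with different key layouts, which is why dropping the first shuffle would change the output.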
## Description
Add redundant shuffle fusion rules by dropping the 1st shuffle
- Repartition -> Aggregate
- StreamingRepartition -> Repartition
- Repartition -> StreamingRepartition
- Sort -> Sort

## Related issues
None

## Additional information
None

---------
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: jeffery4011 <jefferyshen1015@gmail.com>