[data] Add more shuffle fusion rules #59985

Merged
alexeykudinkin merged 9 commits into ray-project:master from iamjustinhsu:jhsu/more-shuffle-fusion-rules
Jan 15, 2026
@iamjustinhsu
Contributor

@iamjustinhsu iamjustinhsu commented Jan 9, 2026

Description

Add fusion rules for redundant shuffles. When one of the following operator pairs appears back to back, the 1st shuffle is dropped:

  • Repartition -> Aggregate
  • StreamingRepartition -> Repartition
  • Repartition -> StreamingRepartition
  • Sort -> Sort
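The idea behind the rules listed above can be sketched on a toy operator chain. This is only an illustration: `Op`, `REDUNDANT_PAIRS`, `drop_redundant_shuffles`, and `chain` are hypothetical names made up for the example, not Ray Data classes or the code in this PR.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Op:
    """Hypothetical stand-in for a logical operator (not Ray's real class)."""

    name: str
    inputs: List["Op"] = field(default_factory=list)


# (upstream, downstream) pairs from the PR description where the
# upstream shuffle is treated as redundant.
REDUNDANT_PAIRS = {
    ("Repartition", "Aggregate"),
    ("StreamingRepartition", "Repartition"),
    ("Repartition", "StreamingRepartition"),
    ("Sort", "Sort"),
}


def drop_redundant_shuffles(op: Op) -> Op:
    """Rebuild the plan bottom-up, skipping a redundant upstream shuffle."""
    new_inputs = [drop_redundant_shuffles(i) for i in op.inputs]
    if len(new_inputs) == 1:
        upstream = new_inputs[0]
        if (upstream.name, op.name) in REDUNDANT_PAIRS and len(upstream.inputs) == 1:
            # Re-parent `op` onto the upstream's own input: the 1st shuffle is gone.
            return Op(op.name, [upstream.inputs[0]])
    return Op(op.name, new_inputs)


def chain(op: Op) -> List[str]:
    """Flatten a single-input chain into operator names, source first."""
    names: List[str] = []
    cur: Optional[Op] = op
    while cur is not None:
        names.append(cur.name)
        cur = cur.inputs[0] if cur.inputs else None
    return names[::-1]


plan = Op("Aggregate", [Op("Repartition", [Op("Read")])])
print(chain(drop_redundant_shuffles(plan)))
```

In this toy model the rewrite always builds a new downstream node instead of mutating the existing one, which mirrors the shape of the rules discussed in the review below.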

Related issues

None

Additional information

None

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces new fusion rules to optimize redundant shuffles, specifically for Repartition -> Aggregate, StreamingRepartition -> Repartition, and Sort -> Sort operator pairs. The changes involve renaming CombineRepartitions to CombineShuffles and adding the logic for the new fusion rules.

My review found a critical issue in the implementation of the new fusion rules. The use of cp.copy(op) does not correctly modify the operator graph to remove the redundant shuffle. I've provided a detailed comment with a suggested fix to correctly implement the operator fusion by creating new operators with updated input dependencies. Addressing this is crucial for the feature to work as intended.

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Comment on lines +46 to +71
elif isinstance(input_op, Repartition) and isinstance(op, Aggregate):
    return Aggregate(
        input_op=input_op.input_dependencies[0],
        key=op._key,
        aggs=op._aggs,
        num_partitions=op._num_partitions,
        batch_format=op._batch_format,
    )
elif isinstance(input_op, StreamingRepartition) and isinstance(
    op, Repartition
):
    return Repartition(
        input_op.input_dependencies[0],
        num_outputs=op._num_outputs,
        shuffle=op._shuffle,
        keys=op._keys,
        sort=op._sort,
    )
elif isinstance(input_op, Sort) and isinstance(op, Sort):
    return Sort(
        input_op.input_dependencies[0],
        sort_key=op._sort_key,
        batch_format=op._batch_format,
    )

return op
Contributor Author

These were the 2 new rules that got added. I just renamed the file from combine_repartitions -> combine_shuffles

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu marked this pull request as ready for review January 9, 2026 00:28
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner January 9, 2026 00:28
@ray-gardener ray-gardener bot added performance data Ray Data-related issues labels Jan 9, 2026
        target_num_rows_per_block=op.target_num_rows_per_block,
    )
elif isinstance(input_op, Repartition) and isinstance(op, Aggregate):
    return Aggregate(
Contributor

We can just return op, right?

Contributor

Actually, no -- we'd inherit num_partitions from Repartition

Contributor Author

> We can just return op, right?

That would keep the original input dependency, and we want to get rid of the Repartition.
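A minimal sketch of the point made in this thread, assuming a toy `LogicalOp` class that only mimics the `input_dependencies` attribute seen in the diff (the class and the `upstream_names` helper are hypothetical, not Ray's implementation): returning the downstream operator unchanged leaves the Repartition in the chain, while rebuilding it on top of the Repartition's input removes it.

```python
class LogicalOp:
    """Hypothetical minimal operator; only mirrors the `input_dependencies` idea."""

    def __init__(self, name, input_op=None):
        self.name = name
        self.input_dependencies = [input_op] if input_op is not None else []


def upstream_names(op):
    """Walk a single-input chain and collect the names of everything upstream."""
    names = []
    while op.input_dependencies:
        op = op.input_dependencies[0]
        names.append(op.name)
    return names


read = LogicalOp("Read")
repartition = LogicalOp("Repartition", read)
aggregate = LogicalOp("Aggregate", repartition)

# Option A: "just return op". The Aggregate still points at the Repartition.
unchanged = aggregate

# Option B: build a new Aggregate on top of the Repartition's own input.
rebuilt = LogicalOp("Aggregate", repartition.input_dependencies[0])

print(upstream_names(unchanged))
print(upstream_names(rebuilt))
```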

Comment on lines +30 to +45
if isinstance(input_op, Repartition) and isinstance(op, Repartition):
    shuffle = input_op._shuffle or op._shuffle
    return Repartition(
        input_op.input_dependencies[0],
        num_outputs=op._num_outputs,
        shuffle=shuffle,
        keys=op._keys,
        sort=op._sort,
    )
elif isinstance(input_op, StreamingRepartition) and isinstance(
    op, StreamingRepartition
):
    return StreamingRepartition(
        input_op.input_dependencies[0],
        target_num_rows_per_block=op.target_num_rows_per_block,
    )
Contributor

We should really unify the Repartition and StreamingRepartition logical ops

Contributor Author

Hmm, yes, I somewhat agree, although I don't know the original intention behind splitting them up (different physical operators?). Anyway, I'd prefer to do that refactor later since it's unrelated to this PR

Comment on lines +54 to +56
elif isinstance(input_op, StreamingRepartition) and isinstance(
    op, Repartition
):
Contributor

We should also fuse the other way around (Repartition -> StreamingRepartition)

Comment on lines +858 to +865
assert "Repartition[Repartition]" in logical_plan
assert "Aggregate[Aggregate]" in logical_plan
# Check that in the Logical Plan (Optimized), Repartition is removed (combined)
optimized_logical = captured.split("-------- Logical Plan (Optimized) --------")[
    1
].split("-------- Physical Plan --------")[0]
assert "Repartition[Repartition]" not in optimized_logical
assert "Aggregate[Aggregate]" in optimized_logical
Contributor

Assertions like these are not robust:

  • We want to assert not only the presence of ops but also the relationship between them
  • Ideally, assertions should look like logical == Aggregate(Repartition(Any), ...), but it'd require quite a bit of work on our operators to make that possible
  • For now, just assert that a (textual) sub-plan is present in the DAG
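One way to read the last suggestion is sketched below. The plan text layout is an assumption made for the example (only the `Name[Name]` label style comes from the test above), and `op_sequence` and `contains_subplan` are hypothetical helpers, not part of Ray's test utilities. The check looks for operator labels appearing next to each other, so it captures the relationship between ops and not just their presence.

```python
import re
from typing import List


def op_sequence(plan_text: str) -> List[str]:
    """Extract labels such as `Aggregate[Aggregate]` in order of appearance."""
    return re.findall(r"\w+\[\w+\]", plan_text)


def contains_subplan(plan_text: str, subplan: List[str]) -> bool:
    """True if `subplan` occurs as a contiguous run of operator labels."""
    ops = op_sequence(plan_text)
    n = len(subplan)
    return any(ops[i : i + n] == subplan for i in range(len(ops) - n + 1))


# Hypothetical plan text; the real printed plan may be laid out differently.
logical = """
Aggregate[Aggregate]
+- Repartition[Repartition]
   +- Read[ReadRange]
"""
optimized = """
Aggregate[Aggregate]
+- Read[ReadRange]
"""

# Before optimization: Aggregate sits directly on top of Repartition.
assert contains_subplan(logical, ["Aggregate[Aggregate]", "Repartition[Repartition]"])
# After optimization: Aggregate sits directly on top of the read.
assert contains_subplan(optimized, ["Aggregate[Aggregate]", "Read[ReadRange]"])
assert not contains_subplan(optimized, ["Repartition[Repartition]"])
```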

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Comment on lines +75 to +79
elif isinstance(input_op, Repartition) and isinstance(op, StreamingRepartition):
    return StreamingRepartition(
        input_op.input_dependencies[0],
        target_num_rows_per_block=op._target_num_rows_per_block,
    )
Contributor

Now that I'm thinking more about it, I think we can't do that, as we'd be getting substantially different output in that case

  • Repartition -> SR: the Repartition will shuffle by key, so the data ends up nicely clustered; we can't really replace the pair with just SR
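The concern can be illustrated with a toy model in plain Python. Everything here is an assumption made for the example: `repartition_by_key` stands in for a key-based shuffle, and `rechunk` stands in for a row-count-based re-blocking that keeps row order; neither reflects how Ray Data actually implements these operators.

```python
from typing import Dict, List

Row = Dict[str, object]


def repartition_by_key(rows: List[Row], key: str, num_outputs: int) -> List[List[Row]]:
    """Toy key-based shuffle: rows with the same key land in the same block."""
    blocks: List[List[Row]] = [[] for _ in range(num_outputs)]
    for row in rows:
        # Deterministic toy "hash": first character code modulo the block count.
        blocks[ord(str(row[key])[0]) % num_outputs].append(row)
    return blocks


def rechunk(blocks: List[List[Row]], target_rows: int) -> List[List[Row]]:
    """Toy re-blocking: keep row order, emit blocks of at most `target_rows` rows."""
    flat = [row for block in blocks for row in block]
    return [flat[i : i + target_rows] for i in range(0, len(flat), target_rows)]


def keys_per_block(blocks: List[List[Row]], key: str) -> List[List[object]]:
    return [[row[key] for row in block] for block in blocks]


rows = [{"k": k, "v": i} for i, k in enumerate("abcabcab")]

# Key-based shuffle followed by re-blocking: equal keys stay adjacent.
with_repartition = rechunk(repartition_by_key(rows, "k", 3), target_rows=3)
# Re-blocking alone: keys stay interleaved as in the input.
without_repartition = rechunk([rows], target_rows=3)

print(keys_per_block(with_repartition, "k"))
print(keys_per_block(without_repartition, "k"))
```

In this toy model the two pipelines produce blocks of the same sizes but with different contents, which is the kind of difference the comment above is worried about.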

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Jan 15, 2026
@alexeykudinkin alexeykudinkin merged commit bfa6f0b into ray-project:master Jan 15, 2026
7 checks passed
jeffery4011 pushed a commit to jeffery4011/ray that referenced this pull request Jan 20, 2026
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: jeffery4011 <jefferyshen1015@gmail.com>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>