[Data] Make StatefulShuffleAggregation.finalize interface allow incremental streaming #59972

Merged: alexeykudinkin merged 1 commit into master from ak/hsh-shfl-strm on Jan 8, 2026

@alexeykudinkin
Contributor

Description

This change makes the StatefulShuffleAggregation.finalize interface allow incremental streaming, in preparation for a future shift to streaming results incrementally.
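
A minimal sketch of the idea, not the actual Ray Data code: `Block` is stood in for by a plain list of row dicts, and `ToyAggregation`, `accept`, and `max_rows_per_block` are hypothetical names introduced only for illustration. With an iterator return type, an implementation may yield a single block today and several smaller blocks later without another signature change.

```python
from typing import Dict, Iterator, List

# Stand-in for Ray Data's Block type (assumption: a plain list of row dicts).
Block = List[dict]


class ToyAggregation:
    """Hypothetical stateful aggregation that buffers rows per partition."""

    def __init__(self, partition_ids: List[int], max_rows_per_block: int = 2):
        self._rows: Dict[int, List[dict]] = {pid: [] for pid in partition_ids}
        self._max_rows_per_block = max_rows_per_block

    def accept(self, partition_id: int, rows: Block) -> None:
        # Accumulate incoming rows for the given partition.
        self._rows[partition_id].extend(rows)

    def finalize(self, partition_id: int) -> Iterator[Block]:
        """Yield the partition's output in chunks instead of as one block."""
        rows = self._rows.pop(partition_id)
        for start in range(0, len(rows), self._max_rows_per_block):
            yield rows[start : start + self._max_rows_per_block]


agg = ToyAggregation(partition_ids=[0])
agg.accept(0, [{"x": 1}, {"x": 2}, {"x": 3}])
blocks = list(agg.finalize(0))
```

Because `finalize` is a generator here, a caller that only needs the first block can stop iterating early, and the remaining chunks are never materialized.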

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

…l streaming

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
alexeykudinkin requested a review from a team as a code owner on January 8, 2026 18:46
alexeykudinkin added the "go" label (add ONLY when ready to merge, run all tests) on Jan 8, 2026
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request refactors the StatefulShuffleAggregation.finalize interface to support incremental streaming by changing its return type to Iterator[Block]. The implementations in ReducingShuffleAggregation, Concat, and JoiningShuffleAggregation are updated to yield blocks. The logic in HashShuffleAggregator is also refactored to consume this new iterator, improving how execution stats are handled for multiple output blocks. The changes are well-aligned with the goal of preparing for incremental streaming. My review includes a suggestion to remove an unused attribute in JoiningShuffleAggregation.
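
As a rough illustration of the consumer side described above, here is a sketch of how a caller might pull blocks from such an iterator and record stats per output block. It is an assumption-laden toy, not the `HashShuffleAggregator` implementation: `ToyBlockStats`, `consume_finalized`, and `toy_finalize` are hypothetical names, and `Block` is again a plain list of row dicts.

```python
import time
from dataclasses import dataclass
from typing import Iterator, List, Tuple

# Stand-in for Ray Data's Block type (assumption: a plain list of row dicts).
Block = List[dict]


@dataclass
class ToyBlockStats:
    """Hypothetical per-block stats record."""

    num_rows: int
    wall_time_s: float


def consume_finalized(
    blocks: Iterator[Block],
) -> Iterator[Tuple[Block, ToyBlockStats]]:
    """Pull blocks one at a time, attributing elapsed time to each block."""
    start = time.perf_counter()
    for block in blocks:
        elapsed = time.perf_counter() - start
        yield block, ToyBlockStats(num_rows=len(block), wall_time_s=elapsed)
        # Restart the clock so the next block is timed on its own.
        start = time.perf_counter()


def toy_finalize() -> Iterator[Block]:
    # Hypothetical finalize() output: two blocks of different sizes.
    yield [{"k": "a", "sum": 3}]
    yield [{"k": "b", "sum": 4}, {"k": "c", "sum": 5}]


results = list(consume_finalized(toy_finalize()))
row_counts = [stats.num_rows for _, stats in results]
```

Timing each block separately, rather than timing the whole `finalize` call once, is one way stats can stay meaningful when a single partition produces multiple output blocks.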

partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids
}
self.data_context = data_context
self._data_context = data_context
Contributor

medium

The _data_context attribute is assigned but never used within the JoiningShuffleAggregation class. It seems to be dead code. Consider removing this attribute and the corresponding data_context parameter from the __init__ method signature to improve code clarity. This would also require removing the data_context argument from the call site in JoinOperator.

Contributor

@srinathk10 srinathk10 left a comment

LGTM!

ray-gardener bot added the "data" label (Ray Data-related issues) on Jan 8, 2026
alexeykudinkin enabled auto-merge (squash) on January 8, 2026 19:06
alexeykudinkin merged commit 757b55d into master on Jan 8, 2026
6 of 7 checks passed
alexeykudinkin deleted the ak/hsh-shfl-strm branch on January 8, 2026 19:44
elliot-barn pushed a commit that referenced this pull request Jan 11, 2026
…remental streaming (#59972)

## Description

This change makes `StatefulShuffleAggregation.finalize` interface allow
incremental streaming to prepare it for future shift to streaming
results incrementally.

## Related issues
> Link related issues: "Fixes #1234", "Closes #1234", or "Related to
#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
…remental streaming (ray-project#59972)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
…remental streaming (ray-project#59972)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Feb 3, 2026
…remental streaming (ray-project#59972)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…remental streaming (ray-project#59972)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…remental streaming (ray-project#59972)

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Labels

data: Ray Data-related issues
go: add ONLY when ready to merge, run all tests

Development

Successfully merging this pull request may close these issues.

Ray fails to serialize self-reference objects

2 participants