[Data] Make StatefulShuffleAggregation.finalize interface allow incremental streaming #59972
alexeykudinkin merged 1 commit into master
…l streaming Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Code Review
This pull request refactors the StatefulShuffleAggregation.finalize interface to support incremental streaming by changing its return type to Iterator[Block]. The implementations in ReducingShuffleAggregation, Concat, and JoiningShuffleAggregation are updated to yield blocks. The logic in HashShuffleAggregator is also refactored to consume this new iterator, improving how execution stats are handled for multiple output blocks. The changes are well-aligned with the goal of preparing for incremental streaming. My review includes a suggestion to remove an unused attribute in JoiningShuffleAggregation.
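To make the interface change concrete, here is a minimal sketch of what a `finalize` method returning `Iterator[Block]` might look like, together with a consumer that records stats per output block. It is not Ray's actual code: `ShuffleAggregationSketch`, `ConcatSketch`, `consume`, the `max_rows_per_block` parameter, and the list-of-dicts `Block` stand-in are all hypothetical names introduced so the example runs without Ray installed.

```python
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Dict, Iterator, List

# Stand-in for Ray Data's Block type (an assumption, so the sketch runs
# without Ray); here a block is simply a list of row dicts.
Block = List[dict]


class ShuffleAggregationSketch(ABC):
    """Hypothetical, simplified analogue of StatefulShuffleAggregation."""

    @abstractmethod
    def accept(self, partition_id: int, shard: Block) -> None:
        """Buffer one incoming shard for the given partition."""

    @abstractmethod
    def finalize(self, partition_id: int) -> Iterator[Block]:
        """Yield the partition's output blocks one at a time."""


class ConcatSketch(ShuffleAggregationSketch):
    """Buffers rows per partition and emits them in bounded-size blocks."""

    def __init__(self, max_rows_per_block: int = 2) -> None:
        self._rows: Dict[int, List[dict]] = defaultdict(list)
        self._max_rows = max_rows_per_block

    def accept(self, partition_id: int, shard: Block) -> None:
        self._rows[partition_id].extend(shard)

    def finalize(self, partition_id: int) -> Iterator[Block]:
        # Release the buffered rows, then yield them block by block
        # instead of returning one large block.
        rows = self._rows.pop(partition_id, [])
        for start in range(0, len(rows), self._max_rows):
            yield rows[start:start + self._max_rows]


def consume(agg: ShuffleAggregationSketch, partition_id: int) -> List[int]:
    """Iterate over the finalized blocks, recording a row count for each."""
    rows_per_block = []
    for block in agg.finalize(partition_id):
        rows_per_block.append(len(block))
    return rows_per_block
```

Because `finalize` is a generator here, the caller can process, account for, or forward each block as soon as it is produced, which is the property the new return type makes possible.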
      partition_id: ArrowBlockBuilder() for partition_id in target_partition_ids
  }
- self.data_context = data_context
+ self._data_context = data_context
The _data_context attribute is assigned but never used within the JoiningShuffleAggregation class. It seems to be dead code. Consider removing this attribute and the corresponding data_context parameter from the __init__ method signature to improve code clarity. This would also require removing the data_context argument from the call site in JoinOperator.
…remental streaming (#59972)

## Description
This change makes `StatefulShuffleAggregation.finalize` interface allow incremental streaming to prepare it for future shift to streaming results incrementally.

## Related issues
> Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Description
This change makes `StatefulShuffleAggregation.finalize` interface allow incremental streaming to prepare it for future shift to streaming results incrementally.
Related issues
Additional information