[Data] Make Zip op streaming #58721

Open
owenowenisme wants to merge 17 commits into ray-project:master from owenowenisme:data/make-zip-op-streaming

Conversation

@owenowenisme
Member

@owenowenisme owenowenisme commented Nov 18, 2025

Description

Make the Zip operator streaming, based on StreamingRepartition.

Related issues

Closes #56300

@owenowenisme owenowenisme requested a review from a team as a code owner November 18, 2025 00:24
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request refactors the Zip operator to be a streaming operator instead of a bulk one. This is a significant and positive change for performance and memory usage. The approach taken is to introduce a new logical optimizer rule, AddStreamingRepartitionWhenZip, which ensures that all inputs to a Zip operation are repartitioned to have the same number of rows per block. This simplifies the ZipOperator's logic, as it no longer needs to handle complex block alignment.

The implementation is solid and includes new tests for the optimizer rule. I have a couple of suggestions for improvement: one for performance in the remote zip task, and another for making the repartitioning target configurable.

Comment on lines +176 to +181
merged_blocks = []
for blocks in block_groups:
    builder = DelegatingBlockBuilder()
    for block in blocks:
        builder.add_block(ray.get(block))
    merged_blocks.append(builder.build())

high

The current implementation fetches blocks sequentially within the remote task, which can be inefficient. You can improve performance by fetching all blocks in parallel using a single ray.get() call before merging them.

Suggested change
-merged_blocks = []
-for blocks in block_groups:
-    builder = DelegatingBlockBuilder()
-    for block in blocks:
-        builder.add_block(ray.get(block))
-    merged_blocks.append(builder.build())
+all_block_refs = [block for blocks in block_groups for block in blocks]
+all_blocks_resolved = ray.get(all_block_refs)
+block_map = dict(zip(all_block_refs, all_blocks_resolved))
+merged_blocks = []
+for blocks in block_groups:
+    builder = DelegatingBlockBuilder()
+    for block_ref in blocks:
+        builder.add_block(block_map[block_ref])
+    merged_blocks.append(builder.build())

class AddStreamingRepartitionWhenZip(Rule):
    """Insert StreamingRepartition before each Zip input so blocks align."""

    TARGET_NUM_ROWS_PER_BLOCK = 128

medium

The TARGET_NUM_ROWS_PER_BLOCK is hardcoded to 128. This might not be optimal for all use cases. Consider making this value configurable, for example, through the DataContext. This would provide more flexibility for users to tune performance based on their specific data characteristics (e.g., row size).
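One way to act on this suggestion, sketched with a stubbed stand-in for ray.data.DataContext so it runs standalone (the field name zip_target_num_rows_per_block is hypothetical, not an existing Ray Data setting):

```python
from dataclasses import dataclass


@dataclass
class DataContext:
    """Stand-in for ray.data.DataContext; the field below is hypothetical."""

    zip_target_num_rows_per_block: int = 128  # proposed knob, not an existing setting


def target_rows_per_block(ctx: DataContext) -> int:
    # Resolve the repartition target from the context instead of a class constant.
    return ctx.zip_target_num_rows_per_block
```

The optimizer rule would then read the target from the active context at plan time, so users with very wide rows could lower it without patching the rule.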

@owenowenisme owenowenisme added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels Nov 18, 2025
owenowenisme and others added 2 commits December 5, 2025 19:15
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>

# Conflicts:
#	python/ray/data/_internal/logical/optimizers.py

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin force-pushed the data/make-zip-op-streaming branch from 39f042f to 40df837 on December 6, 2025 03:41
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme force-pushed the data/make-zip-op-streaming branch from 96b5865 to c1a49e5 on January 2, 2026 04:15
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
self._input_buffers[input_index].append(refs)
assert 0 <= input_index < len(self._input_dependencies), input_index
self._pending_bundles[input_index].append(refs)
self._pending_rows[input_index] += refs.num_rows()

TypeError when bundle has unknown row count

RefBundle.num_rows() returns Optional[int] and can return None when block metadata doesn't have a row count. The code at line 131 performs self._pending_rows[input_index] += refs.num_rows() which raises a TypeError if num_rows() is None. This propagates to line 186 where min(self._pending_rows) fails with mixed int/None values, and line 243 where bundle_rows <= rows_remaining comparison fails. Several data sources like BigQuery, Delta Sharing, and Hudi legitimately set num_rows=None in metadata. The previous bulk implementation handled this by fetching row counts remotely, but the new streaming implementation lacks this safeguard.
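One defensive option is to validate row counts at enqueue time rather than letting None surface later as a TypeError. A minimal sketch, with RefBundle stubbed and the helper name add_pending_rows illustrative rather than a Ray internal:

```python
from typing import List, Optional


class RefBundle:
    """Minimal stand-in for Ray Data's RefBundle; real metadata is richer."""

    def __init__(self, rows: Optional[int]):
        self._rows = rows

    def num_rows(self) -> Optional[int]:
        return self._rows


def add_pending_rows(pending_rows: List[int], input_index: int, bundle: RefBundle) -> None:
    rows = bundle.num_rows()
    if rows is None:
        # Bundles without row metadata (e.g. some BigQuery/Hudi sources) would
        # otherwise fail later inside min() or the row-budget comparisons.
        raise ValueError("streaming zip requires bundles with known row counts")
    pending_rows[input_index] += rows
```

Alternatively, the operator could fall back to fetching row counts remotely, as the previous bulk implementation did.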


Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
for bundle in consumed:
    self._metrics.on_input_dequeued(bundle)
    total_size_bytes += bundle.size_bytes()
    owns_blocks = owns_blocks and bundle.owns_blocks

Output bundle size_bytes is zero when slicing occurs

The total_size_bytes calculation only counts bundles that are fully consumed, but when bundles are sliced (split across multiple tasks), the consumed list is empty until the final slice is used. This means output bundles created from sliced data will have size_bytes=0 in their metadata, while the final task gets the entire original bundle's size. This causes inaccurate memory accounting despite implements_accurate_memory_accounting() returning True. The fix is to calculate size from merged_bundles (which contains the actual processed data) instead of from consumed bundles.
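The proposed fix can be sketched as follows, with Bundle a minimal stand-in for a RefBundle carrying byte-size metadata and output_size_bytes an illustrative helper name:

```python
class Bundle:
    """Minimal stand-in for a RefBundle with byte-size metadata."""

    def __init__(self, size_bytes: int):
        self._size_bytes = size_bytes

    def size_bytes(self) -> int:
        return self._size_bytes


def output_size_bytes(merged_bundles) -> int:
    # Summing over the bundles actually merged into the task (slices included)
    # avoids reporting size_bytes=0 when no input bundle was consumed whole.
    return sum(b.size_bytes() for b in merged_bundles)
```

Because merged_bundles always reflects the data the task processed, each output's size stays proportional to its contents instead of lurching between 0 and the full input size.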


for bundle in consumed:
    self._metrics.on_input_dequeued(bundle)
    total_size_bytes += bundle.size_bytes()
    owns_blocks = owns_blocks and bundle.owns_blocks

Output bundle ownership incorrectly based on input bundles

The output bundle's owns_blocks is derived from consumed input bundles rather than recognizing that the remote task creates a new block. When input bundles don't own their blocks (e.g., shared bundles) and are consumed whole without slicing, owns_blocks becomes False for the output. However, the output contains a newly created block from _zip_blocks_with_slices that has no other references, so it should always be owned. This could prevent eager memory cleanup when destroy_if_owned is called, since the output incorrectly believes it doesn't own its block.
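The suggested behavior can be sketched like this, with OutputBundle and make_zip_output simplified illustrative stand-ins for the operator's real output construction:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class OutputBundle:
    """Simplified stand-in for the operator's output RefBundle."""

    blocks: List[str]
    owns_blocks: bool


def make_zip_output(new_blocks: List[str]) -> OutputBundle:
    # The zip task materialized a fresh block with no other references, so the
    # output can always claim ownership, independent of input ownership. This
    # keeps destroy_if_owned eligible for eager memory cleanup.
    return OutputBundle(blocks=new_blocks, owns_blocks=True)
```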


@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jan 22, 2026
@owenowenisme owenowenisme added unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it. and removed stale The issue is stale. It will be closed within 7 days unless there are further conversation labels Jan 25, 2026


Development

Successfully merging this pull request may close these issues.

[Data] Make Zip a properly streaming operator

2 participants