[Data] Make Zip op streaming #58721
Conversation
Code Review
This pull request refactors the Zip operator to be a streaming operator instead of a bulk one. This is a significant and positive change for performance and memory usage. The approach taken is to introduce a new logical optimizer rule, AddStreamingRepartitionWhenZip, which ensures that all inputs to a Zip operation are repartitioned to have the same number of rows per block. This simplifies the ZipOperator's logic, as it no longer needs to handle complex block alignment.
The implementation is solid and includes new tests for the optimizer rule. I have a couple of suggestions for improvement: one for performance in the remote zip task, and another for making the repartitioning target configurable.
```python
merged_blocks = []
for blocks in block_groups:
    builder = DelegatingBlockBuilder()
    for block in blocks:
        builder.add_block(ray.get(block))
    merged_blocks.append(builder.build())
```
The current implementation fetches blocks sequentially within the remote task, which can be inefficient. Performance can be improved by fetching all blocks in parallel with a single `ray.get()` call before merging them.
```diff
-merged_blocks = []
-for blocks in block_groups:
-    builder = DelegatingBlockBuilder()
-    for block in blocks:
-        builder.add_block(ray.get(block))
-    merged_blocks.append(builder.build())
+all_block_refs = [block for blocks in block_groups for block in blocks]
+all_blocks_resolved = ray.get(all_block_refs)
+block_map = dict(zip(all_block_refs, all_blocks_resolved))
+merged_blocks = []
+for blocks in block_groups:
+    builder = DelegatingBlockBuilder()
+    for block_ref in blocks:
+        builder.add_block(block_map[block_ref])
+    merged_blocks.append(builder.build())
```
```python
class AddStreamingRepartitionWhenZip(Rule):
    """Insert StreamingRepartition before each Zip input so blocks align."""

    TARGET_NUM_ROWS_PER_BLOCK = 128
```
`TARGET_NUM_ROWS_PER_BLOCK` is hardcoded to 128, which may not be optimal for all workloads. Consider making this value configurable, for example through the `DataContext`. That would let users tune performance based on their data characteristics (e.g., row size).
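A minimal sketch of what the suggested configurability could look like. The attribute name `zip_target_num_rows_per_block` and the `DataContextStub` class are assumptions for illustration only; they are not existing Ray Data API.

```python
# Hypothetical sketch: resolve the repartition target from a context object
# instead of a hardcoded class constant. The attribute name
# `zip_target_num_rows_per_block` is an assumption, not an existing field.

DEFAULT_TARGET_NUM_ROWS_PER_BLOCK = 128


class DataContextStub:
    """Stand-in for ray.data.DataContext, for illustration only."""

    def __init__(self, zip_target_num_rows_per_block=None):
        self.zip_target_num_rows_per_block = zip_target_num_rows_per_block


def resolve_target_num_rows(ctx):
    """Prefer the user-configured value; fall back to the default."""
    configured = getattr(ctx, "zip_target_num_rows_per_block", None)
    if configured is not None:
        return configured
    return DEFAULT_TARGET_NUM_ROWS_PER_BLOCK
```

The rule would then call `resolve_target_num_rows(DataContext.get_current())` at optimization time rather than reading a class constant.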
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Force-pushed from 39f042f to 40df837
python/ray/data/_internal/logical/rules/add_streaming_repartition_when_zip.py
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Force-pushed from 96b5865 to c1a49e5
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
```python
self._input_buffers[input_index].append(refs)
assert 0 <= input_index < len(self._input_dependencies), input_index
self._pending_bundles[input_index].append(refs)
self._pending_rows[input_index] += refs.num_rows()
```
TypeError when bundle has unknown row count
RefBundle.num_rows() returns Optional[int] and can return None when block metadata doesn't have a row count. The code at line 131 performs self._pending_rows[input_index] += refs.num_rows() which raises a TypeError if num_rows() is None. This propagates to line 186 where min(self._pending_rows) fails with mixed int/None values, and line 243 where bundle_rows <= rows_remaining comparison fails. Several data sources like BigQuery, Delta Sharing, and Hudi legitimately set num_rows=None in metadata. The previous bulk implementation handled this by fetching row counts remotely, but the new streaming implementation lacks this safeguard.
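One way to make the accumulation None-safe is to let the pending total itself become unknown as soon as any bundle lacks a row count, and have the dispatch logic fall back in that case. The sketch below uses illustrative stand-ins (`BundleStub`, `add_pending_rows`), not the actual Ray Data classes.

```python
# Sketch of a None-safe accumulator for pending row counts.

class BundleStub:
    """Stand-in for RefBundle, for illustration only."""

    def __init__(self, rows):
        self._rows = rows

    def num_rows(self):
        # Mirrors RefBundle.num_rows(), which may return None when block
        # metadata lacks a row count (e.g. BigQuery, Delta Sharing, Hudi).
        return self._rows


def add_pending_rows(pending, bundle):
    """Accumulate rows; once any count is unknown, the total is unknown."""
    rows = bundle.num_rows()
    if pending is None or rows is None:
        # The operator would need a fallback dispatch heuristic here
        # (e.g. fetching counts remotely, as the bulk implementation did).
        return None
    return pending + rows
```

Any later `min(...)` or `<=` comparison over pending rows would then need an explicit `is None` branch instead of assuming an `int`.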
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
```python
for bundle in consumed:
    self._metrics.on_input_dequeued(bundle)
    total_size_bytes += bundle.size_bytes()
    owns_blocks = owns_blocks and bundle.owns_blocks
```
Output bundle size_bytes is zero when slicing occurs
The total_size_bytes calculation only counts bundles that are fully consumed, but when bundles are sliced (split across multiple tasks), the consumed list is empty until the final slice is used. This means output bundles created from sliced data will have size_bytes=0 in their metadata, while the final task gets the entire original bundle's size. This causes inaccurate memory accounting despite implements_accurate_memory_accounting() returning True. The fix is to calculate size from merged_bundles (which contains the actual processed data) instead of from consumed bundles.
```python
for bundle in consumed:
    self._metrics.on_input_dequeued(bundle)
    total_size_bytes += bundle.size_bytes()
    owns_blocks = owns_blocks and bundle.owns_blocks
```
Output bundle ownership incorrectly based on input bundles
The output bundle's owns_blocks is derived from consumed input bundles rather than recognizing that the remote task creates a new block. When input bundles don't own their blocks (e.g., shared bundles) and are consumed whole without slicing, owns_blocks becomes False for the output. However, the output contains a newly created block from _zip_blocks_with_slices that has no other references, so it should always be owned. This could prevent eager memory cleanup when destroy_if_owned is called, since the output incorrectly believes it doesn't own its block.
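Since the zip task builds a brand-new block, the output bundle can claim ownership unconditionally. A minimal sketch with a stand-in class (`RefBundleStub` is not the real `RefBundle`):

```python
# Sketch: the remote zip task produces a freshly built block with no other
# references, so the output bundle should always set owns_blocks=True,
# independent of what the input bundles owned.

class RefBundleStub:
    """Stand-in for RefBundle, for illustration only."""

    def __init__(self, blocks, owns_blocks):
        self.blocks = blocks
        self.owns_blocks = owns_blocks


def make_output_bundle(new_block_with_meta):
    # Ownership is unconditionally True, enabling eager cleanup via
    # destroy_if_owned once downstream consumers are done with the block.
    return RefBundleStub([new_block_with_meta], owns_blocks=True)
```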
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
Description
Make the Zip operator streaming, built on StreamingRepartition.
Related issues
Closes #56300