Skip to content

[Data] Make Zip operator streaming#56504

Closed
owenowenisme wants to merge 15 commits intoray-project:masterfrom
owenowenisme:data/make-zip-operator-streaming
Closed

[Data] Make Zip operator streaming#56504
owenowenisme wants to merge 15 commits intoray-project:masterfrom
owenowenisme:data/make-zip-operator-streaming

Conversation

@owenowenisme
Copy link
Copy Markdown
Member

@owenowenisme owenowenisme commented Sep 13, 2025

Why are these changes needed?

Related issue number

Closes #56300

Checks

Test script to show streaming works:

import time
import ray

ray.init()

def slow_batch(batch):
    # Sleep 1s per row in this batch.
    time.sleep(len(batch))
    return batch

# Two inputs of equal length
n = 100

ds1 = ray.data.range(n)
ds2 = ray.data.range(n).map_batches(slow_batch, batch_size=1)

t0 = time.time()
out = ds1.zip(ds2).take(3)
elapsed = time.time() - t0

print("Output:", out)
print(f"Elapsed: {elapsed:.2f}s")

With original batching zip we get:

Output: [{'id': 0, 'id_1': 0}, {'id': 1, 'id_1': 1}, {'id': 2, 'id_1': 2}]                                                                                      
Elapsed: 8.41s

With streaming zip we get :

Output: [{'id': 0, 'id_1': 0}, {'id': 1, 'id_1': 1}, {'id': 2, 'id_1': 2}]                                                                                      
Elapsed: 3.07s 

Note that now zip operator would need at least N + 1 cpus for ds1.zip(ds2,ds3,...,dsN) in order to streaming.

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

Refactors ZipOperator to stream and zip inputs on-the-fly across N inputs with leftover handling, adjusts stats/CPU needs, and updates tests accordingly.

  • Data Execution:
    • ZipOperator:
      • Convert from bulk to streaming: zip outputs produced incrementally during input ingestion.
      • Generalize _zip to N-ary inputs with per-input buffers and _leftover_blocks handling.
      • Skip empty-row blocks; enqueue output as soon as a zipped block is formed.
      • Add check_still_has_blocks, _zip_blocks (chain zipping), and initialize stats as {"Zip": []}.
      • Assert equal total rows across inputs at completion with clearer error.
  • Tests:
    • test_zip.py: simplify split behavior expectations; remove inversion parameter; ensure block count matches first dataset; add multi-dataset zipping coverage.
    • test_dynamic_block_split.py: increase required CPUs to 3 to accommodate streaming zip.

Written by Cursor Bugbot for commit 606bbde. This will update automatically on new commits. Configure here.

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@owenowenisme owenowenisme force-pushed the data/make-zip-operator-streaming branch from 61f995d to 4fd074c Compare September 13, 2025 09:27
owenowenisme and others added 3 commits September 13, 2025 18:23
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
…rable dataset size

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
richardliaw pushed a commit that referenced this pull request Sep 25, 2025
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.

Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}                                                                         
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>   
```
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number
#56504 
<!-- For example: "Closes #1234" -->

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@owenowenisme owenowenisme force-pushed the data/make-zip-operator-streaming branch from 28f95a2 to c5433fd Compare September 25, 2025 16:15
)


@pytest.mark.parametrize(
Copy link
Copy Markdown
Member Author

@owenowenisme owenowenisme Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this test because we don't do invert when streaming.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the invert option and retain this test?

@owenowenisme owenowenisme force-pushed the data/make-zip-operator-streaming branch 2 times, most recently from a94c80c to c5433fd Compare September 26, 2025 05:36
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@owenowenisme owenowenisme marked this pull request as ready for review September 26, 2025 14:51
@owenowenisme owenowenisme requested a review from a team as a code owner September 26, 2025 14:51
cursor[bot]

This comment was marked as outdated.

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Sep 26, 2025
elliot-barn pushed a commit that referenced this pull request Sep 27, 2025
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.

Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}                                                                         
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>   
```
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number
#56504 
<!-- For example: "Closes #1234" -->

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
cursor[bot]

This comment was marked as outdated.

# We need at least 2 CPUs to run a actorpool streaming
ray.init(num_cpus=2, object_store_memory=1e9)
# We need at least 3 CPUs to run a zip streaming
ray.init(num_cpus=4, object_store_memory=1e9)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to set to num_cpus=3?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, thanks for pointing this out.

)


@pytest.mark.parametrize(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the invert option and retain this test?

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
cursor[bot]

This comment was marked as outdated.

@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Sep 30, 2025
Copy link
Copy Markdown
Contributor

@srinathk10 srinathk10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@alexeykudinkin
Copy link
Copy Markdown
Contributor

@owenowenisme i think we actually can do more radical simplification of the impl here:

  1. Our goal is to make sure that the blocks produced by the zip a) have the same # of rows and b) hstack the individual blocks from all sequences one by one
  2. To avoid complicated logic of resizing we can actually leverage StreamingRepartition w/ preset target # of rows
  3. Then the only thing zip needs to do is to hstack the blocks

I've discussed this with @srinathk10 and he's gonna help you experiment on this and see it through

@owenowenisme
Copy link
Copy Markdown
Member Author

Update: This PR depends on #57165

dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.

Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number
ray-project#56504
<!-- For example: "Closes ray-project#1234" -->

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.

Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}                                                                         
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>   
```
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number
ray-project#56504 
<!-- For example: "Closes ray-project#1234" -->

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Oct 20, 2025
@owenowenisme owenowenisme added unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it. and removed stale The issue is stale. It will be closed within 7 days unless there are further conversation labels Oct 20, 2025
@owenowenisme
Copy link
Copy Markdown
Member Author

Opening a new one.

landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.

Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}                                                                         
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>   
```
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number
ray-project#56504 
<!-- For example: "Closes ray-project#1234" -->

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
## Why are these changes needed?
Before making zip operator a streaming operator, we make it accept
multiple input first.

Now Zip operator can be used with
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
>>>
```
<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number
ray-project#56504
<!-- For example: "Closes ray-project#1234" -->

Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Data] Make Zip a properly streaming operator

4 participants