[Data] Make Zip operator streaming #56504
owenowenisme wants to merge 15 commits into ray-project:master
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
…rable dataset size
…rator-accept-multiple-input
## Why are these changes needed?
Before making the Zip operator a streaming operator, we first make it
accept multiple inputs.
The Zip operator can now take more than one dataset:
```py
>>> import ray
>>> ds1 = ray.data.range(5)
>>> ds2 = ray.data.range(5)
>>> ds3 = ray.data.range(5)
>>> ds1.zip(ds2, ds3).take_batch()
{'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}
>>> ds1.zip(ds2, ds3).take_all()
[{'id': 0, 'id_1': 0, 'id_2': 0}, {'id': 1, 'id_1': 1, 'id_2': 1}, {'id': 2, 'id_1': 2, 'id_2': 2}, {'id': 3, 'id_1': 3, 'id_2': 3}, {'id': 4, 'id_1': 4, 'id_2': 4}]
```
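The `id_1` and `id_2` columns in the output above suggest that duplicate column names receive a numeric suffix. Below is a minimal pure-Python sketch of that naming rule. The helper `merge_rows` is hypothetical and only mirrors the output shown here; it is not Ray's implementation.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge_rows(rows: List[Row]) -> Row:
    """Merge one row per input, suffixing duplicate column names with _1, _2, ...

    Hypothetical helper: it reproduces the column names in the example output,
    not the code path Ray Data actually uses.
    """
    merged: Row = {}
    for row in rows:
        for name, value in row.items():
            new_name, suffix = name, 0
            # Bump the suffix until the name no longer collides.
            while new_name in merged:
                suffix += 1
                new_name = f"{name}_{suffix}"
            merged[new_name] = value
    return merged


print(merge_rows([{"id": 0}, {"id": 0}, {"id": 0}]))
```

Keeping the first input's column name unchanged and suffixing only the later ones matches the `id`, `id_1`, `id_2` ordering in the output.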
## Related issue number
#56504

    )

    @pytest.mark.parametrize(

Removed this test because we don't do invert when streaming.
Could we remove the invert option and retain this test?
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>

    - # We need at least 2 CPUs to run a actorpool streaming
    - ray.init(num_cpus=2, object_store_memory=1e9)
    + # We need at least 3 CPUs to run a zip streaming
    + ray.init(num_cpus=4, object_store_memory=1e9)

Did you mean to set num_cpus=3?
Yeah, thanks for pointing this out.
@owenowenisme I think we can actually do a more radical simplification of the implementation here:
I've discussed this with @srinathk10, and he's going to help you experiment with this and see it through.

Update: This PR depends on #57165.
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.

Opening a new one.
Signed-off-by: Future-Outlier <eric901201@gmail.com>
## Why are these changes needed?
## Related issue number
Closes #56300
## Checks
Test script to show that streaming works:
With the original batching zip we get:
With streaming zip we get:
Note that the Zip operator now needs at least N + 1 CPUs for `ds1.zip(ds2, ds3, ..., dsN)` in order to stream.
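As a quick check of the CPU note, the arithmetic can be written down as a small helper. This is only a sketch: `min_cpus_for_streaming_zip` is a hypothetical name, and the N + 1 rule is taken from the description above rather than from Ray's scheduler code.

```python
def min_cpus_for_streaming_zip(num_datasets: int) -> int:
    """Return the CPU count stated in the PR description for a streaming zip.

    Hypothetical helper: `num_datasets` counts every dataset in the zip,
    including the one `.zip()` is called on.
    """
    if num_datasets < 2:
        raise ValueError("zip needs at least two datasets")
    return num_datasets + 1


# ds1.zip(ds2) involves two datasets, so the cluster would need three CPUs,
# for example ray.init(num_cpus=min_cpus_for_streaming_zip(2)).
print(min_cpus_for_streaming_zip(2))
```

For two datasets this gives 3, which agrees with the "at least 3 CPUs" comment in the updated test.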
Note
Refactors `ZipOperator` to stream and zip N inputs on the fly with leftover handling, adjusts stats and CPU requirements, and updates tests accordingly.
- `_zip`: N-ary inputs with per-input buffers and `_leftover_blocks` handling.
- `check_still_has_blocks`, `_zip_blocks` (chain zipping), and stats initialized as `{"Zip": []}`.
- `test_zip.py`: simplify split behavior expectations; remove the inversion parameter; ensure the block count matches the first dataset; add multi-dataset zipping coverage.
- `test_dynamic_block_split.py`: increase required CPUs to 3 to accommodate streaming zip.

Written by Cursor Bugbot for commit 606bbde.
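The per-input buffers and leftover handling in the summary above can be illustrated with a pure-Python sketch. Everything here is an assumption made for illustration: blocks are modeled as lists of row dicts, `streaming_zip` is a hypothetical function, and the buffers hold rows rather than Ray blocks. It is not the operator's actual code.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Iterator, List

Row = Dict[str, object]
Block = List[Row]


def streaming_zip(first: Iterable[Block], *others: Iterable[Block]) -> Iterator[Block]:
    """Zip blocks as they arrive; output blocks mirror the first input's block sizes."""
    other_iters = [iter(o) for o in others]
    # One row buffer per non-first input; rows left over after a zip stay here.
    leftovers: List[Deque[Row]] = [deque() for _ in other_iters]

    for block in first:
        # Pull blocks from each other input until its buffer covers this block.
        for buf, it in zip(leftovers, other_iters):
            while len(buf) < len(block):
                nxt = next(it, None)
                if nxt is None:
                    raise ValueError("inputs have different numbers of rows")
                buf.extend(nxt)
        out: Block = []
        for row in block:
            merged = dict(row)
            for buf in leftovers:
                # Column names are assumed distinct across inputs in this sketch.
                merged.update(buf.popleft())
            out.append(merged)
        yield out

    # Rows still buffered or unread mean another input was longer than the first.
    for buf, it in zip(leftovers, other_iters):
        if buf or any(len(b) > 0 for b in it):
            raise ValueError("inputs have different numbers of rows")


a = [[{"a": 0}, {"a": 1}], [{"a": 2}]]
b = [[{"b": 0}], [{"b": 1}, {"b": 2}]]
c = [[{"c": 0}, {"c": 1}, {"c": 2}]]
for out_block in streaming_zip(a, b, c):
    print(out_block)
```

Because output blocks follow the first input's block boundaries, the number of output blocks equals the number of blocks in the first dataset, which is the property the updated `test_zip.py` is described as checking.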