[Data] Use iterator in write ops instead of accumulating all of the blocks in memory (#57108)
Conversation
Code Review
This pull request aims to optimize memory usage in `write_datasink` by using an iterator. While the change correctly introduces an iterator, it doesn't fully achieve the memory-saving goal because it still accumulates all write results in memory before processing. I've provided a review comment with a suggested code change to process the results incrementally, which should properly address the underlying memory issue.
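The accumulate-vs-fold distinction the review raises can be shown with a toy model. This is a minimal sketch, not Ray's actual API: `ToyWriteResult`, its fields, and the `combine` helper are all illustrative stand-ins for the real per-bundle write results.

```python
from dataclasses import dataclass


# Toy stand-in for a per-bundle write result; field names are
# illustrative, not Ray Data's actual WriteResult schema.
@dataclass
class ToyWriteResult:
    num_rows: int
    size_bytes: int


def combine(a: ToyWriteResult, b: ToyWriteResult) -> ToyWriteResult:
    # Fold two per-bundle results into one running aggregate.
    return ToyWriteResult(a.num_rows + b.num_rows, a.size_bytes + b.size_bytes)


def write_results_stream():
    # Stands in for the per-bundle results yielded by the execution iterator.
    for rows in (10, 20, 30):
        yield ToyWriteResult(rows, rows * 8)


# Fold incrementally: only the running aggregate stays in memory,
# never the full list of per-bundle results.
total = ToyWriteResult(0, 0)
for res in write_results_stream():
    total = combine(total, res)

print(total.num_rows, total.size_bytes)  # 60 480
```

The memory win comes from the fold keeping O(1) state regardless of how many bundles the stream yields.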
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
/gemini review
Code Review
This pull request refactors the write operations to use an iterator-based approach for handling write results, which is a great improvement for memory efficiency. Instead of accumulating all block references and getting their results at once with ray.get(), the new implementation processes them incrementally. This avoids holding all results in memory, which is particularly beneficial for large datasets.
The changes include:
- Modifying `Dataset.write_datasink` to use `_execute_to_iterator` and process write results in a streaming fashion.
- Introducing a `WriteResult.combine` class method to aggregate per-bundle write results.
- Relocating the `gen_datasink_write_result` helper function to a more appropriate module.
The implementation is solid. I've suggested a minor enhancement to use a generator expression, which further improves memory usage by avoiding an intermediate list of results. Overall, this is a valuable optimization.
```python
iter_, stats = self._write_ds._execute_to_iterator()
write_results = []

for bundle in iter_:
    res = ray.get(bundle.block_refs)
    # Generate write result report
    write_results.append(_gen_datasink_write_result(res))

combined_write_result = WriteResult.combine(*write_results)
```
Great work on switching to an iterator to process write results incrementally! This significantly improves memory efficiency.
To further enhance this and make it even more memory-friendly, you can use a generator expression instead of creating an intermediate `write_results` list. This avoids materializing all `WriteResult` objects in memory before combining them.
Also, the `stats` variable returned by `_execute_to_iterator()` is not used, so it can be replaced with `_` to signal that it's intentionally ignored.
```python
iter_, _ = self._write_ds._execute_to_iterator()
write_results_iter = (
    _gen_datasink_write_result(ray.get(bundle.block_refs))
    for bundle in iter_
)
combined_write_result = WriteResult.combine(*write_results_iter)
```
## Changes

Execute as a typical iteration, writing and releasing resulting blocks incrementally rather than holding on to them until the whole op completes.

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

> [!NOTE]
> Write ops now consume results via an iterator and aggregate per-bundle `WriteResult` using a new combine API, avoiding accumulating all blocks in memory.
>
> - **Write Path (`Dataset.write_datasink`)**:
>   - Switch to streaming execution (`_execute_to_iterator`) to process write results incrementally instead of `ray.get` on all blocks.
>   - Accumulate per-bundle results via `_gen_datasink_write_result` and aggregate with `WriteResult.combine`; log and callback use the combined result.
> - **APIs/Utilities**:
>   - Move the write-result aggregation helper to `ray.data.datasource.datasink` as `_gen_datasink_write_result`.
>   - Add `WriteResult.combine(...)` for aggregating multiple write results.
> - **Code Cleanup**:
>   - Remove `gen_datasink_write_result` from `planner/plan_write_op.py` and related imports; adjust imports in `dataset.py` accordingly.
>
> Written by Cursor Bugbot for commit 4c5a373. This will update automatically on new commits.
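A combine-style classmethod of the shape described above can be sketched as follows. This is a minimal sketch assuming an API shaped like `WriteResult.combine(*results)`; the field names and internals of `SketchWriteResult` are hypothetical, and the real Ray Data class carries more state.

```python
from dataclasses import dataclass


@dataclass
class SketchWriteResult:
    # Illustrative fields only; not the real WriteResult schema.
    num_rows: int = 0
    size_bytes: int = 0

    @classmethod
    def combine(cls, *results: "SketchWriteResult") -> "SketchWriteResult":
        # Aggregate any number of per-bundle results into one total.
        total = cls()
        for r in results:
            total.num_rows += r.num_rows
            total.size_bytes += r.size_bytes
        return total


# Works with both lists and generator expressions at the call site.
parts = (SketchWriteResult(n, n * 4) for n in (1, 2, 3))
agg = SketchWriteResult.combine(*parts)
print(agg.num_rows, agg.size_bytes)  # 6 24
```

Accepting `*results` keeps the call site flexible, at the cost of the unpacking building an argument tuple; a variant taking a single iterable would avoid even that.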