
[Data] Use iterator in write ops instead of accumulating all of the blocks in memory #57108

Merged
alexeykudinkin merged 5 commits into ray-project:master from alexeykudinkin:ak/wrt-iter-clup
Oct 2, 2025

Conversation

@alexeykudinkin
Contributor

@alexeykudinkin alexeykudinkin commented Oct 1, 2025

Changes

Execute writes as a typical iteration, writing out and releasing the resulting blocks incrementally rather than holding on to all of them until the whole op completes.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run pre-commit jobs to lint the changes in this PR. (pre-commit setup)
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

Write ops now consume results via an iterator and aggregate per-bundle WriteResult using a new combine API, avoiding accumulating all blocks in memory.

  • Write Path (Dataset.write_datasink):
    • Switch to streaming execution (_execute_to_iterator) to process write results incrementally instead of ray.get on all blocks.
    • Accumulate per-bundle results via _gen_datasink_write_result and aggregate with WriteResult.combine; log and callback use the combined result.
  • APIs/Utilities:
    • Move write-result aggregation helper to ray.data.datasource.datasink as _gen_datasink_write_result.
    • Add WriteResult.combine(...) for aggregating multiple write results.
  • Code Cleanup:
    • Remove gen_datasink_write_result from planner/plan_write_op.py and related imports; adjust imports in dataset.py accordingly.

Written by Cursor Bugbot for commit 4c5a373.
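The streaming write path summarized above can be sketched without Ray as a plain iterator pipeline. The `WriteResult` fields and helper names below are illustrative stand-ins for the sketch, not the actual ray.data classes:

```python
from dataclasses import dataclass
from typing import Iterator, List

# Illustrative stand-ins for Ray Data internals; field names and helpers
# here are assumptions for this sketch, not the real ray.data API.
@dataclass
class WriteResult:
    num_rows: int = 0
    size_bytes: int = 0

    @classmethod
    def combine(cls, *results: "WriteResult") -> "WriteResult":
        # Aggregate many small per-bundle summaries into one.
        return cls(
            num_rows=sum(r.num_rows for r in results),
            size_bytes=sum(r.size_bytes for r in results),
        )

def gen_write_result(blocks: List[List[dict]]) -> WriteResult:
    # Per-bundle summary: row count plus a rough byte estimate.
    rows = sum(len(b) for b in blocks)
    size = sum(len(str(r)) for b in blocks for r in b)
    return WriteResult(num_rows=rows, size_bytes=size)

def write_streaming(bundles: Iterator[List[List[dict]]]) -> WriteResult:
    # Consume one bundle at a time; its blocks become garbage as soon as
    # the loop advances, instead of being held until the whole op completes.
    results = []
    for bundle in bundles:
        results.append(gen_write_result(bundle))
    return WriteResult.combine(*results)
```

Only the tiny per-bundle summaries are retained across iterations; the blocks themselves are dropped as the iterator advances, which is the memory win this PR targets.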

…n mem instead

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin requested a review from a team as a code owner October 1, 2025 21:22
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Oct 1, 2025
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request aims to optimize memory usage in write_datasink by using an iterator. While the change correctly introduces an iterator, it doesn't fully achieve the memory-saving goal because it still accumulates all write results in memory before processing. I've provided a review comment with a suggested code change to process the results incrementally, which should properly address the underlying memory issue.

cursor[bot]

This comment was marked as outdated.

@alexeykudinkin alexeykudinkin enabled auto-merge (squash) October 1, 2025 21:38
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@github-actions github-actions bot disabled auto-merge October 1, 2025 22:15
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@goutamvenkat-anyscale
Contributor

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request refactors the write operations to use an iterator-based approach for handling write results, which is a great improvement for memory efficiency. Instead of accumulating all block references and getting their results at once with ray.get(), the new implementation processes them incrementally. This avoids holding all results in memory, which is particularly beneficial for large datasets.

The changes include:

  • Modifying Dataset.write_datasink to use _execute_to_iterator and process write results in a streaming fashion.
  • Introducing a WriteResult.combine class method to aggregate per-bundle write results.
  • Relocating the gen_datasink_write_result helper function to a more appropriate module.

The implementation is solid. I've suggested a minor enhancement to use a generator expression, which further improves memory usage by avoiding an intermediate list of results. Overall, this is a valuable optimization.

Comment on lines +4928 to +4936
iter_, stats = self._write_ds._execute_to_iterator()
write_results = []

for bundle in iter_:
    res = ray.get(bundle.block_refs)
    # Generate write result report
    write_results.append(_gen_datasink_write_result(res))

combined_write_result = WriteResult.combine(*write_results)
Contributor


Severity: medium

Great work on switching to an iterator to process write results incrementally! This significantly improves memory efficiency.

To further enhance this and make it even more memory-friendly, you can use a generator expression instead of creating an intermediate write_results list. This avoids materializing all WriteResult objects in memory before combining them.

Also, the stats variable returned by _execute_to_iterator() is not used, so it can be replaced with _ to signal that it's intentionally ignored.

            iter_, _ = self._write_ds._execute_to_iterator()
            write_results_iter = (
                _gen_datasink_write_result(ray.get(bundle.block_refs))
                for bundle in iter_
            )
            combined_write_result = WriteResult.combine(*write_results_iter)
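As a standalone illustration of the suggested generator-expression variant, the minimal `WriteResult` below is a stand-in for the real ray.data class, and the row counts stand in for what `ray.get(bundle.block_refs)` would yield:

```python
from dataclasses import dataclass

# Minimal stand-in for ray.data's WriteResult, just to show the pattern.
@dataclass
class WriteResult:
    num_rows: int = 0

    @classmethod
    def combine(cls, *results: "WriteResult") -> "WriteResult":
        return cls(num_rows=sum(r.num_rows for r in results))

# Stand-in for the per-bundle row counts the real write path would
# obtain from ray.get(bundle.block_refs).
bundle_row_counts = iter([10, 20, 30])

# Generator expression: per-bundle results are built lazily as the
# iterator advances, with no intermediate list of raw results.
combined = WriteResult.combine(
    *(WriteResult(num_rows=n) for n in bundle_row_counts)
)
print(combined.num_rows)  # 60
```

One caveat: `*` unpacking still drains the generator into a tuple of the (tiny) summary objects before `combine` runs, so the saving comes from not holding the underlying blocks, not from avoiding the summaries themselves.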

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Oct 2, 2025
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) October 2, 2025 01:55
@alexeykudinkin alexeykudinkin merged commit 540fe55 into ray-project:master Oct 2, 2025
7 checks passed
eicherseiji pushed a commit to eicherseiji/ray that referenced this pull request Oct 6, 2025
dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
liulehui pushed a commit to liulehui/ray that referenced this pull request Oct 9, 2025
joshkodi pushed a commit to joshkodi/ray that referenced this pull request Oct 13, 2025
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

2 participants