[data] Download op fusion / removal of interleaved partitioners #56462

Merged
bveeramani merged 4 commits into ray-project:master from omatthew98:mowen/download-op-fusion
Sep 11, 2025

@omatthew98
Contributor

Why are these changes needed?

If we have multiple chained downloads, e.g. `ds.with_column("bytes_1", download("uri_1")).with_column("bytes_2", download("uri_2")).with_column("bytes_3", download("uri_3"))`, the operator structure looks like `URIPartitioner->URIDownloader->URIPartitioner->URIDownloader->URIPartitioner->URIDownloader`. Each `URIPartitioner` operator is implemented as an ActorPoolMapOperator with a concurrency of 1. In chained downloads these operators become bottlenecks, and scaling up their concurrency consumes additional resources that would otherwise go to other operators.

This PR solves the problem by deferring some of the partitioning to the `URIDownloader`, so the interleaved partitioners can be removed. The resulting operator structure is `URIPartitioner->URIDownloader->URIDownloader->URIDownloader`, which delivers much better performance for these cases.
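The plan rewrite described above can be pictured with a toy model. This is only a sketch and assumes nothing about Ray Data's actual optimizer internals: the `Op` class and the `build_naive_plan`, `fuse_chained_downloads`, and `describe` helpers are hypothetical names introduced for illustration. Only the operator names come from the description.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Op:
    """Hypothetical stand-in for a physical operator in the plan."""

    kind: str    # "URIPartitioner" or "URIDownloader"
    column: str  # URI column the operator works on


def build_naive_plan(uri_columns: List[str]) -> List[Op]:
    """Model the pre-change plan: one partitioner in front of every downloader."""
    plan: List[Op] = []
    for column in uri_columns:
        plan.append(Op("URIPartitioner", column))
        plan.append(Op("URIDownloader", column))
    return plan


def fuse_chained_downloads(plan: List[Op]) -> List[Op]:
    """Drop every partitioner that directly follows a downloader.

    In this toy model the downloader that follows the dropped partitioner
    is assumed to take over that partitioning work itself.
    """
    fused: List[Op] = []
    for op in plan:
        interleaved = (
            op.kind == "URIPartitioner"
            and bool(fused)
            and fused[-1].kind == "URIDownloader"
        )
        if interleaved:
            continue
        fused.append(op)
    return fused


def describe(plan: List[Op]) -> str:
    """Render a plan in the arrow notation used in the description."""
    return "->".join(op.kind for op in plan)


naive = build_naive_plan(["uri_1", "uri_2", "uri_3"])
print(describe(naive))
# URIPartitioner->URIDownloader->URIPartitioner->URIDownloader->URIPartitioner->URIDownloader
print(describe(fuse_chained_downloads(naive)))
# URIPartitioner->URIDownloader->URIDownloader->URIDownloader
```

Only the leading partitioner survives, so the single-concurrency stage appears once in the chain instead of once per download.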

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
@omatthew98 requested a review from a team as a code owner September 11, 2025 18:31
Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request introduces a valuable optimization for chained download operations by removing intermediate partitioner operators, which should significantly improve performance in those scenarios. The logic to detect chained downloads and defer block splitting to the URIDownloader is well-implemented. I have identified a critical syntax error related to an invalid return statement in a generator function, and a high-severity bug in the block splitting logic that could result in an incorrect number of output blocks. Addressing these issues will ensure the correctness and robustness of this new optimization.
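For context on the block-count concern raised in this review, here is a minimal sketch of a generator that splits rows into an exact number of chunks. It is not the code from this PR: `split_rows` and its parameters are hypothetical, and plain Python lists stand in for Ray Data blocks. It also ends early with a bare `return`, which is the conventional way to stop a generator without yielding a value.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def split_rows(rows: Sequence[T], num_splits: int) -> Iterator[List[T]]:
    """Yield exactly min(num_splits, len(rows)) non-empty, contiguous chunks."""
    if num_splits < 1:
        raise ValueError("num_splits must be at least 1")

    # Never produce more chunks than there are rows, so no chunk is empty.
    num_splits = min(num_splits, len(rows))
    if num_splits == 0:
        return  # Bare return: the generator simply yields nothing.

    # Spread the remainder over the first `extra` chunks so that chunk
    # sizes differ by at most one row.
    base, extra = divmod(len(rows), num_splits)
    start = 0
    for index in range(num_splits):
        size = base + (1 if index < extra else 0)
        yield list(rows[start:start + size])
        start += size
```

Computing the chunk sizes up front with `divmod` keeps the number of yielded chunks independent of rounding, which is one way to avoid an off-by-one in the output block count.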

Member

@bveeramani left a comment

Gemini comments, but otherwise LGTM

@gvspraveen
Contributor

Please also add a test for the multiple chained downloads use case.

@ray-gardener bot added the "data" label (Ray Data-related issues) Sep 11, 2025
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Matthew Owen <mowen@anyscale.com>
@omatthew98 added the "go" label (add ONLY when ready to merge, run all tests) Sep 11, 2025
@bveeramani merged commit a9a57a6 into ray-project:master Sep 11, 2025
6 checks passed
ZacAttack pushed a commit to ZacAttack/ray that referenced this pull request Sep 24, 2025
…project#56462)

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: zac <zac@anyscale.com>
marcostephan pushed a commit to marcostephan/ray that referenced this pull request Sep 24, 2025
…project#56462)

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Marco Stephan <marco@magic.dev>
dstrodtman pushed a commit to dstrodtman/ray that referenced this pull request Oct 6, 2025
…project#56462)

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: Douglas Strodtman <douglas@anyscale.com>
justinyeh1995 pushed a commit to justinyeh1995/ray that referenced this pull request Oct 20, 2025
…project#56462)

Signed-off-by: Matthew Owen <mowen@anyscale.com>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
…project#56462)

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)
