[data] Download op fusion / removal of interleaved partitioners #56462
bveeramani merged 4 commits into ray-project:master
Signed-off-by: Matthew Owen <mowen@anyscale.com>
Code Review
This pull request introduces a valuable optimization for chained download operations by removing intermediate partitioner operators, which should significantly improve performance in those scenarios. The logic to detect chained downloads and defer block splitting to the URIDownloader is well-implemented. I have identified a critical syntax error related to an invalid return statement in a generator function, and a high-severity bug in the block splitting logic that could result in an incorrect number of output blocks. Addressing these issues will ensure the correctness and robustness of this new optimization.
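To illustrate the block-count concern raised in the review, here is a minimal sketch of splitting rows into a fixed number of output blocks. It is not the code from this PR: `split_into_blocks` and its parameters are hypothetical names, and plain Python lists stand in for Ray Data blocks. It assumes the goal is to emit exactly `min(num_blocks, len(rows))` non-empty blocks.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def split_into_blocks(rows: Sequence[T], num_blocks: int) -> Iterator[List[T]]:
    """Yield contiguous, non-empty blocks covering `rows` exactly once.

    Hypothetical helper for illustration only; not part of Ray Data.
    """
    if num_blocks < 1:
        raise ValueError("num_blocks must be at least 1")

    # Never emit more blocks than there are rows, so no block is empty.
    n = min(num_blocks, len(rows))
    if n == 0:
        return  # A bare return simply ends the generator.

    # Spread the remainder over the first `extra` blocks.
    base, extra = divmod(len(rows), n)
    start = 0
    for i in range(n):
        size = base + (1 if i < extra else 0)
        yield list(rows[start:start + size])
        start += size


blocks = list(split_into_blocks(list(range(10)), 3))
print([len(b) for b in blocks])
```

Computing the sizes up front with `divmod` avoids the off-by-one cases that a fixed stride can produce when the row count is not a multiple of the block count.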
bveeramani left a comment
Gemini comments, but otherwise LGTM
Please also add a test for the multiple chained downloads use case?
…project#56462)

## Why are these changes needed?

If we have multiple chained downloads e.g. `ds.with_column("bytes_1", download("uri_1")).with_column("bytes_2", download("uri_2")).with_column("bytes_3", download("uri_3"))`, then we would have an operator structure like `URIPartitioner->URIDownloader->URIPartitioner->URIDownloader->URIPartitioner->URIDownloader`. Each of the `URIPartitioner` operators will be implemented with an ActorPoolMapOperator with concurrency of 1. In these chained downloads, these become bottlenecks and scaling the concurrency of these up will result in additional resource usage that will take resources away from other operators.

This solves the problem by deferring some of the partitioning to the `URIDownloader` so we can remove the interleaved partitioners. The result is an operator structure like `URIPartitioner->URIDownloader->URIDownloader->URIDownloader` which delivers much better performance for these cases.

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
    - [ ] Unit tests
    - [ ] Release tests
    - [ ] This PR is not tested :(

---------

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Signed-off-by: zac <zac@anyscale.com>
Why are these changes needed?

If we have multiple chained downloads, e.g. `ds.with_column("bytes_1", download("uri_1")).with_column("bytes_2", download("uri_2")).with_column("bytes_3", download("uri_3"))`, then we would have an operator structure like `URIPartitioner->URIDownloader->URIPartitioner->URIDownloader->URIPartitioner->URIDownloader`. Each of the `URIPartitioner` operators will be implemented with an ActorPoolMapOperator with concurrency of 1. In these chained downloads, these become bottlenecks, and scaling their concurrency up will result in additional resource usage that will take resources away from other operators.

This solves the problem by deferring some of the partitioning to the `URIDownloader` so we can remove the interleaved partitioners. The result is an operator structure like `URIPartitioner->URIDownloader->URIDownloader->URIDownloader`, which delivers much better performance for these cases.

Related issue number

Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
    - [ ] Unit tests
    - [ ] Release tests
    - [ ] This PR is not tested :(
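The operator rewrite described under "Why are these changes needed?" can be pictured with a toy model. The sketch below is not Ray Data's optimizer code: it treats a plan as a flat list of operator names, and `remove_interleaved_partitioners` is a hypothetical helper that drops any `URIPartitioner` directly following a `URIDownloader`, on the assumption that the preceding downloader takes over that partitioning.

```python
from typing import List

PARTITIONER = "URIPartitioner"
DOWNLOADER = "URIDownloader"


def remove_interleaved_partitioners(ops: List[str]) -> List[str]:
    """Drop every partitioner that directly follows a downloader.

    Toy model for illustration only; real plans are operator DAGs, not lists.
    """
    fused: List[str] = []
    for op in ops:
        # A partitioner right after a downloader is redundant in this model,
        # because the downloader is assumed to handle the deferred splitting.
        if op == PARTITIONER and fused and fused[-1] == DOWNLOADER:
            continue
        fused.append(op)
    return fused


# Three chained downloads, as in the example from the description.
chained = [PARTITIONER, DOWNLOADER] * 3
fused_plan = remove_interleaved_partitioners(chained)
print("->".join(fused_plan))
# URIPartitioner->URIDownloader->URIDownloader->URIDownloader
```

Only the leading partitioner survives, so the plan keeps a single concurrency-1 stage instead of one per download.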