Conversation
… locality hints Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
|
Shows locality hints is wired through. Test Program Test Output Ray Data Log |
…on (#52000) <!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? <!-- Please give a short summary of the change and the problem this solves. --> Ray Train Release test: Add locality_with_output, actor_locality_enabled option ## Related issue number <!-- For example: "Closes #1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com>
raulchen
left a comment
There was a problem hiding this comment.
Can you also add a test to check locality_hints are properly set for streaming_split when actor_locality is disabled.
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
There was a problem hiding this comment.
I think we should:
- Make the "streaming_split_locality" an argument in
ray.train.DataConfig. If it's true, then pass in the Train worker nodes as locality hints tostreaming_split. - Remove all usage of
locality_with_outputandactor_locality_enabledin theOutputSplitter/ Ray TrainDataConfigcode.locality_with_outputis now a config that ONLY affectsTaskPoolMapOperator.actor_locality_enabledis now a config that ONLY affectsActorPoolMapOperator.- Also, maybe in the future, individual map operators can be configured separately from each other, since right now it affects all operators globally.
Setting streaming_split_locality = True, and locality_with_output = True guarantees that ALL map tasks outputs end up on one of the Train workers, since every map task gets scheduled on the Train worker nodes. But this also means that all other nodes will be underutilized. It is probably not great to affect the scheduling of ALL map operators globally.
Setting streaming_split_locality = True, and locality_with_output = False will have a lower "hit rate," and a locality hit just depends on the pipeline's last map task getting randomly scheduled on a Train worker. (All map tasks before the last one don't matter for streaming split locality.) However, there will be better cluster utilization due to spreading out tasks across nodes with no Train worker.
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
python/ray/data/_internal/execution/operators/output_splitter.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
| streaming_split_locality: If it's true, then pass in the Train worker nodes | ||
| as locality hints to streaming_split operations. On by default. |
There was a problem hiding this comment.
| streaming_split_locality: If it's true, then pass in the Train worker nodes | |
| as locality hints to streaming_split operations. On by default. | |
| streaming_split_locality: If it's true, then pass in the Train worker nodes | |
| as locality hints to streaming_split operations, | |
| which prefers assigning data shards to Train workers | |
| located on the same node as the ready data. | |
| On by default. |
There was a problem hiding this comment.
@matthewdeng What do you think about the name? Users may not know about streaming_split details, but maybe that's ok since this is also a more advanced config.
There was a problem hiding this comment.
How about we just prefix it with an underscore, and add [Advanced] to the start of the docstring?
There was a problem hiding this comment.
good point. The current name and doc probably have too much implementation details. what about this:
enable_split_locality: If true, when splitting the datasets across Train workers, locality will be considered to minimize cross-node data transfer.
There was a problem hiding this comment.
enable_shard_locality is also a good name.
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
… locality_with_output (ray-project#52005) ## Why are these changes needed? `OutputSplitter._locality_hints` semantics needs to be kept separate from `locality_with_output` and `actor_locality_enabled`, so each of these can be set explicitly. To achieve this, 1. Make the `streaming_split_locality` an argument in `ray.train.DataConfig`. If it's true, then pass in the Train worker nodes as locality hints to `streaming_split`. 2. Remove all usage of `locality_with_output` and `actor_locality_enabled` in the OutputSplitter / Ray Train DataConfig code. - `locality_with_output` is now a config that ONLY affects `TaskPoolMapOperator`. - `actor_locality_enabled` is now a config that ONLY affects `ActorPoolMapOperator`. - Also, maybe in the future, individual map operators can be configured separately from each other, since right now it affects all operators globally. 3. Setting `streaming_split_locality` = True, and `locality_with_output` = True guarantees that ALL map tasks outputs end up on one of the Train workers, since every map task gets scheduled on the Train worker nodes. But this also means that all other nodes will be underutilized. It is probably not great to affect the scheduling of ALL map operators globally. 4. Setting `streaming_split_locality` = True, and `locality_with_output` = False will have a lower "hit rate," and a locality hit just depends on the pipeline's last map task getting randomly scheduled on a Train worker. (All map tasks before the last one don't matter for streaming split locality.) However, there will be better cluster utilization due to spreading out tasks across nodes with no Train worker. <!-- Please give a short summary of the change and the problem this solves. --> Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com> Signed-off-by: srinathk10 <68668616+srinathk10@users.noreply.github.com> Signed-off-by: Hao Chen <chenh1024@gmail.com> Signed-off-by: Justin Yu <justinvyu@anyscale.com> Co-authored-by: Hao Chen <chenh1024@gmail.com> Co-authored-by: Justin Yu <justinvyu@anyscale.com> Signed-off-by: Steve Han <stevehan2001@gmail.com>
Why are these changes needed?
OutputSplitter._locality_hintssemantics needs to be kept separate fromlocality_with_outputandactor_locality_enabled, so each of these can be set explicitly. To achieve this,Make the
streaming_split_localityan argument inray.train.DataConfig. If it's true, then pass in the Train worker nodes as locality hints tostreaming_split.Remove all usage of
locality_with_outputandactor_locality_enabledin the OutputSplitter / Ray Train DataConfig code.locality_with_outputis now a config that ONLY affectsTaskPoolMapOperator.actor_locality_enabledis now a config that ONLY affectsActorPoolMapOperator.Setting
streaming_split_locality= True, andlocality_with_output= True guarantees that ALL map tasks outputs end up on one of the Train workers, since every map task gets scheduled on the Train worker nodes. But this also means that all other nodes will be underutilized. It is probably not great to affect the scheduling of ALL map operators globally.Setting
streaming_split_locality= True, andlocality_with_output= False will have a lower "hit rate," and a locality hit just depends on the pipeline's last map task getting randomly scheduled on a Train worker. (All map tasks before the last one don't matter for streaming split locality.) However, there will be better cluster utilization due to spreading out tasks across nodes with no Train worker.Related issue number
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.