Skip to content

[Data] Support subprogress bars on AllToAllOperators with optimizer enabled#34997

Merged
raulchen merged 14 commits intoray-project:masterfrom
scottjlee:a2a-subprog
Jun 2, 2023
Merged

[Data] Support subprogress bars on AllToAllOperators with optimizer enabled#34997
raulchen merged 14 commits intoray-project:masterfrom
scottjlee:a2a-subprog

Conversation

@scottjlee
Copy link
Copy Markdown
Contributor

@scottjlee scottjlee commented May 3, 2023

Why are these changes needed?

Currently, subprogress bars are not correctly rendered and updated with AllToAllOperators when the optimizer is enabled. This PR adds the subprogress bars for the different AllToAll LogicalOperators, such as RandomShuffle, Sort, and Repartition.

The original intent of this PR was to separate out the sub_progress_bars_dict from TaskContext, but we found that this was difficult and will require significant reworking to support it because the sub-progress bars need to be initialized and prior to being passed to the scheduler for execution.

Tested with the following code to observe the output subprogress bars (with all combinations of push/pull based shuffle, and no-shuffle for sort):

import ray 
import time
def sleep(x):
    time.sleep(0.1)
    return x

ctx = ray.data.DataContext.get_current()
ctx.optimizer_enabled = False
ctx.use_push_based_shuffle = True
for _ in (
    ray.data.range(1000 * 1000, parallelism=200)
    .map_batches(sleep, num_cpus=2)
    #.map_batches(sleep, compute=ray.data.ActorPoolStrategy(min_size=2, max_size=4))
    .random_shuffle() # -> tested pushbased=False; pushbased=True has duplicated issue
    #.sort("id") # -> tested pushbased=True+False
    #.repartition(400, shuffle=True) # -> tested shuffle=False, pushbased=True+False
    #.map_batches(sleep, num_cpus=2)
    .iter_batches()
):
    pass
  • random_shuffle():
Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 7.75 MiB/512.0 MiB object_store_memory:  32%|████████████████████▍                                            | 63/200 [00:24<00:37,  3.65it/s]
- RandomShuffle: 0 active, 0 queued, 0.0 MiB objects, 0 output: 100%|████████████████████████████████████████████████████████████████████████████| 200/200 [00:24<00:00, 24.36s/it]
  *- Shuffle Map:  54%|█████████████████████████████████████████████████████████████████▉                                                        | 108/200 [00:23<00:14,  6.55it/s]
  *- Shuffle Reduce:  98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████   | 195/200 [00:24<00:00, 11.48it/s]
  • sort():
Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 7.75 MiB/512.0 MiB object_store_memory:   0%|▎                                                               | 1/200 [00:24<1:20:59, 24.42s/it]
- Sort: 0 active, 0 queued, 0.0 MiB objects, 0 output: 100%|█████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:24<00:00, 24.41s/it]
  *- Sort Sample: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:23<00:00,  8.61it/s]
  *- Shuffle Map:  98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ | 197/200 [00:23<00:00, 11.76it/s]
  *- Shuffle Reduce: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:24<00:00, 11.67it/s]
  • repartition():
Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.04 MiB/512.0 MiB object_store_memory:   0%|                                                                          | 0/400 [00:23<?, ?it/s]
- Repartition: 0 active, 0 queued, 0.0 MiB objects, 0 output:   0%|                                                                                        | 0/400 [00:00<?, ?it/s]
  *- Shuffle Map:  100%|████████████████████████████████████████████████████████████████████████████                                                                                      | 400/400 [00:23<00:40,  7.04it/s]
  *- 98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████                                                                                                                      | 392/400 [00:24<2:45:10, 24.84s/it]

Related issue number

Closes #33374

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Scott Lee added 7 commits May 3, 2023 00:05
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee scottjlee marked this pull request as ready for review May 13, 2023 01:03
Scott Lee added 2 commits May 12, 2023 18:38
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
map_bar = sub_progress_bar_dict[bar_name]
should_close_bar = False
else:
map_bar = ProgressBar(bar_name, position=0, total=input_num_blocks)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to get rid of these if-else branches? they are a little bit ugly. In which cases, the sub_progress_bar_dict is None? Would be great to unify the code logic.

Scott Lee added 5 commits May 31, 2023 18:56
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
Signed-off-by: Scott Lee <sjl@anyscale.com>
@scottjlee
Copy link
Copy Markdown
Contributor Author

Other failing tests are unrelated to this PR, and present for previous commits into master according to flakey-tests.

@scottjlee scottjlee added the tests-ok The tagger certifies test failures are unrelated and assumes personal liability. label Jun 2, 2023
@scottjlee scottjlee assigned amogkam and unassigned amogkam Jun 2, 2023
@raulchen raulchen merged commit 3d1f6a9 into ray-project:master Jun 2, 2023
arvind-chandra pushed a commit to lmco/ray that referenced this pull request Aug 31, 2023
… enabled (ray-project#34997)

## Why are these changes needed?

Currently, subprogress bars are not correctly rendered and updated with AllToAllOperators when the optimizer is enabled. This PR adds the subprogress bars for the different AllToAll LogicalOperators, such as `RandomShuffle`, `Sort`, and `Repartition`.

The original intent of this PR was to separate out the `sub_progress_bars_dict` from `TaskContext`, but we found that this was difficult and will require significant reworking to support it because the sub-progress bars need to be initialized and prior to being passed to the scheduler for execution.

Tested with the following code to observe the output subprogress bars (with all combinations of push/pull based shuffle, and no-shuffle for sort):
```
import ray
import time
def sleep(x):
    time.sleep(0.1)
    return x

ctx = ray.data.DataContext.get_current()
ctx.optimizer_enabled = False
ctx.use_push_based_shuffle = True
for _ in (
    ray.data.range(1000 * 1000, parallelism=200)
    .map_batches(sleep, num_cpus=2)
    #.map_batches(sleep, compute=ray.data.ActorPoolStrategy(min_size=2, max_size=4))
    .random_shuffle() # -> tested pushbased=False; pushbased=True has duplicated issue
    #.sort("id") # -> tested pushbased=True+False
    #.repartition(400, shuffle=True) # -> tested shuffle=False, pushbased=True+False
    #.map_batches(sleep, num_cpus=2)
    .iter_batches()
):
    pass
```

- `random_shuffle()`:
```
Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 7.75 MiB/512.0 MiB object_store_memory:  32%|████████████████████▍                                            | 63/200 [00:24<00:37,  3.65it/s]
- RandomShuffle: 0 active, 0 queued, 0.0 MiB objects, 0 output: 100%|████████████████████████████████████████████████████████████████████████████| 200/200 [00:24<00:00, 24.36s/it]
  *- Shuffle Map:  54%|█████████████████████████████████████████████████████████████████▉                                                        | 108/200 [00:23<00:14,  6.55it/s]
  *- Shuffle Reduce:  98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████   | 195/200 [00:24<00:00, 11.48it/s]
```

- `sort()`:
```
Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 7.75 MiB/512.0 MiB object_store_memory:   0%|▎                                                               | 1/200 [00:24<1:20:59, 24.42s/it]
- Sort: 0 active, 0 queued, 0.0 MiB objects, 0 output: 100%|█████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:24<00:00, 24.41s/it]
  *- Sort Sample: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:23<00:00,  8.61it/s]
  *- Shuffle Map:  98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ | 197/200 [00:23<00:00, 11.76it/s]
  *- Shuffle Reduce: 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:24<00:00, 11.67it/s]
```

- `repartition()`:
```
Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.04 MiB/512.0 MiB object_store_memory:   0%|                                                                          | 0/400 [00:23<?, ?it/s]
- Repartition: 0 active, 0 queued, 0.0 MiB objects, 0 output:   0%|                                                                                        | 0/400 [00:00<?, ?it/s]
  *- Shuffle Map:  100%|████████████████████████████████████████████████████████████████████████████                                                                                      | 400/400 [00:23<00:40,  7.04it/s]
  *- 98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████                                                                                                                      | 392/400 [00:24<2:45:10, 24.84s/it]
```

Signed-off-by: e428265 <arvind.chandramouli@lmco.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

tests-ok The tagger certifies test failures are unrelated and assumes personal liability.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Datasets] Support AllToAllOperator.sub_progress_bars with optimizer

4 participants