
[Ray data llm] Allow specifying autoscaling actors for vllm#52258

Merged
richardliaw merged 62 commits into ray-project:master from
han-steve:llm/add-autoscaling-vllm-group
Apr 13, 2025

Conversation

@han-steve
Contributor

@han-steve han-steve commented Apr 11, 2025

Why are these changes needed?

To start an elastic Ray job that can run even when fewer resources than the maximum requested are available, we need to be able to set the actor concurrency to a range (m, n). If we set a single number instead, the job won't start (or resume) until all n actors can be created, which means it can't elastically use whatever resources the cluster has available at the time.

Btw, I'm not sure why we chose (1, concurrency) for the preprocessing actors but a plain concurrency for the vLLM actors. The benefit of Ray Data is being able to scale each stage separately; while we want to keep the API simple, should we also let users configure this? Anyway, I kept the existing logic in this version.
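For illustration, here's a minimal sketch of how a plain int differs from an (m, n) tuple as an actor-pool spec (`normalize_concurrency` is a hypothetical helper for this explanation, not code from this PR):

```python
def normalize_concurrency(concurrency):
    """Interpret a concurrency spec as a (min, max) actor pool size.

    A plain int n means a fixed pool: the stage waits until all n
    actors are created before running. An (m, n) tuple means an
    autoscaling pool: the stage can start once m actors exist and
    grow up to n as cluster resources free up.
    """
    if isinstance(concurrency, int):
        return (concurrency, concurrency)  # fixed-size pool
    m, n = concurrency
    if not 1 <= m <= n:
        raise ValueError(f"expected 1 <= m <= n, got {concurrency}")
    return (m, n)  # elastic pool
```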

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jcotant1 jcotant1 added the llm label Apr 11, 2025
han-steve and others added 29 commits April 11, 2025 14:48
Ziy1-Tan and others added 10 commits April 11, 2025 14:48
@han-steve han-steve force-pushed the llm/add-autoscaling-vllm-group branch from 53c6d35 to 69c3030 Compare April 11, 2025 21:49
han-steve and others added 3 commits April 11, 2025 15:01
Signed-off-by: Steve Han <stevehan2001@gmail.com>
Signed-off-by: Steve Han <stevehan2001@gmail.com>
@han-steve han-steve marked this pull request as ready for review April 11, 2025 22:10
@han-steve han-steve requested a review from a team as a code owner April 11, 2025 22:10
@han-steve
Contributor Author

Sorry for the messy commits... I forgot to sign off

@hainesmichaelc hainesmichaelc added the community-contribution Contributed by the community label Apr 11, 2025
Contributor

@lk-chen lk-chen left a comment


overall LGTM, thanks!

```python
map_batches_kwargs=dict(
    zero_copy_batch=True,
    # The number of running replicas. Note that we use a single
    # integer to let Ray Data prepare all replicas before kicking
```
Contributor


@comaniac do you have any insights why "prepare all replicas before kicking off processing"?

Contributor


Sorry, I forgot the exact reason... you can try (1, n) first and roll back if you find issues

Contributor Author


Yeah (1,n) worked, so I generalized it to (m,n)


```python
stages = []
if isinstance(config.concurrency, int):
    processor_concurrency = (1, config.concurrency)  # copied from previous logic
```
Contributor


nit: remove comment

Contributor Author


Yeah I'll remove it in the next PR!

@richardliaw richardliaw added the go add ONLY when ready to merge, run all tests label Apr 12, 2025
```python
PrepareImageStage(
    map_batches_kwargs=dict(
        zero_copy_batch=True,
        concurrency=(1, config.concurrency),
```
Contributor


@lk-chen @comaniac why did we do (1, concurrency) here? Should this not simply be a passthrough?

Contributor


Providing a range lets Ray Data start processing as soon as the first actor is available. In general, though, I feel this isn't super critical when processing a large dataset with an LLM, because the end-to-end processing time can be long anyway.

Contributor Author


I see what you mean

@richardliaw richardliaw merged commit c8a76fd into ray-project:master Apr 13, 2025
6 checks passed
```python
ray.init(runtime_env=config.runtime_env, ignore_reinit_error=True)

stages = []
if isinstance(config.concurrency, int):
```
Contributor


ok, I read this change a bit more carefully now. I think the following adjustment would make sense:

Basically, the problem is that the non-LLM stages should not become the bottleneck of starting the pipeline (in case resources are not available). I don't know if this is a real concern, since these stages are all cpu=1 and in almost all instances CPU count >> GPU count, so for most cases this won't cause a deadlock. There are some edge cases, though, since we can have 3 of these stages. That's why @comaniac originally had (1, concurrency) for non-LLM stages, so that they can start with at least one actor. In the isinstance(concurrency, int) case, processor_concurrency = (1, concurrency) makes sense. In the tuple case, I think it should be:

```python
# Set the upper bound of concurrency for non-LLM stages to the
# upper bound of concurrency of the LLM stage.
_, u_con = concurrency
processor_concurrency = (1, u_con)
```

@han-steve Are you working on a follow-up PR to address these small details?
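The suggested adjustment above can be sketched end to end as a small helper (illustrative only; it mirrors the variable names from the comment, not the merged code):

```python
def processor_concurrency(concurrency):
    """Derive the (min, max) pool size for non-LLM stages.

    Non-LLM stages always start with at least one actor and may
    scale up to the LLM stage's upper bound.
    """
    if isinstance(concurrency, int):
        return (1, concurrency)
    _, u_con = concurrency  # tuple case: take the upper bound
    return (1, u_con)
```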

Contributor


@lk-chen @comaniac what do you guys think?

Contributor Author


Yeah, that makes sense. My question is: will there ever be a point where the preprocessing needs more replicas? If so, should we have a separate parameter for preprocessing concurrency that can be scaled independently of the vLLM stage?

Contributor


I think it makes sense to think about it when it comes up as a feature request. For now I'd prioritize simplicity of the input config interface. The thing is, the tokenizer and image processor are likely never going to be bottlenecks of the pipeline if they scale at some ratio relative to the main LLM engine stage. We can add a ratio multiplier for each stage later on if there's a need. So I think the right design choice for now is (1, concurrency) or (1, upper_concurrency), and later (1, n_image_processor * concurrency), etc.
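A per-stage ratio multiplier, if it were ever added, might look roughly like this (purely hypothetical; `scaled_concurrency` and `ratio` are illustrative names, not an existing or proposed API):

```python
def scaled_concurrency(llm_concurrency, ratio=1.0):
    """Scale a stage's actor pool relative to the LLM stage.

    llm_concurrency: int or (min, max) tuple for the LLM stage.
    ratio: actors for this stage per LLM replica.
    """
    # Take the LLM stage's upper bound, whether fixed or autoscaling.
    upper = llm_concurrency if isinstance(llm_concurrency, int) else llm_concurrency[1]
    return (1, max(1, round(upper * ratio)))
```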


Labels

community-backlog · community-contribution (Contributed by the community) · go (add ONLY when ready to merge, run all tests) · llm
