
[Serve] feat: make ray.serve.batch concurrent #53096

Merged: zcin merged 21 commits into ray-project:master from ArthurBook:arthurbook/concurrent-ray.serve.batch on Jun 12, 2025
Conversation

@ArthurBook (Contributor) commented May 16, 2025

@abrarsheikh

Why are these changes needed?

The current implementation of ray.serve.batch executes batches synchronously: the next batch does not start until the previous one finishes. This throttles throughput for asynchronous methods wrapped in ray.serve.batch.

This PR introduces a new parameter, max_concurrent_batches, for ray.serve.batch. It lets users control the concurrency of their ray.serve.batch-wrapped methods, so that up to max_concurrent_batches (>= 1) batches are processed by the wrapped method at a time. The parameter defaults to 1, preserving exact backwards compatibility with the previous behavior.
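The core idea can be sketched with a plain asyncio semaphore (a minimal illustration, not the actual Serve internals; names and values here are made up for the example):

```python
import asyncio
import time

# Cap on how many batches the wrapped method may process at once
# (illustrative stand-in for max_concurrent_batches).
MAX_CONCURRENT_BATCHES = 3

async def process_batch(semaphore: asyncio.Semaphore, batch: list) -> list:
    async with semaphore:          # at most MAX_CONCURRENT_BATCHES held at once
        await asyncio.sleep(0.1)   # stand-in for the user's async batch method
        return [x * 2 for x in batch]

async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)
    batches = [[1, 2], [3, 4], [5, 6]]
    start = time.monotonic()
    # All three batches are in flight concurrently, gated by the semaphore.
    results = await asyncio.gather(*(process_batch(semaphore, b) for b in batches))
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)  # [[2, 4], [6, 8], [10, 12]]
# elapsed is ~0.1s: the batches overlapped instead of running back to back.
```

With a semaphore of 1 this degrades to the old synchronous behavior, which is why the default stays backwards compatible.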

Related issue number

Closes #53071.

Checks

  • [x] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • [x] I've run scripts/format.sh to lint the changes in this PR.
  • [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [x] I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [x] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch 4 times, most recently from 027f8c0 to 17eb4b5 Compare May 16, 2025 19:12
@hainesmichaelc hainesmichaelc added the community-contribution Contributed by the community label May 16, 2025
@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch 2 times, most recently from 0a40108 to 37c6467 Compare May 16, 2025 22:17
@ArthurBook (Contributor, Author):

Tested with modified repro script from #53071

from typing import List

import asyncio
import time
from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment(max_ongoing_requests=(8 * 3))
class Model:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=5, max_concurrent_batches=5)
    async def __call__(self, multiple_samples: List[int]) -> List[int]:
        print(f"Processing batch of size {len(multiple_samples)} at {time.strftime('%Y-%m-%d %H:%M:%S')}")
        await asyncio.sleep(1)
        return [i * 2 for i in multiple_samples]


async def main():
    handle: DeploymentHandle = serve.run(Model.bind())
    responses = await asyncio.gather(*map(handle.remote, range(8 * 3)))
    for response in responses:
        print(response)


if __name__ == "__main__":
    asyncio.run(main())
❯ python test.py
/Users/arthurbook/projects/ray/.venv/lib/python3.9/site-packages/urllib3/__init__.py:35: NotOpenSSLWarning: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'LibreSSL 2.8.3'. See: https://github.com/urllib3/urllib3/issues/3020
  warnings.warn(
2025-05-16 15:16:30,355 INFO worker.py:1879 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
INFO 2025-05-16 15:16:31,549 serve 57512 -- Started Serve in namespace "serve".
(ProxyActor pid=57841) INFO 2025-05-16 15:16:31,529 proxy 127.0.0.1 -- Proxy starting on node 1cd98eb69193e21bfa1f97f9ebaaa80ab7cccb214c8cdaae6dfdf091 (HTTP port: 8000).
(ProxyActor pid=57841) INFO 2025-05-16 15:16:31,543 proxy 127.0.0.1 -- Got updated endpoints: {}.
(ServeController pid=57837) INFO 2025-05-16 15:16:31,576 controller 57837 -- Deploying new version of Deployment(name='Model', app='default') (initial target replicas: 1).
(ProxyActor pid=57841) INFO 2025-05-16 15:16:31,578 proxy 127.0.0.1 -- Got updated endpoints: {Deployment(name='Model', app='default'): EndpointInfo(route='/', app_is_cross_language=False)}.
(ProxyActor pid=57841) INFO 2025-05-16 15:16:31,582 proxy 127.0.0.1 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x12facf610>.
(ServeController pid=57837) INFO 2025-05-16 15:16:31,678 controller 57837 -- Adding 1 replica to Deployment(name='Model', app='default').
INFO 2025-05-16 15:16:32,663 serve 57512 -- Application 'default' is ready at http://127.0.0.1:8000/.
INFO 2025-05-16 15:16:32,667 serve 57512 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x115f1b4f0>.
(ServeReplica:default:Model pid=57848) Processing batch of size 8 at 2025-05-16 15:16:32
(ServeReplica:default:Model pid=57848) Processing batch of size 8 at 2025-05-16 15:16:32
(ServeReplica:default:Model pid=57848) Processing batch of size 8 at 2025-05-16 15:16:32
0
2
4
6
8
10
12
14
16
18
20
22
24
26
28
30
32
34
36
38
40
42
44
46
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,682 default_Model evz7qqbi 0ea256a4-c5fc-4793-a4ba-c37f4a11fcac -- CALL __call__ OK 1008.2ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,683 default_Model evz7qqbi 25ade65d-5a44-4c97-9b90-4a05d76eca92 -- CALL __call__ OK 1008.0ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,683 default_Model evz7qqbi cc78b021-42ca-4ade-afad-f3e22a90e51e -- CALL __call__ OK 1008.3ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,684 default_Model evz7qqbi d25508a7-0aa5-4d42-8525-73a256a4d07e -- CALL __call__ OK 1009.0ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,684 default_Model evz7qqbi 77915b00-6d39-4dbf-9d11-7ec267f3eb9e -- CALL __call__ OK 1008.4ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,685 default_Model evz7qqbi d6d0483f-adfa-4789-a6fc-5a330334f120 -- CALL __call__ OK 1008.6ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,685 default_Model evz7qqbi 9f1b87f3-5ade-4fd1-9242-45e9f789d5b6 -- CALL __call__ OK 1008.4ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,685 default_Model evz7qqbi 9ac3d690-beec-470d-bd2c-888c7cff2e3a -- CALL __call__ OK 1008.2ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,686 default_Model evz7qqbi 0f1faf1e-1f52-47a3-9df9-f6a4f6f4adb3 -- CALL __call__ OK 1009.0ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,687 default_Model evz7qqbi d96d441a-874d-4cee-b04e-c092db931b00 -- CALL __call__ OK 1008.8ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,687 default_Model evz7qqbi b475538f-91bc-45c0-a050-6e00a15d9c4d -- CALL __call__ OK 1009.0ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,687 default_Model evz7qqbi 4a7ca7d3-cf2b-42c6-809c-d2c46a557ec2 -- CALL __call__ OK 1008.0ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,687 default_Model evz7qqbi bf5cb7b0-eed5-463f-8f5a-eba02dc7c2ba -- CALL __call__ OK 1008.1ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,688 default_Model evz7qqbi 1dc29d40-bbc4-419c-89fb-b386b93540c8 -- CALL __call__ OK 1008.5ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,688 default_Model evz7qqbi 773dad01-bb84-477b-83cc-fe212a5cc857 -- CALL __call__ OK 1008.9ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,688 default_Model evz7qqbi 25ca43d4-56f5-41cc-a01b-8adc5b0cf6da -- CALL __call__ OK 1009.1ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi 739c3daa-512c-4160-8f28-db93a703814d -- CALL __call__ OK 1009.1ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi 650eab61-86e5-4b68-b26d-32ab40c993f1 -- CALL __call__ OK 1008.8ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi 8aaa5e94-7b10-413d-a350-7eac01879cbd -- CALL __call__ OK 1009.0ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi 199418ac-8df7-4d2f-9a64-90bf6a369a26 -- CALL __call__ OK 1009.1ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi fe557276-b7b4-464d-9953-3c754b4db236 -- CALL __call__ OK 1008.7ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi 0c83e1f1-3a6f-412c-a614-5e7143b17f67 -- CALL __call__ OK 1008.7ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,689 default_Model evz7qqbi d0d6fc6f-7305-4fdf-a289-3fa8572dbeb1 -- CALL __call__ OK 1007.2ms
(ServeReplica:default:Model pid=57848) INFO 2025-05-16 15:16:33,690 default_Model evz7qqbi 00d9150b-cbc7-4228-9b0a-3df2afff8594 -- CALL __call__ OK 1007.3ms

@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch from 37c6467 to 38a1879 Compare May 16, 2025 22:25
@ArthurBook ArthurBook marked this pull request as ready for review May 16, 2025 22:29
@ArthurBook
Copy link
Copy Markdown
Contributor Author

Last open question is what to do with self.curr_iteration_start_time since it no longer has the same interpretation.

Contributor:

I am open to considering a change in the API.

Is it possible to avoid having this knob? Is it necessary or more of a nice to have?

Contributor (Author):

I would say it improves user friendliness a lot.

The current default behaves exactly the same way as before (synchronously), keeping it fully backwards compatible.

Having no concurrency cap would likely cause issues for people less familiar with asynchronous programming, as it would flood their batch methods.

Personally, the jobs I'm running will benefit from having this knob, since it lets me control how many requests are in the actor at once.

Contributor:

That makes sense. What I am wondering is whether max_concurrent_batches is a function of max_batch_size and max_ongoing_requests. The interplay of those three quantities may be confusing. Can we derive max_concurrent_batches from the other two?

Contributor (Author):

Unfortunately, I don't think so. Many external factors affect the optimal value for max_concurrent_batches, such as rate limits on third-party API calls or the risk of the event loop clogging within the wrapped method.
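The interplay of the three knobs can still be reasoned about with back-of-the-envelope arithmetic. The following sketch mirrors the values from the repro script earlier in this thread (illustrative numbers, not a recommendation):

```python
# Settings from the repro script above.
max_batch_size = 8
max_concurrent_batches = 5
max_ongoing_requests = 8 * 3  # 24

# The replica admits at most max_ongoing_requests requests, so at most this
# many *full* batches can exist at once, regardless of the concurrency cap:
batches_limited_by_requests = max_ongoing_requests // max_batch_size  # 3

# The effective concurrency is the smaller of the cap and the number of
# full batches the admitted requests can form:
effective_concurrency = min(max_concurrent_batches, batches_limited_by_requests)
print(effective_concurrency)  # 3
```

This matches the three "Processing batch of size 8" lines in the test output above: even with max_concurrent_batches=5, only three full batches can be in flight with 24 admitted requests.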

Contributor:

I see your point.

@mascharkh mascharkh added the serve Ray Serve Related Issue label May 19, 2025
@abrarsheikh (Contributor):

  1. Pre-commit checks are failing on your diff; can you fix those and run the linters that ship with Ray? See https://docs.ray.io/en/latest/ray-contribute/development.html#pre-commit-hooks. This is probably why I see many changes in your diff around max line length.
  2. I am overall in agreement with the design and approach; let's go ahead and add tests to your PR.
  3. There are some tests failing on this PR that seem unrelated to your change; please try pulling master before your next revision.


Signed-off-by: Arthur <atte.book@gmail.com>
… parameter

Signed-off-by: Arthur <atte.book@gmail.com>
+ Use a set instead of a list for efficiency.

Signed-off-by: Arthur <atte.book@gmail.com>

access self.semaphore directly in _process_batch

Signed-off-by: Arthur <atte.book@gmail.com>

add comment for what the semaphore does in `_process_batch`

Signed-off-by: Arthur <atte.book@gmail.com>
…t_curr_iteration_start_times()`

Signed-off-by: Arthur <atte.book@gmail.com>
Signed-off-by: Arthur <atte.book@gmail.com>
@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch from 7d365d8 to ae4cab3 Compare May 22, 2025 04:26
@abrarsheikh left a review:

nice, lgtm

Signed-off-by: Arthur <atte.book@gmail.com>
…thurBook/ray into arthurbook/concurrent-ray.serve.batch
@abrarsheikh abrarsheikh added the go add ONLY when ready to merge, run all tests label May 23, 2025
@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch 3 times, most recently from 12387c7 to 5967b4c Compare May 24, 2025 06:04
Signed-off-by: Arthur <atte.book@gmail.com>

tests: fix warning log message test

Signed-off-by: Arthur <atte.book@gmail.com>

tests: fix test race condition when checking ongoing task times

Signed-off-by: Arthur <atte.book@gmail.com>

tests: try without yield

Signed-off-by: Arthur <atte.book@gmail.com>

fix: return summary statistics instead of dict[Task,float] in
`_get_curr_iteration_start_times`

Signed-off-by: Arthur <atte.book@gmail.com>
@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch 6 times, most recently from a5dcbe5 to 774b4d6 Compare May 24, 2025 21:45
Signed-off-by: Arthur <atte.book@gmail.com>
@ArthurBook ArthurBook force-pushed the arthurbook/concurrent-ray.serve.batch branch from 774b4d6 to 0876c2c Compare May 24, 2025 21:47
@zcin left a review:

Overall LGTM! One comment about handling batch completion.

Comment on lines +362 to +370
async def _poll_tasks(self) -> None:
for task in self.tasks:
if task.done():
try:
await task
except Exception:
logger.exception(
"_process_batches asyncio task ran into an unexpected exception."
)
Contributor:

will this code execute first or will the _handle_completed_task callback execute first? can we handle exceptions in the callback instead?

@ArthurBook (Contributor, Author) commented Jun 5, 2025:

Hi @zcin! Thanks for taking a look at the PR!

I tried to minimize the diff and kept two try-except statements (like before):

asyncio.Future callbacks trigger when the future is marked as done, i.e. either a result is set or an exception is raised. However, because there is already a try-except inside the task execution, I don't think an exception can ever be raised to the callback... We discussed this with @abrarsheikh here. I think we can safely remove the try-except where the tasks are polled.

If we want to keep the double try-except, we could move logger.exception into the callback:

def _handle_completed_task(self, task: asyncio.Task) -> None:
    self.tasks.remove(task)
    del self.curr_iteration_start_times[task]
    exception_maybe = task.exception()
    if exception_maybe is not None:
        if isinstance(exception_maybe, asyncio.CancelledError):
            logger.debug("Task was cancelled")
        else:
            logger.exception("Task failed unexpectedly")

But we would still need a try-except around this await, because a Future's exception is re-raised when it is awaited, unless it is collected with asyncio.gather(..., return_exceptions=True).

WDYT?
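That re-raising behavior can be demonstrated in a few lines (a standalone asyncio example, independent of the Serve code under review):

```python
import asyncio

# A task's stored exception is re-raised when the task is awaited, but
# asyncio.gather(..., return_exceptions=True) hands it back as a value.
async def boom():
    raise ValueError("failed inside task")

async def main():
    task = asyncio.ensure_future(boom())
    await asyncio.sleep(0)  # let the task run to completion

    raised = False
    try:
        await task          # re-raises the stored ValueError
    except ValueError:
        raised = True

    # With return_exceptions=True the exception is returned, not raised.
    results = await asyncio.gather(asyncio.ensure_future(boom()),
                                   return_exceptions=True)
    return raised, results

raised, results = asyncio.run(main())
print(raised)                              # True
print(isinstance(results[0], ValueError))  # True
```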

Contributor:

I believe we didn't need the double try-except from the original implementation. So for this new implementation, keeping either the one inside _process_batch, or using a callback to log an uncaught exception from _process_batch, is fine. If we go with the second option, we shouldn't need a _poll_tasks function; we just need to attach a callback that logs an exception if there is one.

Contributor (Author):

Agree. It is redundant. I removed the _poll_tasks loop and just log exceptions in the callback instead.
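The callback-based approach agreed on above can be sketched as follows (a simplified standalone example; the names and bookkeeping are illustrative, not the exact Serve internals):

```python
import asyncio

# Track completed tasks and log any uncaught exception in a done-callback,
# instead of polling the task set.
completed = []

def handle_completed_task(task: asyncio.Task) -> None:
    completed.append(task)
    exc = task.exception()  # None if the task finished without an exception
    if exc is not None:
        print(f"task failed: {exc!r}")

async def ok():
    return "done"

async def bad():
    raise RuntimeError("batch failed")

async def main():
    tasks = [asyncio.ensure_future(ok()), asyncio.ensure_future(bad())]
    for t in tasks:
        t.add_done_callback(handle_completed_task)
    # return_exceptions=True so the failing task doesn't abort the gather.
    await asyncio.gather(*tasks, return_exceptions=True)
    await asyncio.sleep(0)  # give pending callbacks a chance to run
    return len(completed)

n = asyncio.run(main())
print(n)  # 2
```

Both tasks pass through the callback, and the failing one is logged there with no polling loop needed.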

@ArthurBook ArthurBook requested a review from a team as a code owner June 12, 2025 03:05
Signed-off-by: Arthur <atte.book@gmail.com>
@zcin zcin merged commit 7d9c30d into ray-project:master Jun 12, 2025
5 checks passed
@ArthurBook ArthurBook deleted the arthurbook/concurrent-ray.serve.batch branch June 12, 2025 17:38
elliot-barn pushed a commit that referenced this pull request Jun 18, 2025
@abrarsheikh

## Why are these changes needed?
The [current implementation for
ray.serve.batch](https://github.com/ray-project/ray/blob/58c081a72aecc33ae31797320ab3a4e17ef02b7f/python/ray/serve/batching.py#L277)
executes the batches synchronously. This throttles throughput for
asynchronous methods wrapped in `ray.serve.batch`.

This PR introduces a new parameter, `max_concurrent_batches` for
`ray.serve.batch`. This enables users to control the concurrency of
their `ray.serve.batch` wrapped methods such that
`max_concurrent_batches`>=1 concurrent requests are processed by the
wrapped method at a time. The parameter defaults to 1, keeping exact
backwards compatibility with the previous behavior.

## Related issue number
Closes #53071.

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Arthur <atte.book@gmail.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
elliot-barn pushed a commit that referenced this pull request Jul 2, 2025
(Same commit message as the Jun 18 cherry-pick above.)

Labels

community-contribution (Contributed by the community)
go (add ONLY when ready to merge, run all tests)
serve (Ray Serve Related Issue)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Serve] concurrency in ray.serve.batch

5 participants