[Serve] feat: make ray.serve.batch concurrent #53096
zcin merged 21 commits into ray-project:master from arthurbook/concurrent-ray.serve.batch
Conversation
Tested with a modified repro script from #53071:

```python
from typing import List
import asyncio
import time

from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment(max_ongoing_requests=(8 * 3))
class Model:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=5, max_concurrent_batches=5)
    async def __call__(self, multiple_samples: List[int]) -> List[int]:
        print(f"Processing batch of size {len(multiple_samples)} at {time.strftime('%Y-%m-%d %H:%M:%S')}")
        await asyncio.sleep(1)
        return [i * 2 for i in multiple_samples]


async def main():
    handle: DeploymentHandle = serve.run(Model.bind())
    responses = await asyncio.gather(*map(handle.remote, range(8 * 3)))
    for response in responses:
        print(response)


if __name__ == "__main__":
    asyncio.run(main())
```
The last open question is what to do with `self.curr_iteration_start_time`, since it no longer has the same interpretation.
python/ray/serve/batching.py
I am open to considering a change in the API.
Is it possible to avoid having this knob? Is it necessary, or more of a nice-to-have?
I would say it improves user friendliness a lot.
The current default will behave exactly the same way as before (sync), keeping it fully backwards compatible.
Having no concurrency cap would probably cause issues for people less familiar with asynchronous programming, as it would flood their batch methods.
Personally, the jobs I'm running will benefit from having this knob, since it lets me control how many requests are in the actor at once.
That makes sense. What I'm wondering is whether `max_concurrent_batches` is a function of `max_batch_size` and `max_ongoing_requests`. The interplay of those three quantities may be confusing. Can we derive `max_concurrent_batches` from the other two?
Unfortunately, I don't think so. Many external factors affect the optimal value for `max_concurrent_batches`, such as rate limits on third-party API calls or the risk of clogging the event loop inside the wrapped method.
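To illustrate the knob being discussed, here is a minimal standalone sketch of how an `asyncio.Semaphore` caps the number of batches in flight, which is essentially what `max_concurrent_batches` provides. The names (`process_batch`, `run_batches`, `MAX_CONCURRENT_BATCHES`) are illustrative, not Ray internals:

```python
import asyncio

# Illustrative cap, analogous to max_concurrent_batches.
MAX_CONCURRENT_BATCHES = 2


async def process_batch(batch):
    await asyncio.sleep(0.05)  # simulate async work (e.g. a model call)
    return [x * 2 for x in batch]


async def run_batches(batches):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_BATCHES)

    async def guarded(batch):
        # At most MAX_CONCURRENT_BATCHES coroutines pass this point at once;
        # the rest wait here instead of flooding process_batch.
        async with semaphore:
            return await process_batch(batch)

    return await asyncio.gather(*(guarded(b) for b in batches))


results = asyncio.run(run_batches([[1, 2], [3, 4], [5, 6], [7, 8]]))
print(results)  # [[2, 4], [6, 8], [10, 12], [14, 16]]
```

With a cap of 2, the four batches run in two waves, but `asyncio.gather` still returns the results in submission order.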
Commit history (all commits Signed-off-by: Arthur <atte.book@gmail.com>):

- … parameter
- Use a set instead of a list for efficiency
- access `self.semaphore` directly in `_process_batch`
- add comment for what the semaphore does in `_process_batch`
- …t_curr_iteration_start_times()`
- …tion_start_times` return type
- Merge …thurBook/ray into arthurbook/concurrent-ray.serve.batch
- tests: fix warning log message test
- tests: fix test race condition when checking ongoing task times
- tests: try without yield
- fix: return summary statistics instead of `dict[Task, float]` in `_get_curr_iteration_start_times`
zcin
left a comment
Overall LGTM! One comment about handling batch completion.
python/ray/serve/batching.py
```python
async def _poll_tasks(self) -> None:
    for task in self.tasks:
        if task.done():
            try:
                await task
            except Exception:
                logger.exception(
                    "_process_batches asyncio task ran into an unexpected exception."
                )
```
Will this code execute first, or will the `_handle_completed_task` callback execute first? Can we handle exceptions in the callback instead?
Hi @zcin! Thanks for taking a look at the PR!
I tried to minimize the diff and kept two try/except statements (as before):
- one inside the task execution, here in my implementation, from here in master
- one where the tasks are polled, here in my implementation, from here in master

asyncio.Future callbacks trigger when the future is marked as done, i.e. either a result is set or an exception is raised. However, because there is already a try/except inside the task execution, I don't think an exception can ever reach the callback. We discussed this with @abrarsheikh here. I think we can safely remove the try/except where the tasks are polled.
If we want to keep the double try/except, we could move `logger.exception` into the callback:

```python
def _handle_completed_task(self, task: asyncio.Task) -> None:
    self.tasks.remove(task)
    del self.curr_iteration_start_times[task]
    exception_maybe = task.exception()
    if exception_maybe is not None:
        if isinstance(exception_maybe, asyncio.CancelledError):
            logger.debug("Task was cancelled")
        else:
            logger.exception("Task failed unexpectedly")
```

But we would still need a try/except around this await, because Future exceptions are raised when awaited, unless gathered with `asyncio.gather(..., return_exceptions=True)`.
WDYT?
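The behavior under discussion can be demonstrated with a minimal standalone sketch (not Ray code): a done-callback can retrieve a task's exception via `task.exception()` without it being re-raised, while awaiting the failed task does re-raise it.

```python
import asyncio

results = []


async def failing():
    raise ValueError("boom")


def on_done(task: asyncio.Task) -> None:
    # exception() returns the exception without re-raising it,
    # so a callback can log it safely.
    exc = task.exception()
    results.append(type(exc).__name__)


async def main():
    task = asyncio.create_task(failing())
    task.add_done_callback(on_done)
    await asyncio.sleep(0.01)  # let the task finish and the callback fire
    try:
        await task  # awaiting a failed task re-raises its exception
    except ValueError:
        results.append("raised on await")


asyncio.run(main())
print(results)  # ['ValueError', 'raised on await']
```

This is why a bare callback suffices for logging, but any place that still awaits the task needs its own try/except (or `asyncio.gather(..., return_exceptions=True)`).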
I believe we didn't need the double try/except in the original implementation. So for this new implementation, keeping either the one inside `_process_batch`, or using a callback to log an uncaught exception from `_process_batch`, is fine. If we go with the second option, we shouldn't need a `_poll_tasks` function; we just need to attach a callback that logs the exception if there is one.
Agree. It is redundant. I removed the _poll_tasks loop and just log exceptions in the callback instead.
@abrarsheikh

## Why are these changes needed?

The [current implementation for ray.serve.batch](https://github.com/ray-project/ray/blob/58c081a72aecc33ae31797320ab3a4e17ef02b7f/python/ray/serve/batching.py#L277) executes batches synchronously. This throttles throughput for asynchronous methods wrapped in `ray.serve.batch`.

This PR introduces a new parameter, `max_concurrent_batches`, for `ray.serve.batch`. It lets users control the concurrency of their `ray.serve.batch`-wrapped methods, so that up to `max_concurrent_batches` (>= 1) batches are processed by the wrapped method at a time. The parameter defaults to 1, keeping exact backwards compatibility with the previous behavior.

## Related issue number

Closes #53071.

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [x] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

Signed-off-by: Arthur <atte.book@gmail.com>
Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>