[Serve] Shared LongPollClient for Routers (#48807)
Conversation
Signed-off-by: Josh Karpel <josh.karpel@gmail.com>
@JoshKarpel I can't reproduce the test failure locally, could you try merging master and seeing if the failure is still there?
```python
    key_listeners: Dict[KeyType, UpdateStateCallable],
    call_in_event_loop: AbstractEventLoop,
) -> None:
    assert len(key_listeners) > 0
```
This case is now handled by https://github.com/ray-project/ray/pull/48807/files#diff-f138b21f7ddcd7d61c0b2704c8b828b9bbe7eb5021531e2c7fabeb20ec322e1aR280-R288 (and is necessary - when the shared client boots up for the first time it will send an RPC with no keys in it)
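A minimal sketch of the situation described above, with hypothetical names (not the actual Ray Serve classes): the shared client's first poll can legitimately carry an empty key set, because it boots before any router has registered a listener, and new keys start at a sentinel snapshot id so the host replies immediately for them.

```python
import threading


class SharedLongPollSketch:
    """Illustrative stand-in for a process-wide shared long poll client."""

    def __init__(self):
        self._lock = threading.Lock()
        self._snapshot_ids = {}  # key -> last-seen snapshot id

    def register(self, key, initial_snapshot_id=-1):
        # New keys start at a sentinel snapshot id so the host responds
        # immediately with the current state for them.
        with self._lock:
            self._snapshot_ids[key] = initial_snapshot_id

    def keys_to_poll(self):
        # Returns a copy of the keys for the next RPC. An RPC with no keys
        # is valid: it simply times out on the host side and the client
        # retries, picking up any keys registered in the meantime.
        with self._lock:
            return dict(self._snapshot_ids)


client = SharedLongPollSketch()
assert client.keys_to_poll() == {}  # empty first poll is allowed
client.register("deployment:a")
assert client.keys_to_poll() == {"deployment:a": -1}
```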
```diff
- def test_reconfigure_with_queries(serve_instance):
+ def test_reconfigure_does_not_run_while_there_are_active_queries(serve_instance):
```
Tried to de-flake this test 🤞🏻
@JoshKarpel with new routers waiting for their dedicated long poll clients to make the first RPC call, do we still ever need to rely on the long poll timeout for anything?
Oo, interesting question... If the shared long poll never times out, it won't pick up new key listeners until some key it's already listening to has changed. So we'd converge to the same final state over the long term, but only if those existing keys get an update at some point, which isn't guaranteed (for example, in our setup, some Serve apps receive very little traffic and never need to autoscale - it could be hours or days between replica updates for them). I'd prefer to keep the timeout to ensure that there's a time-bound on how long it takes to get to the desired state.
Ah, but I forgot that when the shared client sends its first request that includes the new key listeners, the snapshot id will be So my concern above is not real - the shared client will always take over on the next timeout cycle when it adds the new listeners.
Hmm, I see. I think this can cause a lot of overhead for the controller when apps are first deployed, but should still improve the performance once that first update is received, so that in steady state there aren't tons of separate long poll clients calling into the controller and repeatedly disconnecting/reconnecting. However, if that overhead is still a concern, have you tried implementing the cancel that we've discussed before? I am not 100% sure since I haven't implemented it myself, but I think using cancel is safe and will give us what we want. The long poll client uses …
Ah, seems like we had a race condition of replying to each other 😅. Yes, after the first update the shared client should take over, so if that removes concerns then I think the current implementation is fine.
Hah, yep! I am not concerned with the performance of the implementation as-is. |
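The timeout-bounded convergence discussed in this thread can be sketched as follows. This is a toy simulation with assumed names, not the real Ray Serve API: a host that never pushes updates, and a client that re-reads its (mutable) key set on every timed-out poll, so a newly registered key is included within one timeout cycle.

```python
import queue

LISTEN_TIMEOUT_S = 0.01  # illustrative; the timeout discussed above is ~30s


def poll_loop(updates, registered_keys, max_polls=3):
    """Simulate long polling against a host that never pushes updates.

    Records the key set sent with each request; on timeout, loops around
    and re-reads `registered_keys`, picking up any new registrations.
    """
    requests = []
    for _ in range(max_polls):
        requests.append(frozenset(registered_keys))
        try:
            updates.get(timeout=LISTEN_TIMEOUT_S)  # blocks until timeout
        except queue.Empty:
            pass  # timed out: re-issue the poll with the current key set
    return requests


keys = {"deployment:a"}
updates = queue.Queue()
first = poll_loop(updates, keys, max_polls=1)
keys.add("deployment:b")  # a new router registers between polls
second = poll_loop(updates, keys, max_polls=1)
assert "deployment:b" not in first[0]
assert "deployment:b" in second[0]  # picked up within one timeout cycle
```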
Small isolated PR to (hopefully) fix flakiness issues with `python/ray/serve/tests/test_deploy.py::test_reconfigure_with_queries`, noticed while working on #48807 and other PRs.

Signed-off-by: Josh Karpel <josh.karpel@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
@zcin looks like I had a failing test from a merge conflict but that's resolved now - ready for another round of review
```python
for router in self.routers[deployment_id]:
    router.update_deployment_targets(deployment_target_info)
    router.long_poll_client.stop()
```
Calling stop here means that the router's own long poll client won't "stop" until after the next poll, right? Since the change in the long poll host (that triggered this update) also triggered the router's long poll client.

When there are a lot of applications/deployments and the controller is slowed down, will there be race conditions with multiple long poll clients updating the same state?
Correct, the router will get both updates - I guess I was assuming that those updates are and will continue to be idempotent. Is that not the case?
Hmm yes I believe that's true. Just want to make sure it's thought through carefully, since I haven't touched the long poll code before.
So the router's own long poll client will likely make 2 listen_for_change calls to the controller, but that is fine because the updates are idempotent (and they will time out after at most 30 seconds).
Sounds right to me!
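The idempotency property this thread relies on can be sketched in a few lines. This is a toy model with assumed names (not the real `Router`): because a target update *replaces* state rather than appending to it, the router ends up in the same state whether the update is delivered once (by its dedicated client) or twice (again by the shared client).

```python
class RouterSketch:
    """Illustrative stand-in for a Serve Router receiving long-poll updates."""

    def __init__(self):
        self.targets = ()

    def update_deployment_targets(self, targets):
        # Replace, don't append: replacement is what makes a repeated
        # delivery of the same update a no-op.
        self.targets = tuple(targets)


router = RouterSketch()
update = ["replica-1", "replica-2"]
router.update_deployment_targets(update)  # delivered by the dedicated client
state_after_first = router.targets
router.update_deployment_targets(update)  # delivered again by the shared client
assert router.targets == state_after_first  # idempotent: same final state
```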
## Why are these changes needed?

In our use case we use Ray Serve with many hundreds or thousands of apps, plus a "router" app that routes traffic to those apps using `DeploymentHandle`s. Right now, that means we have a `LongPollClient` for each `DeploymentHandle` in each router app replica, which could be tens or hundreds of thousands of `LongPollClient`s. This is expensive for both the Serve Controller and the router app replicas. It is particularly problematic for resource usage on the Serve Controller - the main thing blocking us from having as many router replicas as we'd like is the stability of the controller.

This PR amortizes the cost of having so many `LongPollClient`s by going from one-long-poll-client-per-handle to one-long-poll-client-per-process. Each `DeploymentHandle`'s `Router` now registers itself with a shared `LongPollClient` held by a singleton.

The actual implementation I've gone with is a bit clunky because I'm trying to bridge the gap between the current solution and a design that *only* has shared `LongPollClient`s. This could potentially be cleaned up in the future. Right now, each `Router` still gets a dedicated `LongPollClient` that runs only temporarily, until the shared client tells it to stop.

Related: #45957 is the same idea but for handle autoscaling metrics pushing.

## Related issue number

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [x] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

Signed-off-by: Josh Karpel <josh.karpel@gmail.com>
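The one-long-poll-client-per-process shape described above can be sketched as a process-wide singleton that routers register with. All names here are illustrative assumptions, not the actual Ray Serve implementation:

```python
import threading

_singleton = None
_singleton_lock = threading.Lock()


class SharedLongPollRegistry:
    """Illustrative process-wide registry standing in for the shared client."""

    def __init__(self):
        self._listeners = {}  # key -> list of update callbacks

    def register(self, key, callback):
        # Each Router registers its callbacks here instead of owning its
        # own LongPollClient.
        self._listeners.setdefault(key, []).append(callback)

    def notify(self, key, value):
        # Fan a single long-poll update out to every registered router.
        for cb in self._listeners.get(key, []):
            cb(value)


def get_shared_registry():
    # Double-checked locking so every handle in the process shares one
    # registry (and hence, in the real design, one long poll client).
    global _singleton
    if _singleton is None:
        with _singleton_lock:
            if _singleton is None:
                _singleton = SharedLongPollRegistry()
    return _singleton


assert get_shared_registry() is get_shared_registry()  # one per process
```

The payoff is that the controller sees one `listen_for_change` RPC per process instead of one per handle, regardless of how many `DeploymentHandle`s the process holds.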
(#52793)

## Why are these changes needed?

This is a followup to our previous PR #48807, which changed the way that the `LongPollClient` is managed by `Router`s such that there is, in the long term, only a single active `LongPollClient` per client process. To make that happen, that PR added a way to change the set of `snapshot_ids` that the `LongPollClient` is listening for. Unfortunately, I didn't realize at the time that the registration code that changes the `snapshot_ids` runs in a different thread from the `LongPollClient`'s polling loop, which means that the snapshot ID dictionary can be mutated from one thread while the other thread is trying to serialize it, leading to a stack trace like:

```
Exception in callback <function LongPollClient._process_update.<locals>.chained at 0x7f1d4afbc540>
handle: <Handle LongPollClient._process_update.<locals>.chained>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 61, in uvloop.loop.Handle._run
  File ".../site-packages/ray/serve/_private/long_poll.py", line 199, in chained
    self._on_callback_completed(trigger_at=len(updates))
  File ".../site-packages/ray/serve/_private/long_poll.py", line 133, in _on_callback_completed
    self._poll_next()
  File ".../site-packages/ray/serve/_private/long_poll.py", line 143, in _poll_next
    self._current_ref = self.host_actor.listen_for_change.remote(self.snapshot_ids)
  File ".../site-packages/ray/actor.py", line 206, in remote
    return self._remote(args, kwargs)
  File ".../site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File ".../site-packages/ray/util/tracing/tracing_helper.py", line 422, in _start_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File ".../site-packages/ray/actor.py", line 366, in _remote
    return invocation(args, kwargs)
  File ".../site-packages/ray/actor.py", line 347, in invocation
    return actor._actor_method_call(
  File ".../site-packages/ray/actor.py", line 1503, in _actor_method_call
    object_refs = worker.core_worker.submit_actor_task(
  File "python/ray/_raylet.pyx", line 3972, in ray._raylet.CoreWorker.submit_actor_task
  File "python/ray/_raylet.pyx", line 3977, in ray._raylet.CoreWorker.submit_actor_task
  File "python/ray/_raylet.pyx", line 859, in ray._raylet.prepare_args_and_increment_put_refs
  File "python/ray/_raylet.pyx", line 850, in ray._raylet.prepare_args_and_increment_put_refs
  File "python/ray/_raylet.pyx", line 900, in ray._raylet.prepare_args_internal
  File "/app/virtualenv/lib/python3.12/site-packages/ray/_private/serialization.py", line 556, in serialize
    return self._serialize_to_msgpack(value)
  File ".../site-packages/ray/_private/serialization.py", line 530, in _serialize_to_msgpack
    msgpack_data = MessagePackSerializer.dumps(value, _python_serializer)
  File "python/ray/includes/serialization.pxi", line 175, in ray._raylet.MessagePackSerializer.dumps
  File ".../site-packages/msgpack/__init__.py", line 36, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 279, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 276, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 265, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 211, in msgpack._cmsgpack.Packer._pack_inner
RuntimeError: dictionary changed size during iteration
```

If this happens (it's rare; we've been running the shared long poll code for months but only noticed this when we started making a lot more Serve deployments per unit time recently), the `LongPollClient` never calls `poll_next()` again, and thus no longer receives updates from the Serve Controller. Depending on how fast your actor replicas come and go, this eventually leads to poor scheduling, and then no scheduling once all the replicas you had at the time of the error go away.

A simple fix is to shallow-copy the snapshot IDs just before passing them to the `.remote()` call. The shallow copy is then unaffected if the original snapshot IDs held by the `LongPollClient` are modified. We've tested this fix in our system and it prevented the problem from occurring. Since it would be difficult to arrange this to happen in a deterministic test, I've added a comment describing the problem case that the copy solves for.

## Related issue number

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [x] This PR is not tested :(

Signed-off-by: Josh Karpel <josh.karpel@gmail.com>
Co-authored-by: Cindy Zhang <cindyzyx9@gmail.com>
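The shallow-copy fix can be demonstrated in isolation. This sketch (with a hypothetical `snapshot_ids` dict standing in for the one held by `LongPollClient`, and mutation done sequentially for determinism rather than from a second thread) shows why copying at call time decouples the in-flight RPC argument from later registrations:

```python
# Snapshot IDs held by the long poll client; another thread may add keys
# to this dict while an RPC argument built from it is being serialized.
snapshot_ids = {"deployment:a": 3, "deployment:b": 7}

# The fix: snapshot the dict at call time with a shallow copy, so the
# serializer iterates over a dict that no other thread mutates.
rpc_arg = dict(snapshot_ids)

# A concurrent registration (sequential here for determinism) mutates the
# original; without the copy, serializing during this mutation can raise
# "RuntimeError: dictionary changed size during iteration".
snapshot_ids["deployment:c"] = -1

assert "deployment:c" not in rpc_arg          # in-flight argument is stable
assert rpc_arg == {"deployment:a": 3, "deployment:b": 7}
```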