[2/3] queue-based autoscaling - add default queue-based autoscaling policy #59548

abrarsheikh merged 15 commits into master
Conversation
Code Review
This pull request introduces a new queue-based autoscaling policy, which is a great addition for TaskConsumer deployments. The implementation is well-structured, with a dedicated QueueMonitor actor and comprehensive unit tests. I've identified a critical bug in the Redis connection handling and a high-severity logic issue in the scaling-to-zero implementation. Addressing these will ensure the new feature is robust and behaves as expected.
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
Signed-off-by: harshit <harshit@anyscale.com>
Force-pushed from 86223e0 to 26d4bd7
Force-pushed from 29f8b0b to 0c7cf30
abrarsheikh left a comment:
I would base this PR on top of #58857.

I'm not sure of the timeline we're targeting for #58857, but since we want to get the queue-based autoscaling feature out ASAP, I thought of merging this PR as-is. Once #58857 is merged, I will create a new PR that uses its changes to refactor the queue-aware autoscaling policy. @abrarsheikh, let me know your thoughts.
abrarsheikh left a comment:
Can you add end-to-end tests for autoscaling, or is that not possible in this PR?

That won't be possible in this PR, as the integration of this queue-based autoscaling policy with Serve deployments is still pending; I will add that in a follow-up PR.
…olicy (ray-project#59548)

## Summary

This PR adds queue-based autoscaling support for async inference workloads in Ray Serve. It enables deployments to scale based on the combined workload from both the message broker queue and HTTP requests.

**Related PRs:**
- PR 1 (Prerequisite): [ray-project#59430](ray-project#59430) - Broker and QueueMonitor foundation
- PR 3 (Follow-up): Integration with TaskConsumer

## Changes

### New Autoscaling Policy

| Component | Description |
|-----------|-------------|
| `async_inference_autoscaling_policy()` | Scales replicas based on combined workload: `queue_length + total_num_requests` |
| `default_async_inference_autoscaling_policy` | Export alias for the new policy |

### QueueMonitor Enhancements

The `QueueMonitorActor` now pushes queue metrics to the controller for autoscaling:

- Accepts `deployment_id` and `controller_handle` parameters
- Uses `MetricsPusher` to periodically push queue length to the controller
- `start_metrics_pusher()` - deferred initialization (event loop not available in `__init__`)
- Lazy initialization in `get_queue_length()` handles actor restarts
- Synchronous `__ray_shutdown__` (Ray calls it without awaiting)

### Controller Integration

- New `record_autoscaling_metrics_from_async_inference_task_queue()` method
- New gauge: `serve_autoscaling_async_inference_task_queue_metrics_delay_ms`

### New Types

- `AsyncInferenceTaskQueueMetricReport` - dataclass for queue metrics from QueueMonitor to controller
- `AutoscalingContext.async_inference_task_queue_length` - new property for queue length

## Scaling Formula

```python
total_workload = queue_length + total_num_requests
desired_replicas = total_workload / target_ongoing_requests
```

Example:
- Queue: 100 pending tasks
- HTTP: 50 ongoing requests
- `target_ongoing_requests`: 10
- Desired replicas = (100 + 50) / 10 = 15

---

🤖 Generated with [Claude Code](https://claude.ai/code)

---

Signed-off-by: harshit <harshit@anyscale.com>
Signed-off-by: tiennguyentony <46289799+tiennguyentony@users.noreply.github.com>
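The scaling formula above can be sketched as a standalone function. This is an illustrative sketch only, not the actual policy implementation from the PR; the function name, parameters, and the ceiling/clamping behavior are assumptions layered on top of the formula in the description.

```python
import math


def sketch_async_inference_autoscaling_policy(
    queue_length: int,
    total_num_requests: int,
    target_ongoing_requests: float,
    min_replicas: int = 0,
    max_replicas: int = 100,
) -> int:
    """Hypothetical sketch of the scaling decision described above.

    Combines pending broker-queue tasks with ongoing HTTP requests,
    divides by the per-replica target, and clamps to assumed bounds.
    """
    total_workload = queue_length + total_num_requests
    desired_replicas = math.ceil(total_workload / target_ongoing_requests)
    return max(min_replicas, min(desired_replicas, max_replicas))


# Example from the PR description: (100 + 50) / 10 = 15 replicas
print(sketch_async_inference_autoscaling_policy(100, 50, 10))  # 15
```

Note that whether the real policy rounds up, rounds down, or scales to zero when the combined workload is empty is not specified in the description; the clamp to `min_replicas` here is one plausible choice.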
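The deferred-initialization pattern described above (starting the metrics pusher outside `__init__` because no event loop exists yet, plus lazy re-initialization to survive actor restarts) can be illustrated with a minimal asyncio sketch. The class and method names below mirror the description but are illustrative only, not Ray's actual `QueueMonitorActor` API.

```python
import asyncio


class QueueMonitorSketch:
    """Illustrative-only sketch of deferred/lazy pusher initialization."""

    def __init__(self, push_interval_s: float = 1.0):
        # No event loop is guaranteed to exist here, so we only store state.
        self._push_interval_s = push_interval_s
        self._pusher_task = None
        self._queue_length = 0

    def start_metrics_pusher(self) -> None:
        # Deferred init: called once an event loop is running. Safe to call
        # repeatedly; restarts the loop task if it was never started or died.
        if self._pusher_task is None or self._pusher_task.done():
            loop = asyncio.get_running_loop()
            self._pusher_task = loop.create_task(self._push_loop())

    async def _push_loop(self) -> None:
        while True:
            # In the real actor this would push the queue length to the
            # controller handle; here we just sleep.
            await asyncio.sleep(self._push_interval_s)

    async def get_queue_length(self) -> int:
        # Lazy init: ensures the pusher is running even after a restart.
        self.start_metrics_pusher()
        return self._queue_length
```

The lazy call inside `get_queue_length()` is what makes an actor restart self-healing: the first metric query after the restart re-creates the background task without any external coordination.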