[1/3] queue-based autoscaling - add queue monitor (#59430)
Conversation
Code Review
This pull request introduces a QueueMonitor actor for monitoring queue lengths in Redis and RabbitMQ, which is a valuable addition for asynchronous task processing. The implementation is generally well-structured and includes unit tests. However, I've identified a significant performance concern with the RabbitMQ connection handling that should be addressed. Additionally, there's a minor inconsistency in broker type detection and an opportunity to improve test coverage for the new actor helper functions. My detailed feedback is in the comments below.
aslonnie
left a comment
hmm.. does not need my review any more?
seems that import pika is still in there though?
nope, `import pika` is now resolved as well.
### Summary
This PR is part 1 of 3 for adding queue-based autoscaling support for
Ray Serve TaskConsumer deployments.
### Background
TaskConsumers are workloads that consume tasks from message queues
(Redis, RabbitMQ), and their scaling needs are fundamentally different
from HTTP-based deployments. Instead of scaling based on HTTP request
load, TaskConsumers should scale based on the number of pending tasks in
the message queue.
### Overall Architecture (Full Feature)
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Message Queue │◄─────│ QueueMonitor │ │ ServeController │
│ (Redis/RMQ) │ │ Actor │◄─────│ Autoscaler │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
│ get_queue_length() │
└─────────────────────────┘
│
▼
┌───────────────────────────┐
│ queue_based_autoscaling │
│ _policy() │
│ desired = ceil(len/target)│
└───────────────────────────┘
```
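The `queue_based_autoscaling_policy()` step in the diagram can be sketched as a small pure function. This is a hypothetical illustration of the `desired = ceil(len/target)` formula (the real policy lands in PR 2; the signature and clamping are assumptions):

```python
import math


def queue_based_autoscaling_policy(
    queue_length: int,
    target_ongoing_requests: int,
    min_replicas: int = 1,
    max_replicas: int = 100,
) -> int:
    """Hypothetical sketch: scale to ceil(queue_length / target), clamped
    to the deployment's replica bounds."""
    desired = math.ceil(queue_length / target_ongoing_requests)
    return max(min_replicas, min(max_replicas, desired))
```

For example, 100 pending tasks with a target of 10 ongoing requests per replica yields 10 desired replicas; 101 pending tasks yields 11.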
The full implementation consists of three PRs:
| PR | Description | Status |
|----------------|-----------------------------------------------------|------------|
| PR 1 (This PR) | QueueMonitor actor for querying broker queue length | 🔄 Current |
| PR 2 | Introduce default Queue-based autoscaling policy | Upcoming |
| PR 3 | Integration with TaskConsumer deployments | Upcoming |
### This PR: QueueMonitor Actor
This PR introduces the QueueMonitor Ray actor that queries message
brokers to get queue length for autoscaling decisions.
### Key Features
- Multi-broker support: Redis and RabbitMQ
- Lightweight Ray actor: Runs with `num_cpus=0`, with `pika` and `redis` provided via the runtime env
- Fault tolerance: Caches last known queue length on query failures
- Named actor pattern: QUEUE_MONITOR::<deployment_name> for easy lookup
### Queue Length Calculation
For accurate autoscaling, QueueMonitor returns total workload (pending
tasks):
| Broker | Pending Tasks |
|----------|----------------|
| Redis | LLEN <queue> |
| RabbitMQ | messages_ready |
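The two broker queries in the table map to standard client calls: redis-py's `llen` and pika's passive `queue_declare`, whose Declare-Ok frame carries `message_count` (the messages ready for delivery). A minimal sketch, with function names that are illustrative rather than the PR's actual internals:

```python
def redis_pending_tasks(redis_client, queue_name: str) -> int:
    # Redis task queues are typically lists, so LLEN is the pending-task count.
    return redis_client.llen(queue_name)


def rabbitmq_pending_tasks(channel, queue_name: str) -> int:
    # A passive queue_declare inspects the queue without creating or modifying
    # it; the returned Declare-Ok frame exposes the ready-message count.
    frame = channel.queue_declare(queue=queue_name, passive=True)
    return frame.method.message_count
```

Both calls are cheap point reads, which is what makes polling them from a monitor actor viable.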
### Components
1. QueueMonitorConfig - Configuration dataclass with broker URL and
queue name
2. QueueMonitor - Core class that initializes broker connections and
queries queue length
3. QueueMonitorActor - Ray actor wrapper for remote access
4. Helper functions:
- create_queue_monitor_actor() - Create named actor
- get_queue_monitor_actor() - Lookup existing actor
- delete_queue_monitor_actor() - Cleanup on deployment deletion
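The helper trio builds on Ray's standard named-actor registry (`.options(name=...)`, `ray.get_actor`, `ray.kill`). A sketch of the lookup and cleanup side, assuming the `QUEUE_MONITOR::<deployment_name>` convention from the PR (the helper bodies here are illustrative, not the merged implementation):

```python
QUEUE_MONITOR_NAME_PREFIX = "QUEUE_MONITOR::"


def queue_monitor_actor_name(deployment_name: str) -> str:
    # Named actor pattern from the PR: QUEUE_MONITOR::<deployment_name>.
    return f"{QUEUE_MONITOR_NAME_PREFIX}{deployment_name}"


def get_queue_monitor_actor(deployment_name: str):
    # Look up the actor by name; ray.get_actor raises ValueError if absent.
    import ray  # deferred so the naming helper works without Ray installed

    try:
        return ray.get_actor(queue_monitor_actor_name(deployment_name))
    except ValueError:
        return None


def delete_queue_monitor_actor(deployment_name: str) -> None:
    # Cleanup on deployment deletion.
    import ray

    actor = get_queue_monitor_actor(deployment_name)
    if actor is not None:
        ray.kill(actor)
```

The name-based lookup is what lets the controller find an existing monitor after a restart instead of holding a handle across processes.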
### Test Plan
- Unit tests for QueueMonitorConfig (7 tests)
  - Broker type detection (Redis, RabbitMQ, SQS, unknown)
  - Config value storage
- Unit tests for QueueMonitor (4 tests)
  - Redis queue length retrieval (pending)
  - RabbitMQ queue length retrieval
  - Error handling with cached value fallback
---------
Signed-off-by: harshit <harshit@anyscale.com>
…olicy (#59548)

## Summary
This PR adds queue-based autoscaling support for async inference workloads in Ray Serve. It enables deployments to scale based on combined workload from both the message broker queue and HTTP requests.

**Related PRs:**
- PR 1 (Prerequisite): [#59430](#59430) - Broker and QueueMonitor foundation
- PR 3 (Follow-up): Integration with TaskConsumer

## Changes

### New Autoscaling Policy
| Component | Description |
|-----------|-------------|
| `async_inference_autoscaling_policy()` | Scales replicas based on combined workload: `queue_length + total_num_requests` |
| `default_async_inference_autoscaling_policy` | Export alias for the new policy |

### QueueMonitor Enhancements
The `QueueMonitorActor` now pushes queue metrics to the controller for autoscaling:
- Accepts `deployment_id` and `controller_handle` parameters
- Uses `MetricsPusher` to periodically push queue length to the controller
- `start_metrics_pusher()` - deferred initialization (event loop not available in `__init__`)
- Lazy initialization in `get_queue_length()` handles actor restarts
- Synchronous `__ray_shutdown__` (Ray calls it without awaiting)

### Controller Integration
- New `record_autoscaling_metrics_from_async_inference_task_queue()` method
- New gauge: `serve_autoscaling_async_inference_task_queue_metrics_delay_ms`

### New Types
- `AsyncInferenceTaskQueueMetricReport` - dataclass for queue metrics from QueueMonitor to controller
- `AutoscalingContext.async_inference_task_queue_length` - new property for queue length

## Scaling Formula
```python
total_workload = queue_length + total_num_requests
desired_replicas = total_workload / target_ongoing_requests
```

Example:
- Queue: 100 pending tasks
- HTTP: 50 ongoing requests
- `target_ongoing_requests`: 10
- Desired replicas = (100 + 50) / 10 = 15

---
🤖 Generated with [Claude Code](https://claude.ai/code)

---------
Signed-off-by: harshit <harshit@anyscale.com>
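The combined-workload formula from this follow-up can be sketched as a pure function (the name `async_inference_desired_replicas` is hypothetical; the actual policy is `async_inference_autoscaling_policy()`, and this sketch omits its bounds handling):

```python
import math


def async_inference_desired_replicas(
    queue_length: int,
    total_num_requests: float,
    target_ongoing_requests: float,
) -> int:
    # Combined workload: pending broker tasks plus in-flight HTTP requests.
    total_workload = queue_length + total_num_requests
    return math.ceil(total_workload / target_ongoing_requests)
```

With the example above: a queue of 100, 50 ongoing HTTP requests, and a target of 10 per replica gives (100 + 50) / 10 = 15 desired replicas.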
### Summary
This PR is part 1 of 3 for adding queue-based autoscaling support for
Ray Serve TaskConsumer deployments.
### Background
TaskConsumers are workloads that consume tasks from message queues
(Redis, RabbitMQ), and their scaling needs are fundamentally different
from HTTP-based deployments. Instead of scaling based on HTTP request
load, TaskConsumers should scale based on the number of pending tasks in
the message queue.
### Overall Architecture (Full Feature)
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ Message Queue │◄─────│ QueueMonitor │ │ ServeController │
│ (Redis/RMQ) │ │ Actor │◄─────│ Autoscaler │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │
│ get_queue_length() │
└─────────────────────────┘
│
▼
┌───────────────────────────┐
│ queue_based_autoscaling │
│ _policy() │
│ desired = ceil(len/target)│
└───────────────────────────┘
```
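The policy box in the diagram reduces to a one-line formula. A minimal sketch of that calculation (the function name, its parameters, and the clamping to replica bounds are illustrative here, not the exact signature PR 2 introduces):

```python
import math

def desired_replicas(queue_length: int, target_ongoing_requests: int,
                     min_replicas: int = 1, max_replicas: int = 100) -> int:
    """Hypothetical sketch: size the deployment to drain the pending queue."""
    if target_ongoing_requests <= 0:
        raise ValueError("target_ongoing_requests must be positive")
    desired = math.ceil(queue_length / target_ongoing_requests)
    # Clamp to the deployment's configured replica bounds.
    return max(min_replicas, min(desired, max_replicas))

print(desired_replicas(100, 10))  # 10 replicas for 100 pending tasks
print(desired_replicas(0, 10))    # 1 (never scales below min_replicas)
```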
The full implementation consists of three PRs:
| PR | Description | Status |
|----------------|-----------------------------------------------------|------------|
| PR 1 (This PR) | QueueMonitor actor for querying broker queue length | 🔄 Current |
| PR 2 | Introduce default queue-based autoscaling policy | Upcoming |
| PR 3 | Integration with TaskConsumer deployments | Upcoming |
### This PR: QueueMonitor Actor
This PR introduces the QueueMonitor Ray actor that queries message
brokers to get queue length for autoscaling decisions.
### Key Features
- Multi-broker support: Redis and RabbitMQ
- Lightweight Ray actor: runs with `num_cpus=0`, with `pika` and `redis` provided via the runtime env
- Fault tolerance: caches the last known queue length and falls back to it on query failures
- Named actor pattern: `QUEUE_MONITOR::<deployment_name>` for easy lookup
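The fault-tolerance behavior above can be sketched as follows. This is a simplified stand-in for the real `QueueMonitor`, using an injected client object instead of a live broker connection; the class and stub names are hypothetical:

```python
class QueueLengthCache:
    """Sketch of cached-fallback behavior: if a broker query fails,
    return the last successfully observed queue length."""

    def __init__(self, client, queue_name: str):
        self._client = client          # e.g. a redis-py client in the real actor
        self._queue_name = queue_name
        self._last_known_length = 0    # fallback before the first success

    def get_queue_length(self) -> int:
        try:
            length = self._client.llen(self._queue_name)
            self._last_known_length = length
            return length
        except Exception:
            # Broker unreachable: serve the cached value so the
            # autoscaler keeps receiving a (stale but sane) signal.
            return self._last_known_length


class FlakyClient:
    """Test stub: succeeds once, then raises on every later call."""
    def __init__(self):
        self.calls = 0

    def llen(self, name):
        self.calls += 1
        if self.calls > 1:
            raise ConnectionError("broker down")
        return 42


monitor = QueueLengthCache(FlakyClient(), "tasks")
print(monitor.get_queue_length())  # 42 (live query)
print(monitor.get_queue_length())  # 42 (cached fallback after failure)
```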
### Queue Length Calculation
For accurate autoscaling, QueueMonitor returns total workload (pending
tasks):
| Broker | Pending Tasks |
|----------|----------------|
| Redis | LLEN <queue> |
| RabbitMQ | messages_ready |
### Components
1. `QueueMonitorConfig` - configuration dataclass with broker URL and queue name
2. `QueueMonitor` - core class that initializes broker connections and queries queue length
3. `QueueMonitorActor` - Ray actor wrapper for remote access
4. Helper functions:
   - `create_queue_monitor_actor()` - create the named actor
   - `get_queue_monitor_actor()` - look up an existing actor
   - `delete_queue_monitor_actor()` - clean up on deployment deletion
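The helper functions rely on the named-actor pattern described above. A sketch of the name construction (the helper name is hypothetical; the Ray calls are shown only as comments, and the exact options the real helpers pass are assumptions):

```python
QUEUE_MONITOR_PREFIX = "QUEUE_MONITOR::"

def queue_monitor_actor_name(deployment_name: str) -> str:
    """Build the well-known actor name for a deployment's monitor."""
    return f"{QUEUE_MONITOR_PREFIX}{deployment_name}"

# With Ray available, lookup and cleanup would follow the usual
# named-actor pattern, roughly:
#
#   import ray
#   actor = ray.get_actor(queue_monitor_actor_name("my_consumer"))
#   ray.kill(actor)  # what delete_queue_monitor_actor() would do
#
print(queue_monitor_actor_name("my_consumer"))  # QUEUE_MONITOR::my_consumer
```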
### Test Plan
- Unit tests for `QueueMonitorConfig` (7 tests)
  - Broker type detection (Redis, RabbitMQ, SQS, unknown)
  - Config value storage
- Unit tests for `QueueMonitor` (4 tests)
  - Redis queue length retrieval (pending)
  - RabbitMQ queue length retrieval
  - Error handling with cached value fallback
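The broker type detection covered by those tests can be sketched as a mapping from the broker URL's scheme. The scheme table and enum below are assumptions about the implementation, not the exact logic in `QueueMonitorConfig`:

```python
from enum import Enum
from urllib.parse import urlparse

class BrokerType(Enum):
    REDIS = "redis"
    RABBITMQ = "rabbitmq"
    SQS = "sqs"
    UNKNOWN = "unknown"

# Assumed scheme-to-broker mapping (illustrative only).
_SCHEME_TO_BROKER = {
    "redis": BrokerType.REDIS,
    "rediss": BrokerType.REDIS,
    "amqp": BrokerType.RABBITMQ,
    "amqps": BrokerType.RABBITMQ,
    "sqs": BrokerType.SQS,
}

def detect_broker_type(broker_url: str) -> BrokerType:
    """Classify a broker URL by its scheme; unrecognized -> UNKNOWN."""
    scheme = urlparse(broker_url).scheme.lower()
    return _SCHEME_TO_BROKER.get(scheme, BrokerType.UNKNOWN)

print(detect_broker_type("redis://localhost:6379/0"))  # BrokerType.REDIS
print(detect_broker_type("amqp://guest@localhost/"))   # BrokerType.RABBITMQ
print(detect_broker_type("kafka://broker:9092"))       # BrokerType.UNKNOWN
```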
---------
Signed-off-by: harshit <harshit@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>