[Serve][Autoscaling] Last scale-up and scale-down time not given for application level autoscaling policy #59001
What happened + What you expected to happen
What happened
When implementing a custom application-level autoscaling policy, a dict of contexts is provided, one for each deployment. Two of the fields of AutoscalingContext are last_scale_up_time and last_scale_down_time. However, these fields are always None, even when scaling does happen.
What I expect to happen
last_scale_down_time should not be None after a scale-down event, and last_scale_up_time should not be None after a scale-up event.
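For illustration, this is the shape of policy I'm describing (the signature and field names match the repro script below; the body here is only a placeholder that keeps the current targets):

from ray.serve._private.common import DeploymentID
from ray.serve.config import AutoscalingContext


def my_policy(
    contexts: dict[DeploymentID, AutoscalingContext],
) -> tuple[dict[DeploymentID, int], dict]:
    for dep_id, ctx in contexts.items():
        # Expected: after a deployment has scaled at least once, these fields
        # should hold the timestamp of the most recent scale-up / scale-down.
        print(dep_id.name, ctx.last_scale_up_time, ctx.last_scale_down_time)
    # Placeholder decision: keep every deployment at its current replica count.
    return {d: c.current_num_replicas for d, c in contexts.items()}, {}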
Versions / Dependencies
Ray: 2.51.0
Python: 3.12
Reproduction script
serve.yaml:
# This file was generated using the `serve build` command on Ray v2.51.0.
proxy_location: EveryNode
http_options:
  host: 0.0.0.0
  port: 8000
grpc_options:
  port: 9000
  grpc_servicer_functions: []
logging_config:
  encoding: TEXT
  log_level: INFO
  logs_dir: null
  enable_access_log: true
  additional_log_standard_attrs: []
applications:
- name: app1
  route_prefix: /
  import_path: main:app
  autoscaling_policy:
    policy_function: main:coordinated_scaling_policy
  runtime_env: {}
  deployments:
  - name: Worker
    max_ongoing_requests: 4
    ray_actor_options:
      num_cpus: 0.2
    autoscaling_config:
      target_ongoing_requests: 2
      min_replicas: 1
      max_replicas: 5
  - name: Caller

main.py:
import asyncio

from ray import serve
from ray.serve._private.common import DeploymentID
from ray.serve.config import AutoscalingContext
from ray.serve.handle import DeploymentHandle


def coordinated_scaling_policy(
    contexts: dict[DeploymentID, AutoscalingContext],
) -> tuple[dict[DeploymentID, int], dict]:
    decisions = {}
    worker_id = [d for d in contexts if d.name == "Worker"][0]
    worker_ctx = contexts[worker_id]
    print(f"AUTOSCALING_POLICY_CONTEXT: {worker_ctx=}")
    # Autoscaling policy that keeps scaling worker between max and max-1
    if worker_ctx.current_num_replicas == worker_ctx.capacity_adjusted_max_replicas:
        decisions[worker_id] = worker_ctx.current_num_replicas - 1
    else:
        decisions[worker_id] = worker_ctx.capacity_adjusted_max_replicas
    return decisions, {}


@serve.deployment()
class Worker:
    def __init__(self):
        pass

    async def __call__(self, x: int) -> int:
        await asyncio.sleep(1)
        return x * 2


@serve.deployment()
class Caller:
    def __init__(self, worker: DeploymentHandle):
        self.worker = worker
        asyncio.create_task(self.call())

    async def call(self):
        while True:
            results = await asyncio.gather(*[self.worker.remote(i) for i in range(50)])
            print(results)


app = Caller.bind(Worker.bind())

Command
serve run serve.yaml
Output
(ServeController pid=57127) AUTOSCALING_POLICY_CONTEXT: worker_ctx=AutoscalingContext(deployment_id=Deployment(name='Worker', app='app1'), deployment_name='Worker', app_name='app1', current_num_replicas=5, target_num_replicas=5, running_replicas=[Replica(id='4z4hqyeo', deployment='Worker', app='app1'), Replica(id='p3kg4k5o', deployment='Worker', app='app1'), Replica(id='yksqairq', deployment='Worker', app='app1'), Replica(id='roxvwap8', deployment='Worker', app='app1'), Replica(id='ee70ku1b', deployment='Worker', app='app1')], total_num_requests=26.220338983050848, total_queued_requests=15.796610169491526, total_running_requests=10.423728813559322, aggregated_metrics={}, raw_metrics={}, capacity_adjusted_min_replicas=1, capacity_adjusted_max_replicas=5, policy_state={}, last_scale_up_time=None, last_scale_down_time=None, current_time=1764154179.377394, config=AutoscalingConfig(min_replicas=1, initial_replicas=None, max_replicas=5, target_ongoing_requests=2.0, metrics_interval_s=10.0, look_back_period_s=30.0, smoothing_factor=1.0, upscale_smoothing_factor=None, downscale_smoothing_factor=None, upscaling_factor=None, downscaling_factor=None, downscale_delay_s=600.0, downscale_to_zero_delay_s=None, upscale_delay_s=30.0, aggregation_function=<AggregationFunction.MEAN: 'mean'>, policy=AutoscalingPolicy(policy_function='ray.serve.autoscaling_policy:default_autoscaling_policy')))
(ServeController pid=57127) AUTOSCALING_POLICY_CONTEXT: worker_ctx=AutoscalingContext(deployment_id=Deployment(name='Worker', app='app1'), deployment_name='Worker', app_name='app1', current_num_replicas=4, target_num_replicas=4, running_replicas=[Replica(id='4z4hqyeo', deployment='Worker', app='app1'), Replica(id='p3kg4k5o', deployment='Worker', app='app1'), Replica(id='yksqairq', deployment='Worker', app='app1'), Replica(id='roxvwap8', deployment='Worker', app='app1')], total_num_requests=26.220338983050848, total_queued_requests=15.796610169491526, total_running_requests=10.423728813559322, aggregated_metrics={}, raw_metrics={}, capacity_adjusted_min_replicas=1, capacity_adjusted_max_replicas=5, policy_state={}, last_scale_up_time=None, last_scale_down_time=None, current_time=1764154179.485524, config=AutoscalingConfig(min_replicas=1, initial_replicas=None, max_replicas=5, target_ongoing_requests=2.0, metrics_interval_s=10.0, look_back_period_s=30.0, smoothing_factor=1.0, upscale_smoothing_factor=None, downscale_smoothing_factor=None, upscaling_factor=None, downscaling_factor=None, downscale_delay_s=600.0, downscale_to_zero_delay_s=None, upscale_delay_s=30.0, aggregation_function=<AggregationFunction.MEAN: 'mean'>, policy=AutoscalingPolicy(policy_function='ray.serve.autoscaling_policy:default_autoscaling_policy')))
(last_scale_down_time and last_scale_up_time are always None, even though the two logs above show different values of current_num_replicas, which means scaling did happen. I confirmed this on the dashboard.)
Issue Severity
Medium: It is a significant difficulty but I can work around it.
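The workaround I have in mind is to track scale events manually. A rough sketch, assuming the dict returned as the second element of the policy's return value is passed back as policy_state on the next call (that is my reading of the API; the exact semantics may differ):

from ray.serve._private.common import DeploymentID
from ray.serve.config import AutoscalingContext


def policy_with_manual_scale_tracking(
    contexts: dict[DeploymentID, AutoscalingContext],
) -> tuple[dict[DeploymentID, int], dict]:
    decisions: dict[DeploymentID, int] = {}
    state: dict = {}
    for dep_id, ctx in contexts.items():
        prev = (ctx.policy_state or {}).get(dep_id.name, {})
        last_up = prev.get("last_scale_up_time")
        last_down = prev.get("last_scale_down_time")
        prev_replicas = prev.get("num_replicas")
        # Infer scale events by comparing against the replica count seen on the
        # previous invocation, since the context fields are currently None.
        if prev_replicas is not None:
            if ctx.current_num_replicas > prev_replicas:
                last_up = ctx.current_time
            elif ctx.current_num_replicas < prev_replicas:
                last_down = ctx.current_time
        state[dep_id.name] = {
            "num_replicas": ctx.current_num_replicas,
            "last_scale_up_time": last_up,
            "last_scale_down_time": last_down,
        }
        # Placeholder decision: keep the current replica count; real scaling
        # logic would use last_up / last_down here.
        decisions[dep_id] = ctx.current_num_replicas
    return decisions, state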