
[Serve][Autoscaling] Last scale-up and scale-down time not given for application level autoscaling policy #59001

@greenfish8090

Description

What happened + What you expected to happen

What happened

When implementing a custom application-level autoscaling policy, the policy function receives a dict of contexts, one per deployment. Two of the fields on AutoscalingContext are last_scale_up_time and last_scale_down_time. However, these are always None, even when scaling does happen.

What I expected to happen

last_scale_down_time should not be None after a scale-down event, and last_scale_up_time should not be None after a scale-up event.
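For context, a policy would typically use these timestamps to implement things like a scale-down cooldown. Below is a minimal, self-contained sketch of that pattern. The real AutoscalingContext is populated by the Serve controller, so a stand-in dataclass is used here; only the field names come from the source, and the cooldown logic itself is illustrative:

```python
import time
from dataclasses import dataclass
from typing import Optional


# Stand-in for ray.serve.config.AutoscalingContext; only the fields
# relevant to this issue are modeled.
@dataclass
class FakeContext:
    current_num_replicas: int
    last_scale_up_time: Optional[float] = None
    last_scale_down_time: Optional[float] = None


def desired_replicas(ctx: FakeContext, target: int, cooldown_s: float = 60.0) -> int:
    """Only allow a scale-down once enough time has passed since the last one."""
    now = time.time()
    if target < ctx.current_num_replicas:
        if (
            ctx.last_scale_down_time is not None
            and now - ctx.last_scale_down_time < cooldown_s
        ):
            # Still within the cooldown window: hold the current replica count.
            return ctx.current_num_replicas
    return target


# With the bug, last_scale_down_time is always None, so the cooldown
# branch can never fire and every scale-down is applied immediately.
ctx = FakeContext(current_num_replicas=5, last_scale_down_time=None)
print(desired_replicas(ctx, target=4))  # 4: scales down immediately

ctx = FakeContext(current_num_replicas=5, last_scale_down_time=time.time())
print(desired_replicas(ctx, target=4))  # 5: held back by the cooldown
```

This is why the fields being permanently None matters: any time-based damping a custom policy tries to build on them silently degrades to "no damping".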

Versions / Dependencies

Ray: 2.51.0

Python: 3.12

Reproduction script

serve.yaml:

# This file was generated using the `serve build` command on Ray v2.51.0.

proxy_location: EveryNode

http_options:
  host: 0.0.0.0
  port: 8000

grpc_options:
  port: 9000
  grpc_servicer_functions: []

logging_config:
  encoding: TEXT
  log_level: INFO
  logs_dir: null
  enable_access_log: true
  additional_log_standard_attrs: []

applications:
- name: app1
  route_prefix: /
  import_path: main:app
  autoscaling_policy:
    policy_function: main:coordinated_scaling_policy
  runtime_env: {}
  deployments:
  - name: Worker
    max_ongoing_requests: 4
    ray_actor_options:
      num_cpus: 0.2
    autoscaling_config:
      target_ongoing_requests: 2
      min_replicas: 1
      max_replicas: 5
  - name: Caller

main.py

import asyncio

from ray import serve
from ray.serve._private.common import DeploymentID
from ray.serve.config import AutoscalingContext
from ray.serve.handle import DeploymentHandle


def coordinated_scaling_policy(
    contexts: dict[DeploymentID, AutoscalingContext],
) -> tuple[dict[DeploymentID, int], dict]:
    decisions = {}

    worker_id = [d for d in contexts if d.name == "Worker"][0]
    worker_ctx = contexts[worker_id]
    print(f"AUTOSCALING_POLICY_CONTEXT: {worker_ctx=}")

    # Autoscaling policy that keeps scaling worker between max and max-1
    if worker_ctx.current_num_replicas == worker_ctx.capacity_adjusted_max_replicas:
        decisions[worker_id] = worker_ctx.current_num_replicas - 1
    else:
        decisions[worker_id] = worker_ctx.capacity_adjusted_max_replicas

    return decisions, {}


@serve.deployment()
class Worker:
    def __init__(self):
        pass

    async def __call__(self, x: int) -> int:
        await asyncio.sleep(1)
        return x * 2


@serve.deployment()
class Caller:
    def __init__(self, worker: DeploymentHandle):
        self.worker = worker
        asyncio.create_task(self.call())

    async def call(self):
        while True:
            results = await asyncio.gather(*[self.worker.remote(i) for i in range(50)])
            print(results)


app = Caller.bind(Worker.bind())

Command

serve run serve.yaml

Output

(ServeController pid=57127) AUTOSCALING_POLICY_CONTEXT: worker_ctx=AutoscalingContext(deployment_id=Deployment(name='Worker', app='app1'), deployment_name='Worker', app_name='app1', current_num_replicas=5, target_num_replicas=5, running_replicas=[Replica(id='4z4hqyeo', deployment='Worker', app='app1'), Replica(id='p3kg4k5o', deployment='Worker', app='app1'), Replica(id='yksqairq', deployment='Worker', app='app1'), Replica(id='roxvwap8', deployment='Worker', app='app1'), Replica(id='ee70ku1b', deployment='Worker', app='app1')], total_num_requests=26.220338983050848, total_queued_requests=15.796610169491526, total_running_requests=10.423728813559322, aggregated_metrics={}, raw_metrics={}, capacity_adjusted_min_replicas=1, capacity_adjusted_max_replicas=5, policy_state={}, last_scale_up_time=None, last_scale_down_time=None, current_time=1764154179.377394, config=AutoscalingConfig(min_replicas=1, initial_replicas=None, max_replicas=5, target_ongoing_requests=2.0, metrics_interval_s=10.0, look_back_period_s=30.0, smoothing_factor=1.0, upscale_smoothing_factor=None, downscale_smoothing_factor=None, upscaling_factor=None, downscaling_factor=None, downscale_delay_s=600.0, downscale_to_zero_delay_s=None, upscale_delay_s=30.0, aggregation_function=<AggregationFunction.MEAN: 'mean'>, policy=AutoscalingPolicy(policy_function='ray.serve.autoscaling_policy:default_autoscaling_policy')))

(ServeController pid=57127) AUTOSCALING_POLICY_CONTEXT: worker_ctx=AutoscalingContext(deployment_id=Deployment(name='Worker', app='app1'), deployment_name='Worker', app_name='app1', current_num_replicas=4, target_num_replicas=4, running_replicas=[Replica(id='4z4hqyeo', deployment='Worker', app='app1'), Replica(id='p3kg4k5o', deployment='Worker', app='app1'), Replica(id='yksqairq', deployment='Worker', app='app1'), Replica(id='roxvwap8', deployment='Worker', app='app1')], total_num_requests=26.220338983050848, total_queued_requests=15.796610169491526, total_running_requests=10.423728813559322, aggregated_metrics={}, raw_metrics={}, capacity_adjusted_min_replicas=1, capacity_adjusted_max_replicas=5, policy_state={}, last_scale_up_time=None, last_scale_down_time=None, current_time=1764154179.485524, config=AutoscalingConfig(min_replicas=1, initial_replicas=None, max_replicas=5, target_ongoing_requests=2.0, metrics_interval_s=10.0, look_back_period_s=30.0, smoothing_factor=1.0, upscale_smoothing_factor=None, downscale_smoothing_factor=None, upscaling_factor=None, downscaling_factor=None, downscale_delay_s=600.0, downscale_to_zero_delay_s=None, upscale_delay_s=30.0, aggregation_function=<AggregationFunction.MEAN: 'mean'>, policy=AutoscalingPolicy(policy_function='ray.serve.autoscaling_policy:default_autoscaling_policy')))

(last_scale_down_time and last_scale_up_time are always None, even though the two logs above show different current_num_replicas values, indicating that scaling did happen. I also confirmed the scaling events on the dashboard.)
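Until the fields are populated, one possible workaround is for the policy to track scale events itself via the state dict it returns (the second element of the returned tuple, which the logs above show round-trips back as policy_state). A minimal sketch of that bookkeeping, simplified to a single replica count instead of the per-DeploymentID dicts so it runs standalone; the function and key names are hypothetical:

```python
import time


def policy_with_tracking(
    current_num_replicas: int, desired: int, policy_state: dict
) -> tuple[int, dict]:
    """Record our own last_scale_up/down timestamps in policy_state,
    since AutoscalingContext.last_scale_*_time is currently always None."""
    prev = policy_state.get("prev_replicas")
    now = time.time()
    new_state = dict(policy_state)
    if prev is not None:
        if current_num_replicas > prev:
            # Replica count grew since the last tick: a scale-up happened.
            new_state["last_scale_up_time"] = now
        elif current_num_replicas < prev:
            # Replica count shrank since the last tick: a scale-down happened.
            new_state["last_scale_down_time"] = now
    new_state["prev_replicas"] = current_num_replicas
    return desired, new_state


# Simulate two policy invocations: the drop from 5 to 4 replicas between
# ticks is detected and recorded as a scale-down.
state: dict = {}
_, state = policy_with_tracking(5, 4, state)  # first call only records prev
_, state = policy_with_tracking(4, 5, state)  # 5 -> 4 observed: scale-down recorded
print("last_scale_down_time" in state)  # True
```

This only approximates the missing fields (it observes replica-count changes between policy ticks rather than actual controller scale events), but it is enough to rebuild cooldown-style logic inside the policy.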

Issue Severity

Medium: It is a significant difficulty but I can work around it.
