Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# isort: off
from .scaling_policy import ScalingDecision, ScalingPolicy, NoopDecision, ResizeDecision
from .scaling_policy import (
AUTOSCALING_REQUESTS_EXPIRE_TIME_S,
AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
AUTOSCALING_REQUESTS_INTERVAL_S,
)
from .elastic import ElasticScalingPolicy
from .fixed import FixedScalingPolicy
from .factory import create_scaling_policy
Expand All @@ -8,6 +13,9 @@


__all__ = [
"AUTOSCALING_REQUESTS_EXPIRE_TIME_S",
"AUTOSCALING_REQUESTS_GET_TIMEOUT_S",
"AUTOSCALING_REQUESTS_INTERVAL_S",
"ScalingPolicy",
"ElasticScalingPolicy",
"FixedScalingPolicy",
Expand Down
107 changes: 7 additions & 100 deletions python/ray/train/v2/_internal/execution/scaling_policy/elastic.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import logging
import uuid
from functools import cached_property
from typing import TYPE_CHECKING, Dict, List, Optional

import ray
from ray.train.v2._internal.execution.context import TrainRunContext
from ray.train.v2._internal.execution.scaling_policy import (
NoopDecision,
ResizeDecision,
ScalingDecision,
ScalingPolicy,
)
from ray.train.v2._internal.execution.scaling_policy.scaling_policy import (
AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
)
from ray.train.v2._internal.execution.worker_group import (
WorkerGroupPollStatus,
WorkerGroupState,
Expand All @@ -28,12 +28,6 @@

class ElasticScalingPolicy(ScalingPolicy):

# The time in seconds after which an autoscaling request will expire.
AUTOSCALING_REQUESTS_EXPIRE_TIME_S = 180
# Minimum interval in seconds between two consecutive autoscaling requests.
AUTOSCALING_REQUESTS_INTERVAL_S = 20
# Timeout in seconds for getting the result of a call to the AutoscalingCoordinator.
AUTOSCALING_REQUESTS_GET_TIMEOUT_S = 5
# Minimum interval in seconds between querying the AutoscalingCoordinator for allocated resources.
GET_ALLOCATED_RESOURCES_INTERVAL_S = 1
# Minimum interval in seconds between logging warnings about insufficient workers.
Expand All @@ -43,14 +37,13 @@ def __init__(self, scaling_config: ScalingConfig):
super().__init__(scaling_config)

self._latest_monitor_time = float("-inf")
# Requester ID for AutoscalingCoordinator.
# Prefer the Train run_id when available (set in after_controller_start).
self._requester_id = "train-" + uuid.uuid4().hex
self._latest_autoscaling_request_time = float("-inf")
self._latest_insufficient_workers_warning_time = float("-inf")
self._latest_allocated_resources_query_time = float("-inf")
self._latest_allocated_resources: Optional[List["ResourceDict"]] = None

def _get_num_workers_for_resource_request(self) -> int:
return self.scaling_config.max_workers

def _count_possible_workers(
self, allocated_resources: List[Dict[str, float]]
) -> int:
Expand Down Expand Up @@ -180,50 +173,6 @@ def make_decision_for_running_worker_group(
# Methods for interacting with AutoscalingCoordinator
# ---------------------------------------------------

@cached_property
def _autoscaling_coordinator(self):
from ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator import (
get_or_create_autoscaling_coordinator,
)

return get_or_create_autoscaling_coordinator()

def _maybe_send_resource_request(self):
"""Send a resource request to AutoscalingCoordinator,
if AUTOSCALING_REQUESTS_INTERVAL_S has passed since the last send."""
now = time_monotonic()
if (
now - self._latest_autoscaling_request_time
< self.AUTOSCALING_REQUESTS_INTERVAL_S
):
return

resources_per_worker = self.scaling_config._resources_per_worker_not_none
max_workers = self.scaling_config.max_workers
try:
from ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator import (
ResourceRequestPriority,
)

ray.get(
self._autoscaling_coordinator.request_resources.remote(
requester_id=self._requester_id,
resources=[resources_per_worker] * max_workers,
expire_after_s=self.AUTOSCALING_REQUESTS_EXPIRE_TIME_S,
priority=ResourceRequestPriority.HIGH,
),
timeout=self.AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
)
self._latest_autoscaling_request_time = time_monotonic()
except Exception:
msg = (
f"Failed to send resource request for {self._requester_id}."
" If this only happens transiently during network partition or"
" CPU being overloaded, it's safe to ignore this error."
" If this error persists, file a GitHub issue."
)
logger.warning(msg, exc_info=True)

def _get_allocated_resources(self) -> Optional[List["ResourceDict"]]:
"""Get allocated resources from AutoscalingCoordinator.
Return None if there is an error."""
Expand All @@ -239,7 +188,7 @@ def _get_allocated_resources(self) -> Optional[List["ResourceDict"]]:
self._autoscaling_coordinator.get_allocated_resources.remote(
self._requester_id
),
timeout=self.AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
timeout=AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
)
except Exception:
msg = (
Expand All @@ -255,45 +204,3 @@ def _get_allocated_resources(self) -> Optional[List["ResourceDict"]]:
self._latest_allocated_resources = allocated_resources

return self._latest_allocated_resources

def _cancel_resource_request(self):
"""Cancel the resource request to AutoscalingCoordinator."""
try:
ray.get(
self._autoscaling_coordinator.cancel_request.remote(
requester_id=self._requester_id,
),
timeout=self.AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
)
except Exception:
msg = (
f"Failed to cancel resource request for {self._requester_id}."
" The request will still expire after the timeout of"
f" {self.AUTOSCALING_REQUESTS_EXPIRE_TIME_S} seconds."
)
logger.warning(msg, exc_info=True)

# --------------------------
# ControllerCallback
# --------------------------

def after_controller_start(self, train_run_context: TrainRunContext):
"""Send cluster autoscaling requests when the control loop starts."""
self._requester_id = f"train-{train_run_context.run_id}"
resources_per_worker = self.scaling_config._resources_per_worker_not_none
max_workers = self.scaling_config.max_workers
logger.info(
"Requesting resources to fit the maximum number of workers: "
f"{resources_per_worker} * {max_workers}"
)
self._maybe_send_resource_request()

def before_controller_shutdown(self):
"""Clear the autoscaling request eagerly when the control loop shuts down.
So that cluster can scale down more quickly before the request timeout.
"""
self._cancel_resource_request()

def before_controller_abort(self):
"""Cancel the autoscaling request when the controller is aborted."""
self._cancel_resource_request()
113 changes: 2 additions & 111 deletions python/ray/train/v2/_internal/execution/scaling_policy/fixed.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
import logging
import uuid
from functools import cached_property

import ray
from ray.train.v2._internal.execution.context import TrainRunContext
from ray.train.v2._internal.execution.scaling_policy import (
NoopDecision,
ResizeDecision,
Expand All @@ -14,23 +8,11 @@
WorkerGroupPollStatus,
WorkerGroupState,
)
from ray.train.v2._internal.util import time_monotonic

logger = logging.getLogger(__name__)

# The time in seconds after which an autoscaling request will expire.
AUTOSCALING_REQUESTS_EXPIRE_TIME_S = 180
# Timeout in seconds for getting the result of a call to the AutoscalingCoordinator.
AUTOSCALING_REQUESTS_GET_TIMEOUT_S = 5
# Interval in seconds between resource requests to the AutoscalingCoordinator.
AUTOSCALING_REQUESTS_INTERVAL_S = 20


class FixedScalingPolicy(ScalingPolicy):
def __init__(self, scaling_config):
super().__init__(scaling_config)
self._requester_id = "train-" + uuid.uuid4().hex
self._latest_autoscaling_request_time = float("-inf")
def _get_num_workers_for_resource_request(self) -> int:
return self.scaling_config.num_workers

def make_decision_for_non_running_worker_group(self) -> ScalingDecision:
self._maybe_send_resource_request()
Expand All @@ -46,94 +28,3 @@ def make_decision_for_running_worker_group(
) -> ScalingDecision:
self._maybe_send_resource_request()
return NoopDecision()

# ---------------------------------------------------
# Methods for interacting with AutoscalingCoordinator
# ---------------------------------------------------

@cached_property
def _autoscaling_coordinator(self):
from ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator import (
get_or_create_autoscaling_coordinator,
)

return get_or_create_autoscaling_coordinator()

def _maybe_send_resource_request(self):
"""Send a resource request to AutoscalingCoordinator,
if AUTOSCALING_REQUESTS_INTERVAL_S has passed since the last send."""
now = time_monotonic()
if (
now - self._latest_autoscaling_request_time
< AUTOSCALING_REQUESTS_INTERVAL_S
):
return
self._send_resource_request()

def _send_resource_request(self):
"""Register training resources with the AutoscalingCoordinator."""
resources_per_worker = self.scaling_config._resources_per_worker_not_none
num_workers = self.scaling_config.num_workers
try:
from ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator import (
ResourceRequestPriority,
)

ray.get(
self._autoscaling_coordinator.request_resources.remote(
requester_id=self._requester_id,
resources=[resources_per_worker] * num_workers,
expire_after_s=AUTOSCALING_REQUESTS_EXPIRE_TIME_S,
priority=ResourceRequestPriority.HIGH,
),
timeout=AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
)
self._latest_autoscaling_request_time = time_monotonic()
except Exception:
msg = (
f"Failed to send resource request for {self._requester_id}."
" If this only happens transiently during network partition or"
" CPU being overloaded, it's safe to ignore this error."
" If this error persists, file a GitHub issue."
)
logger.warning(msg, exc_info=True)

def _cancel_resource_request(self):
"""Cancel the resource request to AutoscalingCoordinator."""
try:
ray.get(
self._autoscaling_coordinator.cancel_request.remote(
requester_id=self._requester_id,
),
timeout=AUTOSCALING_REQUESTS_GET_TIMEOUT_S,
)
except Exception:
msg = (
f"Failed to cancel resource request for {self._requester_id}."
" The request will still expire after the timeout of"
f" {AUTOSCALING_REQUESTS_EXPIRE_TIME_S} seconds."
)
logger.warning(msg, exc_info=True)

# --------------------------
# ControllerCallback
# --------------------------

def after_controller_start(self, train_run_context: TrainRunContext):
"""Register training resources with the AutoscalingCoordinator."""
self._requester_id = f"train-{train_run_context.run_id}"
resources_per_worker = self.scaling_config._resources_per_worker_not_none
num_workers = self.scaling_config.num_workers
logger.info(
"Requesting resources for fixed worker group: "
f"{resources_per_worker} * {num_workers}"
)
self._send_resource_request()

def before_controller_shutdown(self):
"""Cancel the resource request when the controller shuts down."""
self._cancel_resource_request()

def before_controller_abort(self):
"""Cancel the resource request when the controller is aborted."""
self._cancel_resource_request()
Loading