[train] Register training resources with AutoscalingCoordinator in FixedScalingPolicy #61703
Conversation
…inator request in FixedScalingConfig Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Code Review
This pull request correctly addresses a bug in FixedScalingPolicy where resource calculations could result in negative values with the V2 autoscaler. The solution, which involves registering training resources with the AutoscalingCoordinator, aligns well with existing patterns in ElasticScalingPolicy and is a robust fix. The accompanying tests are thorough, covering the new logic, backward compatibility for the V1 autoscaler, and the specific regression case. My only suggestion is to refine the exception handling in fixed.py to be more specific, which will improve the code's robustness.
python/ray/train/v2/_internal/execution/scaling_policy/fixed.py
Ensures the resource request to the AutoscalingCoordinator is kept alive even while the worker group is not yet running, preventing the 180s expiry from dropping the reservation.
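The keep-alive works by re-sending the request on an interval well inside the coordinator's expiry window, so a worker group that is still pending never loses its reservation. A rough sketch under assumed names (`send_request` stands in for the real coordinator call; the class and threading structure are illustrative):

```python
import threading

# Constants from the PR: requests expire after 180s, so re-send every 20s.
AUTOSCALING_REQUESTS_EXPIRE_TIME_S = 180
AUTOSCALING_REQUESTS_INTERVAL_S = 20


class ResourceRequestKeepAlive:
    """Periodically re-sends a resource request so it never expires."""

    def __init__(self, send_request, interval_s=AUTOSCALING_REQUESTS_INTERVAL_S):
        self._send_request = send_request
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        # Register immediately, then keep the request fresh in the background.
        self._send_request()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        # Event.wait() returns False on timeout, True once stop() is called,
        # so this re-sends every interval until stopped (20s << 180s expiry).
        while not self._stop.wait(self._interval_s):
            self._send_request()

    def stop(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```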
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Is this the exact same code as in ElasticScalingPolicy? Can you move shared logic into a common utility class and/or into the ScalingPolicy class?
Signed-off-by: Matthew Deng <matthew.j.deng@gmail.com>
Merging pull-up refactor between fixed and elastic scaling policies. This change:

- Integrates the Ray Data Autoscaling Coordinator as a default component of scaling policies
- Sets default scaling policy lifecycles to request resources after controller start and to cancel requests on abort and shutdown
- Creates a `_get_num_workers_for_resource_request` abstract method that standardizes the number of workers for every request made
- Pulls up common timeout and expiry constants

When adding new scaling policies in the future, we should revisit these changes, particularly if:

- Autoscaling Coordinator integration is not necessary or a different interface becomes available
- Timeout and expiry constants diverge
- `_get_num_workers_for_resource_request` needs to accommodate variable num-worker requests
- Controller lifecycle behavior diverges

Will add comments in code for future maintenance.
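The pulled-up structure can be sketched roughly as follows. The class, hook, and constant names follow the PR description; the coordinator client and its `request_resources`/`cancel_request` signatures are stand-ins for illustration:

```python
from abc import ABC, abstractmethod

# Shared constants pulled up into the base module (values from the PR).
AUTOSCALING_REQUESTS_EXPIRE_TIME_S = 180
AUTOSCALING_REQUESTS_INTERVAL_S = 20
AUTOSCALING_REQUESTS_GET_TIMEOUT_S = 5


class ScalingPolicy(ABC):
    """Base class owning the AutoscalingCoordinator integration."""

    def __init__(self, coordinator, resources_per_worker):
        self._coordinator = coordinator
        self._resources_per_worker = resources_per_worker

    @abstractmethod
    def _get_num_workers_for_resource_request(self) -> int:
        """Subclasses return the worker count to reserve resources for."""

    def _send_resource_request(self):
        n = self._get_num_workers_for_resource_request()
        self._coordinator.request_resources(
            [self._resources_per_worker] * n,
            expire_s=AUTOSCALING_REQUESTS_EXPIRE_TIME_S,
        )

    # Default ControllerCallback lifecycle.
    def after_controller_start(self):
        self._send_resource_request()

    def before_controller_shutdown(self):
        self._coordinator.cancel_request()

    before_controller_abort = before_controller_shutdown


class FixedScalingPolicy(ScalingPolicy):
    def __init__(self, coordinator, resources_per_worker, num_workers):
        super().__init__(coordinator, resources_per_worker)
        self._num_workers = num_workers

    def _get_num_workers_for_resource_request(self) -> int:
        return self._num_workers  # fixed worker count


class ElasticScalingPolicy(ScalingPolicy):
    def __init__(self, coordinator, resources_per_worker, max_workers):
        super().__init__(coordinator, resources_per_worker)
        self._max_workers = max_workers

    def _get_num_workers_for_resource_request(self) -> int:
        return self._max_workers  # reserve up to the elastic maximum
```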
python/ray/train/v2/_internal/execution/scaling_policy/scaling_policy.py
Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
…xedScalingPolicy (ray-project#61703)

## Summary

The V2 cluster autoscaler divides cluster resources equally among registered data executors via the `AutoscalingCoordinator`. Previously, `DataConfig.configure()` added training resources to each dataset's `exclude_resources`, which `ResourceManager.get_global_limits()` subtracted from total resources. Under V2, this returns only the executor's allocated share (cluster / num_executors), so subtracting training resources could yield negative limits, causing assertion failures. This was the root cause of the test regression fixed in ray-project#61640, and should also help deflake the `pytorch:torch_detection` tests ([buildkite](https://buildkite.com/ray-project/postmerge/builds/16449#019ce823-a291-474d-823d-adaf6c623ebb)).

## Changes

- **Pull up `AutoscalingCoordinator` integration into base `ScalingPolicy`**: `ElasticScalingPolicy` already registered training resources with the coordinator. This PR moves that logic into the base `ScalingPolicy` class so `FixedScalingPolicy` gets the same behavior:
  - `_maybe_send_resource_request()`, `_send_resource_request()`, `_cancel_resource_request()` defined once in the base class
  - Shared constants: `AUTOSCALING_REQUESTS_EXPIRE_TIME_S=180`, `AUTOSCALING_REQUESTS_INTERVAL_S=20`, `AUTOSCALING_REQUESTS_GET_TIMEOUT_S=5`
  - Default `ControllerCallback` lifecycle: `after_controller_start` registers resources, `before_controller_shutdown`/`before_controller_abort` cancel requests
  - Subclasses implement `_get_num_workers_for_resource_request()` to return their worker count (`num_workers` for Fixed, `max_workers` for Elastic)
- **Gate `exclude_resources` on V1 autoscaler**: `DataConfig.configure()` no longer adds training resources to `exclude_resources` under V2 (default). A new `_is_v2_autoscaler()` helper controls this. Under V1 (`RAY_DATA_CLUSTER_AUTOSCALER=V1`), the old path is preserved.

### Considerations when adding new scaling policies

When adding new scaling policies, revisit the shared base class defaults if:

1. `AutoscalingCoordinator` integration is not necessary or a different interface becomes available
2. Timeout and expiry constants need to diverge between policies
3. `_get_num_workers_for_resource_request()` needs to accommodate variable worker counts beyond a simple fixed/max value
4. Controller lifecycle behavior diverges

## Tests

- `test_datasets_callback` — updated to assert `exclude_resources == zero()` under V2
- `test_data_config_exclude_resources` — updated to expect only user-set `exclude_resources`
- `test_exclude_train_resources_applies_to_each_dataset` — updated to only assert user-defined per-dataset values
- `test_datasets_callback_v1_uses_exclude_resources` — **new**: verifies `exclude_resources` is still set under V1
- `test_v2_no_negative_exclude_resources` — **new**: regression test verifying the scenario that previously caused negative CPU limits no longer fails
- `test_fixed_scaling_policy_coordinator_lifecycle` — **new**: verifies `FixedScalingPolicy` registers resources on start, re-requests periodically, and cancels on shutdown/abort
- `test_data_config_default_resource_limits` / `_run_data_config_resource_test` — updated to reflect V2 behavior
- `test_elastic_scaling_policy.py` — updated imports to use shared constants from base module

---

Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
Signed-off-by: Matthew Deng <matthew.j.deng@gmail.com>
Co-authored-by: Matthew Deng <matthew.j.deng@gmail.com>
Signed-off-by: Pedro Jeronimo <pedro.jeronimo@tecnico.ulisboa.pt>
…rdinator changes (#61990)

After #61703, Ray Train no longer adds training resources to `exclude_resources`. Instead, training resources are registered directly with the `AutoscalingCoordinator`. The docs still said training resources are "automatically excluded," which is misleading. This updates the wording to reflect the new behavior.

Signed-off-by: JasonLi1909 <jasli1909@gmail.com>
