Closed
Labels: P2 (Important issue, but not time-critical), data (Ray Data-related issues)
Description
The `DefaultClusterAutoscalerV2` implementation needs an `AutoscalingCoordinator` and a way to get all of the `_NodeResourceSpec`s.
Currently, we can't explicitly inject fake implementations of either dependency. This is problematic because the tests need to assume what the implementation of each dependency looks like and rely on brittle mocks.
For example, `patch_autoscaling_coordinator` works by patching `get_or_create_autoscaling_coordinator` and `ray.get`, both of which are implementation details of the autoscaling coordinator:
ray/python/ray/data/tests/test_default_cluster_autoscaler_v2.py
Lines 28 to 41 in 6dc66d4
```python
def patch_autoscaling_coordinator():
    mock_coordinator = MagicMock()
    with patch(
        "ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator.get_or_create_autoscaling_coordinator",
        return_value=mock_coordinator,
    ):
        # Patch ray.get in the autoscaling_coordinator module to avoid issues
        # with MagicMock ObjectRefs.
        with patch(
            "ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator.ray.get",
            return_value=None,
        ):
            yield
```
To make the dependencies explicit and easier to fake, we should introduce seams.
Outcome
- Add the `FakeAutoscalingCoordinator` implementation to a new `fake_autoscaling_coordinator.py` module (you can use the code below).
- `DefaultClusterAutoscalerV2` has two new parameters: `autoscaling_coordinator: Optional[AutoscalingCoordinator] = None` and `get_node_counts: Callable[[], Dict[_NodeResourceSpec, int]] = get_node_resource_spec_and_count`. If `autoscaling_coordinator` is `None`, you can use the default implementation.
- Update `test_try_scale_up_cluster` to use the explicit seams rather than mocks. Where possible, assert against the public interface rather than implementation details.
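A constructor seam like the one described above could look roughly like the following. This is a hypothetical sketch, not the real implementation: the stand-in classes and the placeholder bodies of `get_node_resource_spec_and_count` and `get_or_create_autoscaling_coordinator` are assumptions; only the two parameter names and their defaults come from the outcome list.

```python
from typing import Callable, Dict, Optional


# Stand-ins for the real Ray Data types (assumptions for this sketch).
class AutoscalingCoordinator:
    pass


class _NodeResourceSpec:
    pass


def get_node_resource_spec_and_count() -> Dict[_NodeResourceSpec, int]:
    # Placeholder: the real helper inspects the cluster's nodes.
    return {}


def get_or_create_autoscaling_coordinator() -> AutoscalingCoordinator:
    # Placeholder for the real module-level accessor.
    return AutoscalingCoordinator()


class DefaultClusterAutoscalerV2:
    def __init__(
        self,
        autoscaling_coordinator: Optional[AutoscalingCoordinator] = None,
        get_node_counts: Callable[
            [], Dict[_NodeResourceSpec, int]
        ] = get_node_resource_spec_and_count,
    ):
        # Seam: fall back to the default coordinator only when none is injected.
        if autoscaling_coordinator is None:
            autoscaling_coordinator = get_or_create_autoscaling_coordinator()
        self._autoscaling_coordinator = autoscaling_coordinator
        self._get_node_counts = get_node_counts


# Tests can now inject fakes directly instead of patching module internals.
fake = AutoscalingCoordinator()
autoscaler = DefaultClusterAutoscalerV2(
    autoscaling_coordinator=fake,
    get_node_counts=lambda: {},
)
assert autoscaler._autoscaling_coordinator is fake
```

With this shape, production code passes no arguments and gets the default behavior, while tests pass a fake coordinator and a canned `get_node_counts` callable.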
```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

# Assumes AutoscalingCoordinator, ResourceDict, and ResourceRequestPriority
# are imported from the existing autoscaling coordinator module.


class FakeAutoscalingCoordinator(AutoscalingCoordinator):
    """A lightweight implementation for testing.

    This implementation always allocates the requested resources to the
    requester. It doesn't support the `priority` parameter.
    """

    @dataclass
    class Allocation:
        resources: List[ResourceDict]
        expiration_time_s: float
        request_remaining: bool

    def __init__(
        self,
        get_time: Callable[[], float] = time.time,
        remaining: Optional[List[ResourceDict]] = None,
    ):
        if remaining is None:
            remaining = []
        self._get_time = get_time
        self._remaining = remaining
        self._allocations: Dict[str, FakeAutoscalingCoordinator.Allocation] = {}

    def request_resources(
        self,
        requester_id: str,
        resources: List[ResourceDict],
        expire_after_s: float,
        request_remaining: bool = False,
        priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
    ) -> None:
        if priority != ResourceRequestPriority.MEDIUM:
            raise NotImplementedError(
                "This fake implementation doesn't support the `priority` parameter."
            )
        self._allocations[requester_id] = self.Allocation(
            resources=resources,
            expiration_time_s=self._get_time() + expire_after_s,
            request_remaining=request_remaining,
        )

    def cancel_request(self, requester_id: str):
        if requester_id in self._allocations:
            del self._allocations[requester_id]

    def get_allocated_resources(self, requester_id: str) -> List[ResourceDict]:
        allocation = self._allocations.get(requester_id)
        # Case 1: The requester hasn't been allocated any resources.
        if allocation is None:
            return []
        # Case 2: The requester's allocation has expired.
        elif allocation.expiration_time_s < self._get_time():
            del self._allocations[requester_id]
            return []
        # Case 3: The requester has been allocated resources, and they
        # haven't expired.
        else:
            # Copy so that extending with `remaining` doesn't mutate the
            # stored allocation on repeated calls.
            allocated_resources = list(allocation.resources)
            if allocation.request_remaining:
                allocated_resources.extend(self._remaining)
            return allocated_resources
```