[Data] Introduce seams to DefaultAutoscaler2 to make it more testable #59683

@bveeramani

Description

The DefaultAutoscaler2 implementation depends on an AutoscalingCoordinator and on a way to get the node count for each _NodeResourceSpec.

Currently, we can't explicitly inject fake implementations of either dependency. This is problematic: tests have to assume what each dependency's implementation looks like and rely on brittle mocks.

For example, patch_autoscaling_coordinator works by patching get_or_create_autoscaling_coordinator and ray.get -- both of which are implementation details of the autoscaling coordinator:

@contextmanager
def patch_autoscaling_coordinator():
    mock_coordinator = MagicMock()
    with patch(
        "ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator.get_or_create_autoscaling_coordinator",
        return_value=mock_coordinator,
    ):
        # Patch ray.get in the autoscaling_coordinator module to avoid issues
        # with MagicMock ObjectRefs.
        with patch(
            "ray.data._internal.cluster_autoscaler.default_autoscaling_coordinator.ray.get",
            return_value=None,
        ):
            yield

To make the dependencies explicit and easier to fake, we should introduce seams.

Outcome

  • Add a FakeAutoscalingCoordinator implementation in a new fake_autoscaling_coordinator.py module (you can use the code below).
  • DefaultClusterAutoscalerV2 has two new parameters: autoscaling_coordinator: Optional[AutoscalingCoordinator] = None and get_node_counts: Callable[[], Dict[_NodeResourceSpec, int]] = get_node_resource_spec_and_count. If autoscaling_coordinator is None, you can fall back to the default implementation.
  • Update test_try_scale_up_cluster to use the explicit seams rather than mocks. Where possible, assert against the public interface rather than implementation details.
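The second bullet can be sketched as follows. This is a minimal illustration of the constructor seams, not the real class: the real DefaultClusterAutoscalerV2 takes more parameters, and the AutoscalingCoordinator interface and get_node_resource_spec_and_count helper shown here are simplified stand-ins.

```python
from typing import Callable, Dict, List, Optional


class AutoscalingCoordinator:
    """Stand-in for the real coordinator interface (assumption for this sketch)."""

    def request_resources(self, *args, **kwargs) -> None:
        raise NotImplementedError

    def get_allocated_resources(self, requester_id: str) -> List[dict]:
        raise NotImplementedError


def get_node_resource_spec_and_count() -> Dict[tuple, int]:
    """Stand-in for the real default helper that inspects the cluster."""
    return {}


class DefaultClusterAutoscalerV2:
    def __init__(
        self,
        autoscaling_coordinator: Optional[AutoscalingCoordinator] = None,
        get_node_counts: Callable[
            [], Dict[tuple, int]
        ] = get_node_resource_spec_and_count,
    ):
        # Seam 1: tests can inject a coordinator; production code falls back to
        # the default (e.g. via get_or_create_autoscaling_coordinator).
        if autoscaling_coordinator is None:
            autoscaling_coordinator = AutoscalingCoordinator()
        self._autoscaling_coordinator = autoscaling_coordinator
        # Seam 2: tests can inject a function that returns canned node counts.
        self._get_node_counts = get_node_counts
```

A test can then construct the autoscaler with a FakeAutoscalingCoordinator and a lambda returning canned counts, with no patching at all.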
import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

# AutoscalingCoordinator, ResourceDict, and ResourceRequestPriority come from the
# existing autoscaling coordinator module (imports omitted here).


class FakeAutoscalingCoordinator(AutoscalingCoordinator):
    """A lightweight implementation for testing.

    This implementation always allocates the requested resources to the requester.
    It doesn't support the `priority` parameter.
    """

    @dataclass
    class Allocation:
        resources: List[ResourceDict]
        expiration_time_s: float
        request_remaining: bool

    def __init__(
        self,
        get_time: Callable[[], float] = time.time,
        remaining: Optional[List[ResourceDict]] = None,
    ):
        if remaining is None:
            remaining = []

        self._get_time = get_time
        self._remaining = remaining

        self._allocations: Dict[str, FakeAutoscalingCoordinator.Allocation] = {}

    def request_resources(
        self,
        requester_id: str,
        resources: List[ResourceDict],
        expire_after_s: float,
        request_remaining: bool = False,
        priority: ResourceRequestPriority = ResourceRequestPriority.MEDIUM,
    ) -> None:
        if priority != ResourceRequestPriority.MEDIUM:
            raise NotImplementedError(
                "This fake implementation doesn't support the `priority` parameter."
            )

        self._allocations[requester_id] = self.Allocation(
            resources=resources,
            expiration_time_s=self._get_time() + expire_after_s,
            request_remaining=request_remaining,
        )

    def cancel_request(self, requester_id: str):
        if requester_id in self._allocations:
            del self._allocations[requester_id]

    def get_allocated_resources(self, requester_id: str) -> List[ResourceDict]:
        allocation = self._allocations.get(requester_id)

        # Case 1: The requester hasn't been allocated any resources.
        if allocation is None:
            return []

        # Case 2: The requester's allocation has expired.
        elif allocation.expiration_time_s < self._get_time():
            del self._allocations[requester_id]
            return []

        # Case 3: The requester has been allocated resources, and they haven't expired.
        else:
            # Copy the list so that extending it below doesn't mutate the
            # stored allocation across repeated calls.
            allocated_resources = list(allocation.resources)

            if allocation.request_remaining:
                allocated_resources.extend(self._remaining)

            return allocated_resources
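The get_time parameter is itself a seam: a test can inject a manual clock instead of time.time and step it forward deterministically to exercise expiration. The sketch below shows the pattern with TinyFakeCoordinator, a trimmed, dependency-free stand-in for the fake above (ResourceDict is approximated as a plain dict).

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, List

ResourceDict = Dict[str, float]  # stand-in for Ray's ResourceDict


@dataclass
class _Allocation:
    resources: List[ResourceDict]
    expiration_time_s: float


class TinyFakeCoordinator:
    """Trimmed-down stand-in for the fake above, just to show the clock seam."""

    def __init__(self, get_time: Callable[[], float] = time.time):
        self._get_time = get_time
        self._allocations: Dict[str, _Allocation] = {}

    def request_resources(self, requester_id, resources, expire_after_s):
        self._allocations[requester_id] = _Allocation(
            resources=list(resources),
            expiration_time_s=self._get_time() + expire_after_s,
        )

    def get_allocated_resources(self, requester_id):
        allocation = self._allocations.get(requester_id)
        # Missing or expired allocations yield no resources.
        if allocation is None or allocation.expiration_time_s < self._get_time():
            self._allocations.pop(requester_id, None)
            return []
        return allocation.resources


class ManualClock:
    """A callable clock a test can advance explicitly."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = ManualClock()
coordinator = TinyFakeCoordinator(get_time=clock)
coordinator.request_resources("op-1", [{"CPU": 4.0}], expire_after_s=10.0)
assert coordinator.get_allocated_resources("op-1") == [{"CPU": 4.0}]
clock.now = 11.0  # step past the expiration time
assert coordinator.get_allocated_resources("op-1") == []
```

This keeps time-dependent behavior testable without sleeping or monkeypatching time.time.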

Metadata

Labels
P2: Important issue, but not time-critical
data: Ray Data-related issues