Refactor proxystore.connectors.dim #224

@gpauloski

Description

Motivation

The implementations in proxystore.connectors.dim are similar in their characteristics but differ in how they are implemented. There are also a few known bugs in some of the implementations that are artifacts of their current designs (#159 and #166).

Goals

This PR proposes a refactor of the implementations to bring their designs in line with each other and to address the following limitations/bugs:

  • XConnectors on different processes on the same machine cannot share the same XServer because the server references are stored in global variables local to the process.
  • Multiple XServers cannot exist on the same machine because only one server handle can be stored, which makes testing difficult. Example:
    # Note: This test will hang on its own because it doesn't call close
  • The integration tests with UCX occasionally hang.
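The first limitation above can be illustrated with a toy version of the module-global handle pattern (all names here are illustrative, not the actual proxystore module):

```python
# Sketch of the current failure mode: the server handle lives in a
# module-level global, so there is exactly one slot per process and a
# second connector silently reuses (or clobbers) the first server.
_server_process = None  # module-global, one slot per process


def connect(port: int) -> str:
    """Stand-in for XConnector.__init__ storing a global server handle."""
    global _server_process
    if _server_process is None:
        _server_process = f'server:{port}'  # stand-in for a real Process
    # A connector asking for a different port still gets the first server.
    return _server_process


first = connect(5000)
second = connect(6000)  # wants a different server, gets the old handle
assert second == 'server:5000'
```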

Outline

I propose the following layout for each DIM Connector implementation:

class XKey(NamedTuple):
    object_id: str
    object_size: int
    location: str

class XConnector(DIMMixin):
    # Init first checks if a server is already running with wait_server().
    # If one is not, it will call spawn_server(). For testing, we need to be able
    # to set the timeout on wait_server() to be reasonably small.
    def __init__(self, ...) -> None:
    # _send_rpc contains the logic for serializing the RPC message, communicating
    # to the server, waiting on the response, and deserializing the response.
    # Note this method takes a list of RPCs so `get_batch`/`put_batch` can be optimized
    # or reuse a single connection.
    def _send_rpc(self, rpcs: Sequence[RPC]) -> list[RPCResponse]: ...
    # By default, close() does not kill the server and we instead use atexit
    # to close the server when the main process terminates.
    # Will require modifying Store.close() to pass kwargs to the Connector.
    def close(self, kill_server: bool = False) -> None: ...
    def config(self) -> dict[str, Any]: ...
    @classmethod
    def from_config(cls, config: dict[str, Any]) -> XConnector: ...
    # The rest of the Connector protocol is added via the mixin

class XServer:
    def close(self) -> None: ...
    def evict(self, key: str) -> None: ...
    def exists(self, key: str) -> bool: ...
    def get(self, key: str) -> bytes: ...
    def set(self, key: str, obj: bytes) -> None: ...
    # Different implementations may do this differently. E.g.,
    # Margo registers the above methods directly.
    def handle_rpcs(self) -> None:
        # deserialize message
        # execute RPC
        # return RPCResponse
        ...

def start_server(...) -> None:
    server = XServer(...)
    # Register SIGINT/SIGTERM handler
    # Run server and wait on SIGINT/SIGTERM
    server.close()

def wait_server(..., timeout: float = 0.1) -> None:
    # Waits on response from server for up to timeout seconds

def spawn_server(..., timeout: float) -> None:
    server_process = Process(target=start_server, args=...)
    server_process.start()
  
    def _kill_on_exit() -> None:
        server_process.terminate()
        server_process.join(timeout=timeout)
        if server_process.is_alive():
            server_process.kill()
            server_process.join()
   
    # Register the server process to be killed on exit of the main process
    atexit.register(_kill_on_exit)

    # Guarantee to the caller that the server is running when we return
    wait_server(...)
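The start_server/wait_server/spawn_server lifecycle above can be exercised end to end with a self-contained sketch. To keep it runnable in one snippet, a daemon thread stands in for the server process and a bare TCP socket stands in for the real transport; the port number and the liveness byte are arbitrary choices for illustration:

```python
import atexit
import socket
import threading
import time


def start_server(host: str, port: int, stop: threading.Event) -> None:
    # Stand-in for XServer: accept connections and send one byte back,
    # which is enough for wait_server() to confirm liveness.
    with socket.socket() as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen()
        srv.settimeout(0.1)
        while not stop.is_set():
            try:
                conn, _ = srv.accept()
            except socket.timeout:
                continue
            with conn:
                conn.sendall(b'\x01')


def wait_server(host: str, port: int, timeout: float = 0.1) -> None:
    # Poll until the server answers or the timeout elapses.
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return
        except OSError:
            if time.monotonic() > deadline:
                raise RuntimeError('server did not come up') from None
            time.sleep(0.01)


def spawn_server(host: str, port: int, timeout: float = 1.0) -> threading.Event:
    stop = threading.Event()
    thread = threading.Thread(
        target=start_server, args=(host, port, stop), daemon=True,
    )
    thread.start()

    def _kill_on_exit() -> None:
        stop.set()
        thread.join(timeout=timeout)

    # Mirror the proposal: the caller never stops the server explicitly;
    # atexit tears it down when the main process terminates.
    atexit.register(_kill_on_exit)
    # Guarantee to the caller that the server is running when we return.
    wait_server(host, port, timeout=timeout)
    return stop


stop = spawn_server('127.0.0.1', 53211)
wait_server('127.0.0.1', 53211)  # returns immediately: server is up
```

With a real multiprocessing.Process the _kill_on_exit body would use terminate()/kill()/join() as in the outline; the Event-based shutdown here is only a thread-friendly substitute.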

All of the implementations can share the RPC and RPCResponse types, and potentially inherit many methods from a base DIMMixin.

class DIMMixin:
    # These methods convert the operation to an `RPC` message,
    # call `self._send_rpc()`, and translate the resulting `RPCResponse`
    # into the appropriate return value.
    def evict(self, key: XKey) -> None: ...
    def exists(self, key: XKey) -> bool: ...
    def get(self, key: XKey) -> bytes | None: ...
    def get_batch(self, keys: Sequence[XKey]) -> list[bytes | None]: ...
    def put(self, obj: bytes) -> XKey: ...
    def put_batch(self, objs: Sequence[bytes]) -> list[XKey]: ...
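The translation layer the mixin provides can be sketched with dataclass stand-ins for the proposed message types and a fake `_send_rpc()` backed by an in-memory dict, so the example is self-contained (the string keys and uuid-based key generation are illustrative assumptions, not the proposed XKey):

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass
from typing import Literal, Sequence


@dataclass
class RPC:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    payload: bytes | None = None


@dataclass
class RPCResponse:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    result: bytes | None = None
    exception: Exception | None = None


class DIMMixin:
    def get(self, key: str) -> bytes | None:
        # Build the RPC, send it, and unwrap the response.
        response = self._send_rpc([RPC('get', key)])[0]
        if response.exception is not None:
            raise response.exception
        return response.result

    def put(self, obj: bytes) -> str:
        key = str(uuid.uuid4())
        response = self._send_rpc([RPC('set', key, obj)])[0]
        if response.exception is not None:
            raise response.exception
        return key


class FakeConnector(DIMMixin):
    # Stand-in for a real connector: the "server" is just a local dict,
    # so only _send_rpc() needs to be implemented.
    def __init__(self) -> None:
        self._store: dict[str, bytes] = {}

    def _send_rpc(self, rpcs: Sequence[RPC]) -> list[RPCResponse]:
        responses = []
        for rpc in rpcs:
            if rpc.operation == 'set':
                self._store[rpc.key] = rpc.payload
                responses.append(RPCResponse('set', rpc.key))
            elif rpc.operation == 'get':
                responses.append(
                    RPCResponse('get', rpc.key, self._store.get(rpc.key)),
                )
        return responses


connector = FakeConnector()
key = connector.put(b'hello')
assert connector.get(key) == b'hello'
```

Because `_send_rpc()` takes a sequence, `get_batch`/`put_batch` can reuse the same code path and a single connection, as the outline notes.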

# These are dataclasses or Pydantic BaseModels

class RPC:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    payload: bytes | None

class RPCResponse:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    result: bytes | None
    exception: Exception | None
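One property worth calling out: carrying the exception in RPCResponse lets server-side errors be re-raised on the client. A dataclass version round-trips through pickle (pickle is an illustrative wire format here; the proposal does not commit to one):

```python
import pickle
from dataclasses import dataclass
from typing import Literal, Optional


@dataclass
class RPCResponse:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    result: Optional[bytes] = None
    exception: Optional[Exception] = None


# A failed 'get' travels back to the client with its exception intact.
resp = RPCResponse('get', 'abc', exception=KeyError('abc'))
wire = pickle.dumps(resp)
back = pickle.loads(wire)
assert isinstance(back.exception, KeyError)
```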

Outcomes

How does this solve the aforementioned problems? Because there is no longer a single, process-local global reference to a server, multiple XConnector or XServer instances can coexist without fear of one clobbering what would previously have been the global server_process.

Using atexit to ensure the server is closed when the parent process exits should also prevent random hangs. However, in the test suite we will need to stop the server manually with Connector.close(kill_server=True) to make sure coverage works (in principle, but this will need investigation).

Completion of this issue is dependent on the above changes also fixing #159 and #166.

A couple of minor things to improve in this module as well:

Labels

enhancement (New features or improvements to existing functionality), internal (Refactoring, style changes, testing, or code optimizations)
