Motivation
The implementations in `proxystore.connectors.dim` are similar in their characteristics but differ in their designs. There are also a few known bugs in some of the implementations that are artifacts of their current design (#159 and #166).
Goals
This PR proposes a refactor of the implementations to bring their designs in line with each other and address the following limitations and bugs:
- `XConnector`s on different processes on the same machine cannot share the same `XServer` because the server references are stored in global variables local to the process (see the sketch after this list).
- Multiple `XServer`s cannot exist on the same machine because only one server handle can be stored, which makes testing difficult. Example: `# Note: This test will hang on its own because it doesn't call close`
- The integration tests with UCX occasionally hang.
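For context on the first two items, the current design keeps a single module-level server handle per process. The sketch below is illustrative only and not the actual `proxystore.connectors.dim` code; the helper name `spawn_server_if_needed` is an assumption used to show why a second connector or server in the same process has nowhere to store its own handle.

```python
from multiprocessing import Process
from typing import Optional

# Illustrative only: one global handle per process for the spawned server.
server_process: Optional[Process] = None


def spawn_server_if_needed(target, *args) -> None:
    # Hypothetical helper mirroring the current global-variable pattern.
    global server_process
    if server_process is not None:
        # A second XConnector (or a second server started by a test) cannot
        # get its own handle; it silently reuses or overwrites this one.
        return
    server_process = Process(target=target, args=args)
    server_process.start()
```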
Outline
I propose the following layout for each DIM Connector implementation:
```python
class XKey(NamedTuple):
    object_id: str
    object_size: int
    location: str


class XConnector(DIMMixin):
    # Init first checks if a server is already running with wait_server().
    # If one is not, it will call spawn_server(). For testing, we need to be
    # able to set the timeout on wait_server() to be reasonably small.
    def __init__(self, ...) -> None: ...

    # _send_rpc contains the logic for serializing the RPC message,
    # communicating with the server, waiting on the response, and
    # deserializing the response. Note this method takes a list of RPCs so
    # get_batch/put_batch can be optimized or reuse a single connection.
    def _send_rpc(self, rpcs: Sequence[RPC]) -> list[RPCResponse]: ...

    # By default, close() does not kill the server; we instead use atexit
    # to close the server when the main process terminates.
    # Will require modifying Store.close() to pass kwargs to the Connector.
    def close(self, kill_server: bool = False) -> None: ...

    def config(self) -> dict[str, Any]: ...

    def from_config(self, config: dict[str, Any]) -> XConnector: ...

    # The rest of the Connector protocol is added via the mixin.


class XServer:
    def close(self) -> None: ...

    def evict(self, key: str) -> None: ...

    def exists(self, key: str) -> bool: ...

    def get(self, key: str) -> bytes: ...

    def set(self, key: str, obj: bytes) -> None: ...

    # Different implementations may do this differently. E.g.,
    # Margo registers the above methods directly.
    def handle_rpcs(self) -> None:
        # deserialize message
        # execute RPC
        # return RPCResponse
        ...


def start_server(...) -> None:
    server = XServer(...)
    # Register SIGINT/SIGTERM handlers
    # Run the server and wait on SIGINT/SIGTERM
    server.close()


def wait_server(..., timeout: float = 0.1) -> None:
    # Waits for a response from the server for up to timeout seconds.
    ...


def spawn_server(..., timeout: float) -> None:
    server_process = Process(target=start_server, args=...)
    server_process.start()

    def _kill_on_exit() -> None:
        server_process.terminate()
        server_process.join(timeout=timeout)
        if server_process.is_alive():
            server_process.kill()
            server_process.join()

    # Register the server process to be killed on exit of the main process.
    atexit.register(_kill_on_exit)

    # Guarantee to the caller that the server is running when we return.
    wait_server(...)
```

All of the implementations can share the `RPC` and `RPCResponse` types and potentially inherit many methods from a base `DIMMixin`.
```python
class DIMMixin:
    # These methods convert the operation to an `RPC` message,
    # call `self._send_rpc()`, and translate the resulting `RPCResponse`
    # to the appropriate return value.
    def evict(self, key: XKey) -> None: ...

    def exists(self, key: XKey) -> bool: ...

    def get(self, key: XKey) -> bytes | None: ...

    def get_batch(self, keys: Sequence[XKey]) -> list[bytes | None]: ...

    def put(self, obj: bytes) -> XKey: ...

    def put_batch(self, objs: Sequence[bytes]) -> list[XKey]: ...


# These are dataclasses or Pydantic BaseModels.
class RPC:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    payload: bytes | None


class RPCResponse:
    operation: Literal['exists', 'evict', 'get', 'set']
    key: str
    result: bytes | None
    exception: Exception | None
```
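To make the translation concrete, `DIMMixin.get()` and `get_batch()` could look roughly like the sketch below. It builds on the `XKey`, `RPC`, and `RPCResponse` types above; details such as which `XKey` field becomes the RPC key and re-raising server-side exceptions locally are assumptions about the eventual design, not settled decisions.

```python
class DIMMixin:
    def get(self, key: XKey) -> bytes | None:
        # Build the RPC message and delegate transport details
        # (serialization, connection, waiting) to _send_rpc().
        rpc = RPC(operation='get', key=key.object_id, payload=None)
        response = self._send_rpc([rpc])[0]
        # Surface any exception raised on the server side.
        if response.exception is not None:
            raise response.exception
        return response.result

    def get_batch(self, keys: Sequence[XKey]) -> list[bytes | None]:
        # Batch all keys into a single _send_rpc() call so implementations
        # can reuse one connection for the whole batch.
        rpcs = [RPC(operation='get', key=k.object_id, payload=None) for k in keys]
        responses = self._send_rpc(rpcs)
        for response in responses:
            if response.exception is not None:
                raise response.exception
        return [response.result for response in responses]
```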
Outcomes
How does this solve the aforementioned problems? Now that there is no singular global reference to a server that is unique to the process, we can have multiple `XConnector`s or `XServer`s without fear of them trying to modify what would previously have been the global `server_process`.
Using `atexit` to ensure the server is closed when the parent process exits should also prevent random hanging. However, in the test suite we will need to manually stop the server with `Connector.close(kill_server=True)` to make sure coverage works (in principle, but this will need investigation).
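As a rough illustration, the test suite could stop spawned servers through a fixture along these lines. This is a sketch only: `XConnector(...)` stands in for constructing a specific DIM connector, and the fixture shape is an assumption rather than existing test code.

```python
import pytest


@pytest.fixture()
def connector():
    # Hypothetical: construct a specific DIM connector with a small
    # wait_server() timeout so tests start quickly.
    connector = XConnector(...)
    yield connector
    # Explicitly kill the spawned server rather than relying on atexit so
    # coverage data is flushed before the test process exits.
    connector.close(kill_server=True)
```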
Completion of this issue is dependent on the above changes also fixing #159 and #166.
A couple of minor things to improve in this module as well:
- We should also clean up the logging here, as I excluded this module from the PR that closed #124 (Comprehensive/better (DEBUG level) logging).
- Add `__repr__` to each connector, similar to the other connectors (a rough sketch follows).
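For example, something along these lines could work; which attributes to include (e.g., address and port) is an assumption and will vary by implementation:

```python
def __repr__(self) -> str:
    # Hypothetical: mirror the style of the other connectors' reprs.
    return f'{type(self).__name__}(address={self.address}, port={self.port})'
```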