ProxyStore Stream Interfaces
Streaming data via proxies is an idea we have discussed for a while and done some prototyping on. We are starting to gather more use cases that motivate further development in this space.
Consider an application with a data generator that is streaming chunks of data (i.e., arbitrary Python objects) to a consumer, which then dispatches compute tasks that operate on a data chunk. In this scenario, the consumer does not need the actual chunk; rather, it only needs to know that a chunk is ready in order to dispatch the compute task that will actually consume the chunk. This is where a stream of proxies would be beneficial---the consumer receives lightweight proxies from the stream and passes those proxies along to later computation stages. The actual data chunks are only transmitted between the data generator and the process which performs some compute on the proxy of the chunk.
You could cut out the intermediary here and just start many workers that pull from the stream and perform computations as necessary, but this could be resource inefficient in more dynamic workflows where the frequency of new chunks fluctuates, leaving some workers idle, or where workers are reallocated to different kinds of tasks. Here, streaming with proxies enables more control in the workflow scripts but without the downsides of transferring all the data through the main workflow steering process.
Previous Prototypes
There is an existing prototype for a ProxyStore streaming interface in the streaming branch.
This implementation is an abstraction on top of just a Store instance, which is challenging because the Store/Connector interfaces do not have mechanisms to push events to consumers. To get around this limitation, the ProxyStreamConsumer uses a linked list of object keys such that each object stored also has the key of the next object in the stream. When client code requests the next object from the stream, the ProxyStreamConsumer sits in a while True loop until an object associated with the "next" key is available.
As a result, we are limited to one producer and one consumer, and the consumer blocks the entire execution thread polling for the next object to be available.
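Roughly, the consumer's blocking behavior looked like the following (a simplified sketch, not the prototype's actual code; the (obj, next_key) pairing is an assumption about how the linked list was encoded):
import time

def next_in_stream(store, key):
    # Poll, blocking the calling thread, until the producer has stored
    # an entry under the expected key.
    while not store.exists(key):
        time.sleep(0.01)
    # Assume each stored entry is an (obj, next_key) pair so that the
    # entries form a linked list through the stream.
    obj, next_key = store.get(key)
    return obj, next_key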
A couple of other notes:
- The ProxyStreamConsumer also yields objects directly, but we are probably more interested in yielding proxies to get the pass-by-reference benefits (or supporting both options, which is maybe my preference).
- The implementation modified the Connector interface to support the linked list of keys, which won't be needed anymore with this new design.
New Design
Based on what we learned from the previous prototype and potential needs of our applications, we know we need the ability to push events to subscribers (a limitation of the Connector interface). That leads me to think we should separate the mechanism that transmits the "proxy is available" events from the mechanism which stores the data associated with the proxy.
What I propose is adjusting the prototype ProxyStreamProducer/ProxyStreamConsumer to use a PubSub interface for communicating the "proxy is available" events.
We can have StreamProducer and StreamConsumer classes (dropping the Proxy from the names for brevity) initialized with a Publisher and Subscriber instance, respectively. Both the StreamProducer and StreamConsumer are also still initialized with a Store[Connector] instance. The Publisher and Subscriber interfaces abstract any event messaging system (e.g., Kafka, Redis Pub/Sub, Google Cloud Pub/Sub) and are used by the StreamProducer and StreamConsumer to propagate event metadata. The actual object data associated with the events is still stored and communicated via the Store[Connector] interface.
The benefit of this separation of concerns (event streaming vs object storage) is that each part can be independently optimized. The PubSub system can be optimized for low latency while the data storage can be optimized for location/bandwidth/storage capacity etc. This also means that we maintain the benefits of bulk data transfer only occurring between the source and true destination (the process which resolves the proxy).
API
class StreamProducer(Generic[ConnectorT]):
    def __init__(self, store: Store[ConnectorT], publisher: Publisher, ...) -> None: ...
    def close(self) -> None: ...
    def send(self, obj: Any, ...) -> None: ...

class StreamConsumer(Generic[ConnectorT]):
    def __init__(self, store: Store[ConnectorT], subscriber: Subscriber, ...) -> None: ...
    def __iter__(self) -> Self: ...
    def __next__(self) -> Proxy[Any]: ...
    def close(self) -> None: ...

class Publisher(Protocol):
    def close(self) -> None: ...
    def send(self, message: str) -> None: ...

class Subscriber(Protocol):
    def close(self) -> None: ...
    def recv(self) -> str: ...

All of the above can be implemented as context managers as well.
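To make the intended control flow concrete, here is a minimal, self-contained sketch of what send() and __next__() would do, using a plain dict and a queue.Queue as stand-ins for the object store and the pub/sub system (the real classes would go through the Store and Publisher/Subscriber interfaces above):
import json
import queue
from typing import Any

from proxystore.proxy import Proxy

_store: dict[str, Any] = {}                 # stand-in for the Store
_channel: queue.Queue[str] = queue.Queue()  # stand-in for the pub/sub system

class SketchProducer:
    def send(self, obj: Any) -> None:
        key = f'obj-{len(_store)}'
        _store[key] = obj                        # 1. store the object data
        _channel.put(json.dumps({'key': key}))   # 2. publish only the event metadata

class SketchConsumer:
    def __iter__(self) -> 'SketchConsumer':
        return self

    def __next__(self) -> Proxy:
        event = json.loads(_channel.get())       # 3. block on the next event
        key = event['key']
        # 4. yield a lazy proxy; the data is only fetched when the proxy is used
        return Proxy(lambda key=key: _store[key])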
It should be fairly trivial to then implement a KafkaPublisher/KafkaSubscriber, RedisPublisher/RedisSubscriber, etc.
We should also consider how to handle "topics". The Publisher could have a default topic plus a topic parameter on send that falls back to the default, and a Subscriber would be subscribed to only one topic at a time.
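As a sketch of how thin such a shim could be, a Redis-backed pair might wrap redis-py's pub/sub client like this (class and parameter names are illustrative, and the default-topic handling follows the idea above):
import redis

class RedisPublisher:
    def __init__(self, host: str, port: int, topic: str = 'default') -> None:
        self._client = redis.Redis(host=host, port=port)
        self._topic = topic

    def send(self, message: str, topic: str | None = None) -> None:
        # Publish to the given topic, falling back to the default topic.
        self._client.publish(topic if topic is not None else self._topic, message)

    def close(self) -> None:
        self._client.close()

class RedisSubscriber:
    def __init__(self, host: str, port: int, topic: str = 'default') -> None:
        # A subscriber is bound to a single topic at a time.
        self._client = redis.Redis(host=host, port=port)
        self._pubsub = self._client.pubsub()
        self._pubsub.subscribe(topic)

    def recv(self) -> str:
        # Block until the next message published on the subscribed topic.
        for message in self._pubsub.listen():
            if message['type'] == 'message':
                return message['data'].decode()
        raise RuntimeError('subscription closed')

    def close(self) -> None:
        self._pubsub.close()
        self._client.close()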
Example
Producer
from proxystore.connector.redis import RedisConnector
from proxystore.store import Store
from proxystore.stream import StreamProducer
from proxystore.pubsub.redis import RedisPublisher
store = Store('redis', RedisConnector(...))
stream = StreamProducer(store, RedisPublisher(...))
for obj in ...:
    stream.send(obj)
Consumer
from proxystore.connector.redis import RedisConnector
from proxystore.store import Store
from proxystore.stream import StreamConsumer
from proxystore.pubsub.redis import RedisSubscriber
from proxystore.proxy import Proxy
store = Store('redis', RedisConnector(...))
stream = StreamConsumer(store, RedisSubscriber(...))
for obj in stream:
    assert isinstance(obj, Proxy)
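To connect back to the motivating use case, the consumer could hand each proxy off to workers; only the lightweight proxies pass through this process, while the chunk data is resolved inside the workers (the executor choice below is arbitrary):
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk) -> int:
    # `chunk` arrives as a proxy; using it here triggers resolution, so the
    # bulk data moves from the store to the worker, never through the
    # dispatching process.
    return len(chunk)

with ProcessPoolExecutor() as pool:
    futures = [pool.submit(process_chunk, proxy) for proxy in stream]
    results = [future.result() for future in futures]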