
Finish the streaming interface implementation #442

@gpauloski

Description

ProxyStore Stream Interfaces

Streaming data via proxies is an idea we have discussed for a while and done some prototyping on. We are starting to gather more use cases that motivate further development in this space.

Consider an application with a data generator that streams chunks of data (i.e., arbitrary Python objects) to a consumer, which then dispatches compute tasks that operate on a chunk. In this scenario, the consumer does not need the actual chunk; rather, it only needs to know that a chunk is ready in order to dispatch the compute task which will actually consume it. This is where a stream of proxies would be beneficial---the consumer receives lightweight proxies from the stream and passes those proxies along to later computation stages. The actual data chunks are only transmitted between the data generator and the process which performs compute on the proxy of the chunk.

You could cut out the intermediary here and just start many workers that pull from the stream and perform computations as necessary, but this could be resource inefficient in more dynamic workflows where the frequency of new chunks fluctuates, leaving some workers idle, or where workers are reallocated to different kinds of tasks. Here, streaming with proxies enables more control in the workflow script without the downside of transferring all the data through the main workflow steering process.

Previous Prototypes

There is an existing prototype for a ProxyStore streaming interface in the streaming branch.

This implementation is an abstraction on top of just a Store instance, which is challenging because the Store/Connector interfaces do not have mechanisms to push events to consumers. To get around this limitation, the ProxyStreamConsumer uses a linked list of object keys such that each stored object also carries the key of the next object in the stream. When client code requests the next object from the stream, the ProxyStreamConsumer sits in a while True loop until an object associated with the "next" key is available.
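Roughly, the polling behaves like the following sketch (a reconstruction for illustration only; the entry structure and attribute names are assumptions, not the actual prototype code):

import time

class ProxyStreamConsumer:
    def __next__(self):
        # Spin until an object appears under the current "next" key.
        while True:
            if self._store.exists(self._next_key):
                # Each entry carries its data plus the key of its
                # successor (hypothetical structure for this sketch).
                entry = self._store.get(self._next_key)
                self._next_key = entry.next_key
                return entry.data
            time.sleep(0.01)  # back off to avoid a hot polling loop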

As a result, we are limited to one producer and one consumer, and the consumer blocks the entire execution thread polling for the next object to be available.

A couple other notes:

  • The ProxyStreamConsumer also yields objects directly, but we are probably more interested in yielding proxies to get the pass-by-reference benefits (or supporting both options, which is maybe my preference).
  • The implementation modified the Connector interface to support the linked list of keys, which will no longer be needed with this new design.

New Design

Based on what we learned from the previous prototype and potential needs of our applications, we know we need the ability to push events to subscribers (a limitation of the Connector interface). That leads me to think we should separate the mechanism that transmits the "proxy is available" events from the mechanism which stores the data associated with the proxy.

What I propose is adjusting the prototype ProxyStreamProducer/ProxyStreamConsumer to use a PubSub interface for communicating the "proxy is available" events.

We can have StreamProducer and StreamConsumer classes (dropping the Proxy from the names for brevity) initialized with a Publisher and Subscriber instance, respectively. Both the StreamProducer and StreamConsumer are also still initialized with a Store[Connector] instance. The Publisher and Subscriber interfaces abstract any event messaging system (e.g., Kafka, Redis PubSub, Google Cloud PubSub) and are used by the StreamProducer and StreamConsumer to propagate event metadata. The actual object data associated with the events is still stored and communicated via the Store[Connector] interface.

The benefit of this separation of concerns (event streaming vs. object storage) is that each part can be independently optimized. The PubSub system can be optimized for low latency, while the data storage can be optimized for locality, bandwidth, storage capacity, etc. This also means we maintain the benefit of bulk data transfer occurring only between the source and the true destination (the process which resolves the proxy).
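To make the flow concrete, here is a minimal sketch of how send() and __next__() could cooperate (the JSON encoding, the key_type parameter, and the method internals are assumptions for illustration, not settled design):

import json
from typing import Any

class StreamProducer:
    def __init__(self, store, publisher) -> None:
        self._store = store
        self._publisher = publisher

    def send(self, obj: Any) -> None:
        # Bulk object data goes through the Store/Connector...
        key = self._store.put(obj)
        # ...while only lightweight key metadata goes through pub/sub.
        # (Connector keys are NamedTuples, so _asdict() applies; the
        # JSON encoding here is an assumption.)
        self._publisher.send(json.dumps(key._asdict()))

class StreamConsumer:
    def __init__(self, store, subscriber, key_type) -> None:
        self._store = store
        self._subscriber = subscriber
        self._key_type = key_type  # connector key type (assumed parameter)

    def __iter__(self):
        return self

    def __next__(self):
        # Blocks until the next "proxy is available" event arrives.
        metadata = json.loads(self._subscriber.recv())
        key = self._key_type(**metadata)
        # Hand back a proxy; data only moves when the proxy is resolved.
        return self._store.proxy_from_key(key)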

API

class StreamProducer(Generic[ConnectorT]):
    def __init__(self, store: Store[ConnectorT], publisher: Publisher, ...) -> None: ...
    def close(self) -> None: ...
    def send(self, obj: Any, ...) -> None: ...

class StreamConsumer(Generic[ConnectorT]):
    def __init__(self, store: Store[ConnectorT], subscriber: Subscriber, ...) -> None: ...
    def __iter__(self) -> Self: ...
    def __next__(self) -> Proxy[Any]: ...
    def close(self) -> None: ...

class Publisher(Protocol):
    def close(self) -> None: ...
    def send(self, message: str) -> None: ...

class Subscriber(Protocol):
    def close(self) -> None: ...
    def recv(self) -> str: ...

All of the above can be implemented as context managers as well.
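For example, assuming __exit__ simply calls close():

with StreamProducer(store, publisher) as producer:
    producer.send(obj)
# producer.close() runs automatically when the block exits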

It should be fairly trivial to then implement a KafkaPublisher/KafkaSubscriber, RedisPublisher/RedisSubscriber, etc.
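For instance, a Redis pair could be a thin wrapper around redis-py's pub/sub client (a sketch under assumed constructor parameters, not a final implementation):

import redis

class RedisPublisher:
    def __init__(self, hostname: str, port: int, topic: str = 'default') -> None:
        self._client = redis.Redis(host=hostname, port=port)
        self._topic = topic

    def close(self) -> None:
        self._client.close()

    def send(self, message: str) -> None:
        # Redis pub/sub channels map naturally onto topics.
        self._client.publish(self._topic, message)

class RedisSubscriber:
    def __init__(self, hostname: str, port: int, topic: str = 'default') -> None:
        self._client = redis.Redis(host=hostname, port=port)
        self._pubsub = self._client.pubsub()
        self._pubsub.subscribe(topic)

    def close(self) -> None:
        self._pubsub.close()
        self._client.close()

    def recv(self) -> str:
        # listen() blocks; skip subscription confirmations and return
        # only actual message payloads.
        for message in self._pubsub.listen():
            if message['type'] == 'message':
                return message['data'].decode()
        raise RuntimeError('Subscriber connection was closed.')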

We should also consider how to handle "topics". The Publisher could have a default topic and a topic parameter on send which falls back to that default. The Subscriber would then be subscribed to only one topic at a time.
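One possible shape for the Publisher side (an assumption for discussion, not settled API):

from typing import Protocol

class Publisher(Protocol):
    def close(self) -> None: ...
    def send(self, message: str, topic: str | None = None) -> None:
        """Publish to topic, or to the default topic when topic is None."""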

Example

Producer

from proxystore.connectors.redis import RedisConnector
from proxystore.store import Store
from proxystore.stream import StreamProducer
from proxystore.pubsub.redis import RedisPublisher

store = Store('redis', RedisConnector(...))
stream = StreamProducer(store, RedisPublisher(...))

for obj in ...:
    stream.send(obj)

Consumer

from proxystore.connectors.redis import RedisConnector
from proxystore.proxy import Proxy
from proxystore.store import Store
from proxystore.stream import StreamConsumer
from proxystore.pubsub.redis import RedisSubscriber

store = Store('redis', RedisConnector(...))
stream = StreamConsumer(store, RedisSubscriber(...))

for obj in stream:
    assert isinstance(obj, Proxy)
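Tying back to the motivating use case, the consumer can then forward proxies to workers without ever holding the chunk data itself (ProcessPoolExecutor and compute_on_chunk here are stand-ins for whatever task dispatch system the workflow uses):

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    for proxy in stream:
        # Forward only the lightweight proxy; the chunk moves directly
        # from the store to the worker when the proxy is resolved.
        executor.submit(compute_on_chunk, proxy)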

Labels: enhancement, proposal
