Skip to content

Conversation

@gpauloski
Copy link
Collaborator

Description

The existing RedisSubscriber/Publisher use Redis Pub/Sub, which has the downside of dropping messages when no subscriber is present. The RedisQueueSubscriber/Publisher are an alternative that uses a Redis list and rpush/blpop to implement a persistent queue where messages will not be dropped.

Fixes N/A

Type of Change

  • Breaking Change (fix or enhancement which changes existing semantics of the public interface)
  • Enhancement (new features or improvements to existing functionality)
  • Bug (fixes for a bug or issue)
  • Internal (refactoring, style changes, testing, optimizations)
  • Documentation update (changes to documentation or examples)
  • Package (dependencies, versions, package metadata)
  • Development (CI workflows, pre-commit, linters, templates)
  • Security (security related changes)

Testing

Added new unit tests which test against a mock Redis client, and used the following script to test against a real Redis server running locally.

import uuid
from proxystore.stream.shims.redis import RedisQueueSubscriber
from proxystore.stream.shims.redis import RedisQueuePublisher

host = '127.0.0.1'
port = 6379
queue = str(uuid.uuid4())

publisher = RedisQueuePublisher(host, port)
subscriber = RedisQueueSubscriber(host, port, queue)

values = [b'value1', b'value2', b'value3']

for value in values:
    publisher.send(queue, value)

for expected, found in zip(values, subscriber):
    assert found == expected

print('Finished basic test')

try:
    print('Waiting with no timeout... ctrl-c to stop')
    next(subscriber)
except KeyboardInterrupt:
    print('Continuing')

subscriber = RedisQueueSubscriber(host, port, queue, timeout=1)

try:
    print('Waiting with timeout 1')
    next(subscriber)
except TimeoutError:
    print('TimeoutError raised as expected')

print('Done.'

Pull Request Checklist

Please confirm the PR meets the following requirements.

  • Tags added to PR (e.g., breaking, bug, enhancement, internal, documentation, package, development, security).
  • Code changes pass pre-commit (e.g., mypy, ruff, etc.).
  • Tests have been added to show the fix is effective or that the new feature works.
  • New and existing unit tests pass locally with the changes.
  • Docs have been updated and reviewed if relevant.

The existing RedisSubscriber/Publisher use Redis Pub/Sub, which has the
downside of dropping messages when no subscriber is present. The
RedisQueueSubscriber/Publisher are an alternative that uses a Redis list
and rpush/blpop to implement a persistent queue where messages will not
be dropped.
@gpauloski gpauloski added the enhancement New features or improvements to existing functionality label Mar 18, 2024
@gpauloski gpauloski merged commit 4cf03f6 into main Mar 18, 2024
@gpauloski gpauloski deleted the redis-queue-shim branch March 18, 2024 18:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New features or improvements to existing functionality

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants