Handle Redis connection errors in result consumer #5919

@michamos

Description

Checklist

  • I have checked the issues list
    for similar or identical enhancement to an existing feature.
  • I have checked the pull requests list
    for existing proposed enhancements.
  • I have checked the commit log
    to find out if the same enhancement was already implemented in the
    master branch.
  • I have included all related issues and possible duplicate issues in this issue
    (If there are none, check this box anyway).

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

  • None

Brief Summary

When a connection error occurs while executing a command, the Redis client in most cases discards the connection, so the next command sent to Redis opens a new one. This allows applications to recover from connection errors simply by retrying, a property Celery relies on, for example, when setting keys in the Redis result backend:

def set(self, key, value, **retry_policy):
    # ensure() retries self._set on connection errors according to retry_policy
    return self.ensure(self._set, (key, value), **retry_policy)
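To make the retry property concrete, here is a minimal sketch of the pattern `ensure` relies on: call a function and, on a connection error, back off and call it again until a retry budget runs out. It is not Celery's code (Celery delegates the actual looping to kombu). The standalone `ensure` signature and the `FlakyClient` class are assumptions for illustration; the parameter names only mirror the keys of Celery's retry policy, and Python's built-in `ConnectionError` stands in for redis-py's. The retry only helps because every attempt checks out a new connection.

```python
import time


class FlakyClient:
    """Hypothetical client: every call checks out a fresh connection, and
    the first `failures` calls fail with a connection error."""

    def __init__(self, failures):
        self.failures = failures
        self.store = {}
        self.connections_opened = 0

    def _set(self, key, value):
        # A fresh connection per command, as if the pool had discarded the
        # previous, broken one.
        self.connections_opened += 1
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("connection reset by peer")
        self.store[key] = value


def ensure(fun, args, max_retries=3, interval_start=0.0,
           interval_step=0.05, interval_max=1.0):
    """Call fun(*args), retrying on ConnectionError with a growing delay."""
    interval = interval_start
    for attempt in range(max_retries + 1):
        try:
            return fun(*args)
        except ConnectionError:
            if attempt == max_retries:
                raise  # retry budget exhausted: let the caller see the error
            time.sleep(interval)
            interval = min(interval + interval_step, interval_max)


client = FlakyClient(failures=2)
ensure(client._set, ("celery-task-meta-abc", "SUCCESS"))
print(client.store, client.connections_opened)
# → {'celery-task-meta-abc': 'SUCCESS'} 3
```

Two failed attempts are absorbed and the third, on a fresh connection, succeeds. With a connection pinned to a pubsub context there is no fresh connection to retry on.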

This is not the case, however, when the connection to Redis is in a pubsub state. The reason is that some state is associated with the connection (namely the list of keys subscribed to). The Redis client doesn't keep track of this state, so it can't restore it when creating a new connection, and it leaves connection handling to the application code.

The Celery Redis result consumer uses pubsub to be notified when results are available, but doesn't handle connection errors at all. After a single connection error, the result consumer ends up in a state where it can no longer reach the result backend, because every further attempt reuses the same faulty connection.
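The failure mode can be reproduced with a toy model. The `Connection` and `PubSub` classes below are hypothetical and deliberately simplified, not redis-py's; they only encode the behaviour described above: the pubsub handle pins one connection and its subscriptions live on that handle, so once the connection breaks, retrying through the same handle fails every time.

```python
class Connection:
    """Hypothetical connection that stays broken once it has failed."""

    def __init__(self):
        self.broken = False

    def send(self, command):
        if self.broken:
            raise ConnectionError("connection is closed")
        return f"OK {command}"


class PubSub:
    """Hypothetical pubsub handle: pins one connection for its lifetime and
    remembers which channels it subscribed to on that connection."""

    def __init__(self):
        self.connection = Connection()
        self.channels = set()

    def subscribe(self, channel):
        self.connection.send(f"SUBSCRIBE {channel}")
        self.channels.add(channel)

    def get_message(self):
        return self.connection.send("READ")


pubsub = PubSub()
pubsub.subscribe("celery-task-meta-abc")
pubsub.connection.broken = True  # simulate a dropped connection

# Naive retrying goes through the same pinned connection and keeps failing.
failures = 0
for _ in range(3):
    try:
        pubsub.get_message()
    except ConnectionError:
        failures += 1
print(failures)  # → 3
```

Recovering requires discarding the handle, opening a new one, and re-subscribing to every channel the old handle had.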

The solution would be to add error handling logic to the result consumer so that it recreates the connection after a connection error and restores it to the proper state.

Design

Architectural Considerations

None

Proposed Behavior

Add error handling in all places where the Redis result consumer sends a Redis command in a pubsub context:

We should catch all Redis connection errors and call a new method that reinitializes the pubsub connection in the proper state (discard the current connection from the pool, start the pubsub context, subscribe to all keys in ResultConsumer.subscribed_to) using the retry policy. If the error occurred in drain_events, we should then try to get new messages again.
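A minimal sketch of this recovery path, built on in-memory fakes so it runs without a Redis server. `FakeServer`, `FakePubSub` and `_reconnect_pubsub` are hypothetical; `subscribed_to`, `consume_from`, `on_state_change` and `drain_events` echo names in Celery's Redis result consumer but with simplified signatures, and the built-in `ConnectionError` stands in for redis-py's. On a connection error, `drain_events` closes the broken handle, opens a new one, re-subscribes to every key in `subscribed_to` under a simple retry policy, then reads again.

```python
import time


class FakeServer:
    """Hypothetical stand-in for Redis (not redis-py)."""

    def __init__(self):
        self.pending = []            # (channel, payload) pairs not yet read
        self.fail_next_read = False  # simulate a dropped connection
        self.down = False            # simulate an unreachable server


class FakePubSub:
    """Hypothetical pubsub handle that pins a single connection."""

    def __init__(self, server):
        self.server = server
        self.channels = set()
        self.closed = False

    def subscribe(self, *channels):
        if self.server.down:
            raise ConnectionError("server unavailable")
        self.channels.update(channels)

    def get_message(self):
        if self.closed:
            raise ConnectionError("connection is closed")
        if self.server.fail_next_read:
            self.server.fail_next_read = False
            self.closed = True  # the pinned connection is now dead for good
            raise ConnectionError("connection reset by peer")
        for i, (channel, payload) in enumerate(self.server.pending):
            if channel in self.channels:
                del self.server.pending[i]
                return {"channel": channel, "data": payload}
        return None

    def close(self):
        self.closed = True


class ResultConsumer:
    """Simplified consumer, not Celery's class."""

    def __init__(self, server, max_retries=3, interval=0.01):
        self.server = server
        self.max_retries = max_retries
        self.interval = interval
        self.subscribed_to = set()
        self.results = {}
        self._pubsub = FakePubSub(server)

    def consume_from(self, key):
        self.subscribed_to.add(key)
        self._pubsub.subscribe(key)

    def on_state_change(self, key, payload):
        self.results[key] = payload

    def _reconnect_pubsub(self):
        """Hypothetical: replace the broken handle and restore subscriptions."""
        self._pubsub.close()  # never reuse the faulty connection
        for attempt in range(self.max_retries + 1):
            try:
                pubsub = FakePubSub(self.server)
                if self.subscribed_to:
                    pubsub.subscribe(*self.subscribed_to)
                self._pubsub = pubsub
                return
            except ConnectionError:
                if attempt == self.max_retries:
                    raise
                time.sleep(self.interval)

    def drain_events(self):
        try:
            message = self._pubsub.get_message()
        except ConnectionError:
            self._reconnect_pubsub()
            message = self._pubsub.get_message()  # retry on the fresh handle
        if message is not None:
            self.on_state_change(message["channel"], message["data"])


server = FakeServer()
consumer = ResultConsumer(server)
consumer.consume_from("celery-task-meta-abc")
server.pending.append(("celery-task-meta-abc", "SUCCESS"))
server.fail_next_read = True
consumer.drain_events()
print(consumer.results)  # → {'celery-task-meta-abc': 'SUCCESS'}
```

Unlike real Redis pubsub, `FakeServer` buffers messages, so nothing is lost here. With a real server, a result published while the consumer was disconnected would not be redelivered, which is the first of the remaining issues listed next.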

This will take care of most issues with connection errors. I see two remaining issues:
1. Some messages might have been lost (those sent between losing the connection and reconnecting). We could read all subscribed keys right after reconnecting, before starting the pubsub context, and call on_state_change for each existing key, but this might cause some messages to be delivered twice, and I don't know how Celery would react to that.
2. If the connection can't be re-established despite the retries and max_retries is reached, the result consumer will end up with a faulty connection that it can't recover from. This should be communicated to the user somehow (documentation, an explicit logged error message, a custom exception).
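One way to surface the second issue is to log an explicit error and raise a dedicated exception once the retry budget is spent. This is a sketch only: `reconnect_or_fail` and `ResultConsumerConnectionError` are hypothetical names, not part of Celery, and `reconnect` stands for whatever callable re-establishes the pubsub connection.

```python
import logging

logger = logging.getLogger("result_consumer")  # hypothetical logger name


class ResultConsumerConnectionError(Exception):
    """Hypothetical: the pubsub connection could not be restored."""


def reconnect_or_fail(reconnect, max_retries):
    """Call reconnect() up to max_retries + 1 times (no delay, for brevity).
    If every attempt fails with ConnectionError, log an explicit error and
    raise a dedicated exception chained to the last connection error."""
    last_exc = None
    for attempt in range(1, max_retries + 2):
        try:
            return reconnect()
        except ConnectionError as exc:
            last_exc = exc
            logger.warning("pubsub reconnect attempt %d failed: %s", attempt, exc)
    logger.error(
        "Giving up on the Redis result backend after %d retries; "
        "this result consumer can no longer receive results.", max_retries)
    raise ResultConsumerConnectionError(
        f"could not restore pubsub connection after {max_retries} retries"
    ) from last_exc


def always_down():
    raise ConnectionError("Connection refused")


try:
    reconnect_or_fail(always_down, max_retries=2)
except ResultConsumerConnectionError as exc:
    print(exc)  # → could not restore pubsub connection after 2 retries
```

The exception deliberately derives from `Exception` rather than `ConnectionError`, so that a caller's retry-on-connection-error handler does not swallow it and loop again.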

Proposed UI/UX

None

Diagrams

N/A

Alternatives

None
