-
-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Description
Checklist
- I have checked the issues list
for similar or identical enhancement to an existing feature. - I have checked the pull requests list
for existing proposed enhancements. - I have checked the commit log
to find out if the if the same enhancement was already implemented in the
master branch. - I have included all related issues and possible duplicate issues in this issue
(If there are none, check this box anyway).
Related Issues and Possible Duplicates
Related Issues
- None
Possible Duplicates
- None
Brief Summary
When there is a connection error with Redis while executing a command, in most cases, the redis client will discard the connection, causing the next command sent to Redis to open a new connection. This allows applications to recover from connection errors by simply retrying, a property that is used in Celery, for example when setting keys in the Redis result backend:
celery/celery/backends/redis.py
Lines 324 to 325 in d056305
| def set(self, key, value, **retry_policy): | |
| return self.ensure(self._set, (key, value), **retry_policy) |
This is not the case however when the connection to Redis is in a pubsub state. The reason for that is that some state associated with the connection (namely the list of keys subscibed to). The Redis client doesn't keep track of this state, so it can't possibly restore it when creating a new connection and leaves the connection handling to the application code.
The Celery Redis result consumer uses pubsub in order to be notified when results are available, but doesn't handle connection errors at all, causing a result consumer to end up in a state where it can't connect to the result backend any more after a single connection error, as any further attempt will reuse the same faulty connection.
The solution would be to add error handling logic to the result consumer, so it will recreate the connection on connection errors and initialize it to the proper state.
Design
Architectural Considerations
None
Proposed Behavior
Add error handling in all places the Redis result consumer sends a Redis command in a pubsub context:
celery/celery/backends/redis.py
Line 127 in d056305
message = self._pubsub.get_message(timeout=timeout) celery/celery/backends/redis.py
Line 142 in d056305
self._pubsub.subscribe(key) celery/celery/backends/redis.py
Line 148 in d056305
self._pubsub.unsubscribe(key)
We should catch all Redis connection errors, and call a new method that will reinitialize a pubsub connection in the proper state (discard the current connection from the pool, start the pubsub context, subscribe to all keys in ResultConsumer.subscribed_to) using the retry policy. If in drain_events, we should try to get new messages again.
This will take care of most issues with connection errors. I see two remaining issues:
1.Some message might have been lost (sent between losing the connection and reconnecting). We could read all keys subscribed to right after reconnecting and before starting the pubsub context and call on_state_change for each existing key, but this might cause some messages to be delivered twice and I don't know how Celery will react to that.
2. If the connection can't be re-established despite the retries and reaches max-retries, the result consumer will end up with a faulty connection that can't be recovered from. This should be communicated somehow to the user (documentation, logging an explicit error message, custom exception).
Proposed UI/UX
None
Diagrams
N/A
Alternatives
None