-
-
Notifications
You must be signed in to change notification settings - Fork 5k
Description
Checklist
Checklist
- I have checked the issues list for similar or identical feature requests.
- I have checked the pull requests list for existing proposed implementations of this feature.
- I have checked the commit log to find out if the same feature was already implemented in the master branch.
- I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
Brief Summary
While debugging another problem, I noticed that the default connection timeout for the Redis result backend is not configurable: it is hardcoded at 20s (20 x 1s retries).
This appears to actually be an issue only for the Redis backend (RPC is the only other backend that appears to do anything in on_task_call).
task.apply_async() calls these functions
Lines 776 to 780 in d3863d9
| with self.producer_or_acquire(producer) as P: | |
| with P.connection._reraise_as_library_errors(): | |
| if not ignored_result: | |
| self.backend.on_task_call(P, task_id) | |
| amqp.send_task_message(P, name, message, **options) |
celery/celery/backends/redis.py
Lines 342 to 344 in bf6139b
| def on_task_call(self, producer, task_id): | |
| if not task_join_will_block(): | |
| self.result_consumer.consume_from(task_id) |
celery/celery/backends/redis.py
Lines 167 to 177 in bf6139b
| def consume_from(self, task_id): | |
| if self._pubsub is None: | |
| return self.start(task_id) | |
| self._consume_from(task_id) | |
| def _consume_from(self, task_id): | |
| key = self._get_key_for_task(task_id) | |
| if key not in self.subscribed_to: | |
| self.subscribed_to.add(key) | |
| with self.reconnect_on_error(): | |
| self._pubsub.subscribe(key) |
celery/celery/backends/redis.py
Lines 124 to 133 in bf6139b
| @contextmanager | |
| def reconnect_on_error(self): | |
| try: | |
| yield | |
| except self._connection_errors: | |
| try: | |
| self._ensure(self._reconnect_pubsub, ()) | |
| except self._connection_errors: | |
| logger.critical(E_RETRY_LIMIT_EXCEEDED) | |
| raise |
_ensure is set via
celery/celery/backends/redis.py
Line 97 in bf6139b
| self._ensure = self.backend.ensure |
to be
celery/celery/backends/redis.py
Lines 352 to 358 in bf6139b
| def ensure(self, fun, args, **policy): | |
| retry_policy = dict(self.retry_policy, **policy) | |
| max_retries = retry_policy.get('max_retries') | |
| return retry_over_time( | |
| fun, self.connection_errors, args, {}, | |
| partial(self.on_connection_error, max_retries), | |
| **retry_policy) |
But ensure is never passed any kwargs, nor is the retry policy set anywhere other than in the Backend superclass:
celery/celery/backends/base.py
Lines 106 to 111 in bf6139b
| retry_policy = { | |
| 'max_retries': 20, | |
| 'interval_start': 0, | |
| 'interval_step': 1, | |
| 'interval_max': 1, | |
| } |
Design
The "most obvious" way of doing this would be to add a new top-level result_backend_options config key that feeds into the constructor.