Expose config option to set retry_policy on (Redis) ResultBackend #6166

@ashb

Description

Checklist

  • I have checked the issues list for similar or identical feature requests.
  • I have checked the pull requests list for existing proposed implementations of this feature.
  • I have checked the commit log to find out if the same feature was already implemented in the master branch.
  • I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).

Brief Summary

While debugging another problem, I noticed that the connection retry timeout for the Redis result backend is not configurable: it is hardcoded at 20s (20 retries at 1s intervals).

This appears to be an issue only for the Redis backend (RPC is the only other backend that appears to do anything in on_task_call).

task.apply_async() goes through the following call chain:

celery/celery/app/base.py

Lines 776 to 780 in d3863d9

with self.producer_or_acquire(producer) as P:
    with P.connection._reraise_as_library_errors():
        if not ignored_result:
            self.backend.on_task_call(P, task_id)
        amqp.send_task_message(P, name, message, **options)

def on_task_call(self, producer, task_id):
    if not task_join_will_block():
        self.result_consumer.consume_from(task_id)

def consume_from(self, task_id):
    if self._pubsub is None:
        return self.start(task_id)
    self._consume_from(task_id)

def _consume_from(self, task_id):
    key = self._get_key_for_task(task_id)
    if key not in self.subscribed_to:
        self.subscribed_to.add(key)
        with self.reconnect_on_error():
            self._pubsub.subscribe(key)

@contextmanager
def reconnect_on_error(self):
    try:
        yield
    except self._connection_errors:
        try:
            self._ensure(self._reconnect_pubsub, ())
        except self._connection_errors:
            logger.critical(E_RETRY_LIMIT_EXCEEDED)
            raise

_ensure is set via

self._ensure = self.backend.ensure

to be

def ensure(self, fun, args, **policy):
    retry_policy = dict(self.retry_policy, **policy)
    max_retries = retry_policy.get('max_retries')
    return retry_over_time(
        fun, self.connection_errors, args, {},
        partial(self.on_connection_error, max_retries),
        **retry_policy)

But ensure is never passed any kwargs, and retry_policy is not set anywhere other than in the base Backend class:

retry_policy = {
    'max_retries': 20,
    'interval_start': 0,
    'interval_step': 1,
    'interval_max': 1,
}
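
To make the 20s figure concrete, here is a rough standalone model of how these values translate into a wait schedule, and of how a keyword override passed to ensure would change it. This is a simplified sketch, not kombu's actual retry_over_time: the helper names (merged_policy, sleep_schedule) are made up for illustration, and it assumes each retry sleeps for interval_start plus one interval_step per attempt, capped at interval_max.

```python
# Defaults copied from the retry_policy shown above.
DEFAULT_RETRY_POLICY = {
    'max_retries': 20,
    'interval_start': 0,
    'interval_step': 1,
    'interval_max': 1,
}


def merged_policy(defaults, **policy):
    """Same merge as `dict(self.retry_policy, **policy)` in ensure()."""
    return dict(defaults, **policy)


def sleep_schedule(policy):
    """Hypothetical model: seconds slept before each retry, capped at interval_max."""
    return [
        min(policy['interval_start'] + attempt * policy['interval_step'],
            policy['interval_max'])
        for attempt in range(policy['max_retries'])
    ]


# No kwargs are passed today, so the defaults always win.
default_schedule = sleep_schedule(merged_policy(DEFAULT_RETRY_POLICY))

# What a caller-supplied override would look like if one were ever passed.
short_schedule = sleep_schedule(
    merged_policy(DEFAULT_RETRY_POLICY, max_retries=3))

# Upper bound on time spent sleeping: max_retries * interval_max.
default_bound = (DEFAULT_RETRY_POLICY['max_retries']
                 * DEFAULT_RETRY_POLICY['interval_max'])
print(default_bound, short_schedule)
```

Under this model the default policy is bounded by 20 x 1s of sleeping, on top of however long each individual connection attempt takes.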

Design

The "most obvious" way of doing this would be to add a new top-level result_backend_options config key that feeds into the backend constructor.
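
A minimal sketch of that idea, using stand-in classes rather than Celery's real Backend or configuration machinery. The result_backend_options key is the one proposed here and does not exist yet; SketchBackend and backend_from_conf are hypothetical names used only for illustration, and the assumption is that user-supplied values are merged over the class-level defaults.

```python
# Stand-in classes only; this is not Celery's real Backend or config loader.

class SketchBackend:
    # Class-level default, as quoted earlier in this issue.
    retry_policy = {
        'max_retries': 20,
        'interval_start': 0,
        'interval_step': 1,
        'interval_max': 1,
    }

    def __init__(self, retry_policy=None, **kwargs):
        if retry_policy:
            # Build a per-instance copy so the class default is not mutated.
            self.retry_policy = dict(self.retry_policy, **retry_policy)


def backend_from_conf(conf):
    """Build the backend from the proposed `result_backend_options` key."""
    options = conf.get('result_backend_options') or {}
    return SketchBackend(**options)


# A user shortens the retry window; unspecified keys keep their defaults.
conf = {'result_backend_options': {'retry_policy': {'max_retries': 3}}}
backend = backend_from_conf(conf)

# Without the key, behaviour is unchanged.
default_backend = backend_from_conf({})
print(backend.retry_policy, default_backend.retry_policy)
```

Merging rather than replacing means existing deployments keep today's behaviour unless they opt in, and users only need to specify the keys they want to change.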
