Closed
python==3.6.4
celery==4.1.0
Steps to reproduce
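import celery
from project.celery import app


class test_task(celery.Task):
    # Retry settings declared as class attributes on a class-based task
    autoretry_for = (Exception,)
    retry_kwargs = {
        'max_retries': 5,
        'countdown': 10,
    }

    def run(self, *args, **kwargs):
        raise Exception('test')


# Register an instance of the task, then queue it
test_task = app.register_task(test_task())
test_task.delay()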
Expected behavior
The task is retried 5 times, with a countdown of 10 seconds before each retry.
Actual behavior
No retries at all.
So the main question: is there any way to specify autoretry_for and retry_kwargs for class-based tasks? For reference, function-based tasks work properly when these arguments are passed to the shared_task decorator.
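One possible workaround, sketched here rather than confirmed against celery 4.1.0, is to wrap the task's run method by hand so that the listed exceptions trigger a call to retry. The helper add_autoretry and the stand-ins FakeTask and RetryRequested are hypothetical names introduced only for this illustration; the stand-ins let the sketch run without a broker. The only Celery behaviour assumed is that a task exposes run and a retry method accepting exc, max_retries and countdown.

```python
import functools


def add_autoretry(task, autoretry_for, retry_kwargs):
    """Wrap task.run so the listed exceptions trigger task.retry() (hypothetical helper)."""
    original_run = task.run

    @functools.wraps(original_run)
    def run(*args, **kwargs):
        try:
            return original_run(*args, **kwargs)
        except autoretry_for as exc:
            # Celery's Task.retry raises by itself by default; the explicit
            # raise only keeps the control flow obvious.
            raise task.retry(exc=exc, **retry_kwargs)

    task.run = run
    return task


class RetryRequested(Exception):
    """Stand-in for the exception a real retry() would raise (demo only)."""


class FakeTask:
    """Stand-in for a celery.Task instance so the sketch runs without a broker."""

    def __init__(self):
        self.retry_calls = []

    def run(self, *args, **kwargs):
        raise Exception('test')

    def retry(self, exc=None, **options):
        # Record what a real task would have been asked to do, then signal it.
        self.retry_calls.append((exc, options))
        raise RetryRequested()


task = add_autoretry(FakeTask(), (Exception,), {'max_retries': 5, 'countdown': 10})
try:
    task.run()
except RetryRequested:
    pass

print(task.retry_calls[0][1])  # {'max_retries': 5, 'countdown': 10}
```

With a real class-based task, the same helper would presumably be applied to the instance before registration, for example app.register_task(add_autoretry(test_task(), (Exception,), {'max_retries': 5, 'countdown': 10})). A simpler alternative is to catch the exception inside run and call self.retry(exc=exc, countdown=10, max_retries=5) directly.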