-
-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Closed
Milestone
Description
software -> celery:4.0.0 (latentcall) kombu:4.0.0 py:2.7.10
billiard:3.5.0.2 py-amqp:2.1.1
platform -> system:FreeBSD arch:64bit, ELF imp:PyPy
loader -> celery.loaders.app.AppLoader
settings -> transport:amqp results:cache+memcached://10.1.1.2:11211/
amqp==2.1.1
celery==4.0.0
Django==1.10.3
librabbitmq==1.6.1
pylibmc==1.5.1
Steps to reproduce
# celery.py
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(force=True)

from celery import task, group, chain
@task
def add(x, y):
    return x + y
# chain( task, group(tasks) )
x = chain( add.si(1, 1), group([add.si(1, 1), add.si(1, 1)]) )
type(x) # celery.canvas._chain
x.apply_async() # works as expected
# chain( task, group(tasks), group(tasks) )
x = chain( add.si(1, 1), group([add.si(1, 1), add.si(1, 1)]), group([add.si(1, 1), add.si(1, 1)]) )
type(x) # celery.canvas._chain
x.apply_async() # fails, traceback below
# chain( task, group(tasks), task, group(tasks) )
x = chain( add.si(1, 1), group([add.si(1, 1), add.si(1, 1)]), add.si(1, 1), group([add.si(1, 1), add.si(1, 1)]) )
type(x) # celery.canvas._chain
x.apply_async() # works as expected
Expected behavior
It seems that a chain containing two consecutive groups fails. If there is a task between the groups, or only a single group, it works as expected. This behavior differs from 3.x, and I can't find any obvious differences in the docs. The only thing that looks relevant in the release notes is the fix about group | group being flattened into a single group (#2573).
Actual behavior
AttributeError Traceback (most recent call last)
<ipython-input-30-495e2ee230ea> in <module>()
----> 1 x.apply_async()
/project/site-packages/celery/canvas.pyc in delay(self, *partial_args, **partial_kwargs)
180 def delay(self, *partial_args, **partial_kwargs):
181 """Shortcut to :meth:`apply_async` using star arguments."""
--> 182 return self.apply_async(partial_args, partial_kwargs)
183
184 def apply(self, args=(), kwargs={}, **options):
/project/site-packages/celery/canvas.pyc in apply_async(self, args, kwargs, **options)
565 return self.apply(args, kwargs, **options)
566 return self.run(args, kwargs, app=app, **(
--> 567 dict(self.options, **options) if options else self.options))
568
569 def run(self, args=(), kwargs={}, group_id=None, chord=None,
/project/site-packages/celery/canvas.pyc in run(self, args, kwargs, group_id, chord, task_id, link, link_error, publisher, producer, root_id, parent_id, app, **options)
584 tasks, results = self.prepare_steps(
585 args, self.tasks, root_id, parent_id, link_error, app,
--> 586 task_id, group_id, chord,
587 )
588
/project/site-packages/celery/canvas.pyc in prepare_steps(self, args, tasks, root_id, parent_id, link_error, app, last_task_id, group_id, chord_body, clone, from_dict)
663 task = chord(
664 task, body=prev_task,
--> 665 task_id=prev_res.task_id, root_id=root_id, app=app,
666 )
667
AttributeError: 'GroupResult' object has no attribute 'task_id'