
Chaining Chords produces enormously big messages causing OOM on workers #5000

@brabiega

Description

Checklist

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
software -> celery:4.2.1 (windowlicker) kombu:4.2.1 py:2.7.6
            billiard:3.5.0.4 redis:2.10.6
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:db+mysql://celery:**@127.0.0.1/celery

task_serializer: 'json'
broker_heartbeat: 360
result_serializer: 'json'
event_queue_ttl: 60
prefetch_multiplier: 1
broker_url: u'redis://:********@localhost:6379/0'
task_acks_late: True
worker_redirect_stdouts_level: 'INFO'
database_result_engine_options: {
    'isolation_level': 'READ_COMMITTED'}
task_time_limit: 910
task_soft_time_limit: 900
accept_content: ['json']
task_track_started: True
result_backend: u'db+mysql://celery:********@127.0.0.1/celery'
max_tasks_per_child: 100

Steps to reproduce

Tasks file (tasks.py)

from celery import Celery
app = Celery('tasks')  # minimal app stub; the actual configuration is in the report above

@app.task()
def noop(*results):
    return

Example workflow

from celery import chord

from tasks import noop

workflow = (
    noop.s() |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    chord([noop.s() for i in range(10)], noop.s()) |
    noop.s()
)

workflow.apply_async()
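
For comparison, the declared canvas can be measured before anything is sent. The snippet below is only a sketch (it assumes the tasks module above and the JSON serializer from the report): it serializes the signature dict directly with kombu's JSON helper, which approximates, but is not identical to, the message bodies that apply_async() actually publishes.

from celery import chord
from kombu.utils.json import dumps

from tasks import noop

# Rebuild the same workflow: noop | five chords of 10 headers each | noop
workflow = noop.s()
for _ in range(5):
    workflow |= chord([noop.s() for i in range(10)], noop.s())
workflow |= noop.s()

# Size of the declared signature, before apply_async() expands anything
print("Declared workflow: {:,} bytes".format(len(dumps(workflow))))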

Patch kombu to see serialized message size

diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index b2466a4..dee3476 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -65,8 +65,11 @@ _default_encoder = JSONEncoder
 def dumps(s, _dumps=json.dumps, cls=None,
           default_kwargs=_json_extra_kwargs, **kwargs):
     """Serialize object to json string."""
-    return _dumps(s, cls=cls or _default_encoder,
+    res = _dumps(s, cls=cls or _default_encoder,
                   **dict(default_kwargs, **kwargs))
+    msg_size = len(res)
+    print("Serialized data size {:,} bytes".format(msg_size))
+    return res
 
 
 def loads(s, _loads=json.loads, decode_bytes=PY3):
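
An alternative that avoids patching kombu is to hook Celery's before_task_publish signal on the producer side. This is a sketch under the same assumptions as above (JSON serializer in use); the body passed to the signal is the not-yet-encoded payload, so dumping it only approximates the size of the message kombu puts on the wire.

from celery.signals import before_task_publish
from kombu.utils.json import dumps

@before_task_publish.connect
def report_body_size(sender=None, headers=None, body=None, **kwargs):
    # `sender` is the task name; `body` is the payload about to be serialized.
    print("Publishing {}: ~{:,} bytes".format(sender, len(dumps(body))))

Importing this module in the process that calls workflow.apply_async() prints one line per published task.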

Expected behavior

The workflow contains 57 tasks in total (a leading noop, five chords of 10 header tasks plus one callback each, and a trailing noop). They should not serialize into messages anywhere near the sizes below.

Actual behavior

Calling apply_async serializes the workflow and produces two messages with the following sizes:

Serialized data size 235,441,375 bytes
Serialized data size 313,922,589 bytes

As a result, the celery worker on my machine consumes all available memory.
