Checklist
- I have included the output of celery -A proj report in the issue
  (if you are not able to do this, then at least specify the Celery version affected).
- I have verified that the issue exists against the master branch of Celery.
software -> celery:4.2.1 (windowlicker) kombu:4.2.1 py:2.7.6
billiard:3.5.0.4 redis:2.10.6
platform -> system:Linux arch:64bit, ELF imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:redis results:db+mysql://celery:**@127.0.0.1/celery
task_serializer: 'json'
broker_heartbeat: 360
result_serializer: 'json'
event_queue_ttl: 60
prefetch_multiplier: 1
broker_url: u'redis://:********@localhost:6379/0'
task_acks_late: True
worker_redirect_stdouts_level: 'INFO'
database_result_engine_options: {
'isolation_level': 'READ_COMMITTED'}
task_time_limit: 910
task_soft_time_limit: 900
accept_content: ['json']
task_track_started: True
result_backend: u'db+mysql://celery:********@127.0.0.1/celery'
max_tasks_per_child: 100
Steps to reproduce
Tasks file
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task()
def noop(*results):
    return

Example workflow
from celery import chord
from tasks import noop
workflow = (
noop.s() |
chord([noop.s() for i in range(10)], noop.s()) |
chord([noop.s() for i in range(10)], noop.s()) |
chord([noop.s() for i in range(10)], noop.s()) |
chord([noop.s() for i in range(10)], noop.s()) |
chord([noop.s() for i in range(10)], noop.s()) |
noop.s()
)
workflow.apply_async()

Patch kombu to see serialized message size
diff --git a/kombu/utils/json.py b/kombu/utils/json.py
index b2466a4..dee3476 100644
--- a/kombu/utils/json.py
+++ b/kombu/utils/json.py
@@ -65,8 +65,11 @@ _default_encoder = JSONEncoder
def dumps(s, _dumps=json.dumps, cls=None,
default_kwargs=_json_extra_kwargs, **kwargs):
"""Serialize object to json string."""
- return _dumps(s, cls=cls or _default_encoder,
+ res = _dumps(s, cls=cls or _default_encoder,
**dict(default_kwargs, **kwargs))
+ msg_size = len(res)
+ print("Serialized data size {:,} bytes".format(msg_size))
+ return res
 def loads(s, _loads=json.loads, decode_bytes=PY3):
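As an alternative to editing kombu's installed source, the same measurement can be taken by re-registering the json serializer through kombu's documented register() API (a sketch; the wrapper name measuring_dumps is mine):

from kombu.serialization import register
from kombu.utils.json import dumps, loads

def measuring_dumps(obj):
    # Delegate to kombu's own JSON encoder, then log the message size.
    res = dumps(obj)
    print("Serialized data size {:,} bytes".format(len(res)))
    return res

# Re-register 'json' so every published message goes through the wrapper.
register('json', measuring_dumps, loads,
         content_type='application/json', content_encoding='utf-8')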
Expected behavior
The workflow contains roughly 57 tasks (1 initial task + 5 chords of 11 tasks each + 1 final task). They shouldn't be serialized into messages anywhere near this size.
Actual behavior
apply_async serializes the workflow into two messages with the following sizes:
Serialized data size 235,441,375 bytes
Serialized data size 313,922,589 bytes
As a result, the celery worker on my machine eats all available memory.
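For context, a hypothetical workaround (my own sketch, not a confirmed fix; launch_next_stage is an invented helper) is to dispatch each chord stage from the previous stage's callback instead of chaining all the chords up front, so no single message has to embed the whole remaining chain:

@app.task()
def launch_next_stage(results, remaining):
    # Each stage schedules the next one at runtime instead of being
    # serialized into one deeply nested canvas up front.
    if remaining > 0:
        chord([noop.s() for _ in range(10)],
              launch_next_stage.s(remaining - 1)).delay()

# Kick off the first of the 5 stages.
chord([noop.s() for _ in range(10)], launch_next_stage.s(4)).delay()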