Skip to content

Custom headers not propagated as documented ? #4875

@frivoire

Description

@frivoire

Intro

I cannot get to work properly with custom (additional) headers : they dont seem to be readable by the consumer, or at least not the way it's documented.

With this setup :

$ celery report

software -> celery:4.2.0 (windowlicker) kombu:4.2.1 py:2.7.14
            billiard:3.5.0.3 py-amqp:2.3.2
platform -> system:Darwin arch:64bit imp:CPython
loader   -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled

Steps to reproduce

1. Prepare minimal venv :

virtualenv venv
source ./venv/bin/activate
pip install celery==4.2.0

2. Start a rabbitmq with default configuration, exactly it's the Docker image rabbitmq:3-management, so RabbitMQ 3.7.3 (Erlang 20.2.2)
3. Create a task with 2 additionnal headers, like this : ./venv/bin/python create_task.py

from celery import Celery

app = Celery('proj', broker='amqp://guest@127.0.0.1:5672')

mydict = {'customheader1':'foo', 'customheader2':'bar'}
app.send_task('proj.add', [23, 45], headers=mydict)

4. Check that the task created in RabbitMQ contains the 2 headers ==> yes, it seems OK
5. Launch the worker : ./venv/bin/python worker.py worker -l INFO

from celery import Celery
from celery.utils.log import get_task_logger

app = Celery('proj', broker='amqp://guest@127.0.0.1:5672')
logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    logger.info("self.request.headers: %s", self.request.headers)
    logger.info("self.request.customheader1: %s", self.request.customheader1)
    logger.info("self.request.customheader2: %s", self.request.customheader2)
    return x + y

if __name__ == '__main__':
    app.start()

6. Observe that headers are not where it's documented :
- http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request says that app.Task.request should contains an attribute headers with this description Mapping of message headers sent with this task message (may be None)
- but this headers attribute seems also empty, cf below :

Actual behavior

$ ./venv/bin/python worker.py worker -l INFO

celery@mbp-fri.local v4.2.0 (windowlicker)
Darwin-17.6.0-x86_64-i386-64bit 2018-07-03 10:30:38

[config]
.> app:         proj:0x10b016810
.> transport:   amqp://guest:**@127.0.0.1:5672//
.> results:     disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . proj.add

[2018-07-03 10:30:38,684: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-07-03 10:30:38,698: INFO/MainProcess] mingle: searching for neighbors
[2018-07-03 10:30:39,731: INFO/MainProcess] mingle: all alone
[2018-07-03 10:30:39,750: INFO/MainProcess] celery@mbp-fri.local ready.
[2018-07-03 10:30:41,598: INFO/MainProcess] Received task: proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]
[2018-07-03 10:30:41,601: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.headers: None               <=====
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader1: foo
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader2: bar
[2018-07-03 10:30:41,603: INFO/ForkPoolWorker-2] Task proj.add[0d6163f3-5457-4267-820f-645de40ad2ed] succeeded in 0.00254076899728s: 68

Expected behavior

[same startup]
[2018-07-03 10:30:41,598: INFO/MainProcess] Received task: proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.headers: {'customheader2': 'bar', 'customheader1': 'foo'}              <=====
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader1: None
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader2: None
[2018-07-03 10:30:41,603: INFO/ForkPoolWorker-2] Task proj.add[0d6163f3-5457-4267-820f-645de40ad2ed] succeeded in 0.00254076899728s: 68

Conclusion

So, I think there is something wrong about handling of custom headers.
Another aspect that is not described above : how to iterate on custom headers ? It seems impossible, since they are mixed with "system headers" (argsrepr, eta, expires, group, id, etc.)

Of course, if I've missed something in the doc, in my code, or somewhere else, I appreciate any advices/critics :)

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions