-
-
Notifications
You must be signed in to change notification settings - Fork 5k
Custom headers not propagated as documented ? #4875
Description
Intro
I cannot get to work properly with custom (additional) headers : they dont seem to be readable by the consumer, or at least not the way it's documented.
With this setup :
$ celery report
software -> celery:4.2.0 (windowlicker) kombu:4.2.1 py:2.7.14
billiard:3.5.0.3 py-amqp:2.3.2
platform -> system:Darwin arch:64bit imp:CPython
loader -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled
Steps to reproduce
1. Prepare minimal venv :
virtualenv venv
source ./venv/bin/activate
pip install celery==4.2.0
2. Start a rabbitmq with default configuration, exactly it's the Docker image rabbitmq:3-management, so RabbitMQ 3.7.3 (Erlang 20.2.2)
3. Create a task with 2 additionnal headers, like this : ./venv/bin/python create_task.py
from celery import Celery
app = Celery('proj', broker='amqp://guest@127.0.0.1:5672')
mydict = {'customheader1':'foo', 'customheader2':'bar'}
app.send_task('proj.add', [23, 45], headers=mydict)
4. Check that the task created in RabbitMQ contains the 2 headers ==> yes, it seems OK
5. Launch the worker : ./venv/bin/python worker.py worker -l INFO
from celery import Celery
from celery.utils.log import get_task_logger
app = Celery('proj', broker='amqp://guest@127.0.0.1:5672')
logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
logger.info("self.request.headers: %s", self.request.headers)
logger.info("self.request.customheader1: %s", self.request.customheader1)
logger.info("self.request.customheader2: %s", self.request.customheader2)
return x + y
if __name__ == '__main__':
app.start()
6. Observe that headers are not where it's documented :
- http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request says that app.Task.request should contains an attribute headers with this description Mapping of message headers sent with this task message (may be None)
- but this headers attribute seems also empty, cf below :
Actual behavior
$ ./venv/bin/python worker.py worker -l INFO
celery@mbp-fri.local v4.2.0 (windowlicker)
Darwin-17.6.0-x86_64-i386-64bit 2018-07-03 10:30:38
[config]
.> app: proj:0x10b016810
.> transport: amqp://guest:**@127.0.0.1:5672//
.> results: disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. proj.add
[2018-07-03 10:30:38,684: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-07-03 10:30:38,698: INFO/MainProcess] mingle: searching for neighbors
[2018-07-03 10:30:39,731: INFO/MainProcess] mingle: all alone
[2018-07-03 10:30:39,750: INFO/MainProcess] celery@mbp-fri.local ready.
[2018-07-03 10:30:41,598: INFO/MainProcess] Received task: proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]
[2018-07-03 10:30:41,601: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.headers: None <=====
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader1: foo
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader2: bar
[2018-07-03 10:30:41,603: INFO/ForkPoolWorker-2] Task proj.add[0d6163f3-5457-4267-820f-645de40ad2ed] succeeded in 0.00254076899728s: 68
Expected behavior
[same startup]
[2018-07-03 10:30:41,598: INFO/MainProcess] Received task: proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.headers: {'customheader2': 'bar', 'customheader1': 'foo'} <=====
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader1: None
[2018-07-03 10:30:41,602: INFO/ForkPoolWorker-2] proj.add[0d6163f3-5457-4267-820f-645de40ad2ed]: self.request.customheader2: None
[2018-07-03 10:30:41,603: INFO/ForkPoolWorker-2] Task proj.add[0d6163f3-5457-4267-820f-645de40ad2ed] succeeded in 0.00254076899728s: 68
Conclusion
So, I think there is something wrong about handling of custom headers.
Another aspect that is not described above : how to iterate on custom headers ? It seems impossible, since they are mixed with "system headers" (argsrepr, eta, expires, group, id, etc.)
Of course, if I've missed something in the doc, in my code, or somewhere else, I appreciate any advices/critics :)