**Describe your environment**

```toml
celery = "5.3.4"
opentelemetry-sdk = "1.20"
opentelemetry-propagator-b3 = "1.20"
opentelemetry-exporter-jaeger = "1.20"
opentelemetry-instrumentation-django = "0.41b0"
opentelemetry-instrumentation-asgi = "0.41b0"
opentelemetry-instrumentation-celery = "0.41b0"
```
**Steps to reproduce**

```console
$ cat web2/__init__.py
```

```python
import logging
import os

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mycoolproject.settings")  # NOQA

from kombu import Queue
from celery import Celery, signals
from django.conf import settings
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

logger = logging.getLogger(__name__)

DISABLE_TRACING = os.getenv('DISABLE_TRACING', False) == 'true'
TRACING_COLLECTOR_ENDPOINT = os.getenv('TRACING_COLLECTOR_ENDPOINT', 'jaeger-collector.monitoring')
TRACING_COLLECTOR_PORT = os.getenv('TRACING_COLLECTOR_PORT', '14268')


@signals.worker_process_init.connect(weak=False)
def worker_process_init(*args, **kwargs):
    init_celery_tracing()


def init_celery_tracing():
    CeleryInstrumentor().instrument()
    trace.set_tracer_provider(TracerProvider())
    if DISABLE_TRACING:
        span_processor = BatchSpanProcessor(ConsoleSpanExporter())
    else:
        jaeger_exporter = JaegerExporter(
            collector_endpoint=f'http://{TRACING_COLLECTOR_ENDPOINT}:{TRACING_COLLECTOR_PORT}/api/traces?format=jaeger.thrift',
        )
        span_processor = BatchSpanProcessor(jaeger_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)


# this stops celery from hijacking the django logging config
@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    pass


tasks_app = Celery('web', broker=settings.CELERY_BROKER_URL)
tasks_app.conf.task_queues = (
    Queue('default2', routing_key='task.#'),
)
tasks_app.conf.task_default_queue = 'default2'
tasks_app.conf.task_default_exchange_type = 'topic'
tasks_app.conf.task_default_routing_key = 'task.default2'
tasks_app.conf.result_backend = 'redis://redis:6379/0'
tasks_app.conf.update(
    task_routes=({
        'web2.tasks.itsatest': {
            'queue': 'default2',
            'routing_key': 'task.default2.itsatest',
        },
    },),
    imports=(
        'web2.tasks',
    ),
    worker_prefetch_multiplier=5,
    task_soft_time_limit=300,
    task_time_limit=360,
    worker_max_tasks_per_child=100,
    broker_transport_options={
        'confirm_publish': True
    }
)
```

```console
$ cat web2/tasks.py
```

```python
from web2 import tasks_app


@tasks_app.task(ignore_result=True)
def itsatest():
    print('itsatest')
    return 'itsatest'
```

Start the worker:

```console
$ poetry run celery --app web2.tasks worker
```

Then trigger the task from a Python shell:

```python
from web2.tasks import *

itsatest.delay()
```
**What is the expected behavior?**

No errors when executing tasks.
**What is the actual behavior?**

```
[2023-11-01 20:14:58 +0000] [416] [celery.worker.strategy] [INFO] Task web2.tasks.itsatest[50b55ccd-be08-4f5c-82ee-538b363498ec] received
[2023-11-01 20:14:58 +0000] [416] [celery.pool] [DEBUG] TaskPool: Apply <function fast_trace_task at 0xffff8c758a60> (args:('web2.tasks.itsatest', '50b55ccd-be08-4f5c-82ee-538b363498ec', {'lang': 'py', 'task': 'web2.tasks.itsatest', 'id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen1251@4676bacd79b5', 'ignore_result': True, 'stamped_headers': None, 'stamps': {}, 'properties': {'content_type': 'application/json', 'content_encoding': 'utf-8', 'application_headers': {'lang': 'py', 'task': 'web2.tasks.itsatest', 'id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen1251@4676bacd79b5', 'ignore_result': True, 'stamped_headers': None, 'stamps': {}}, 'delivery_mode':... kwargs:{})
[2023-11-01 20:14:58 +0000] [518] [opentelemetry.instrumentation.celery] [DEBUG] prerun signal start task_id=50b55ccd-be08-4f5c-82ee-538b363498ec
itsatest
[2023-11-01 20:14:58 +0000] [518] [celery.app.trace] [INFO] Task web2.tasks.itsatest[50b55ccd-be08-4f5c-82ee-538b363498ec] succeeded in 0.013544000015826896s: 'itsatest'
[2023-11-01 20:14:58 +0000] [518] [opentelemetry.instrumentation.celery] [DEBUG] postrun signal task_id=50b55ccd-be08-4f5c-82ee-538b363498ec
[2023-11-01 20:14:58 +0000] [518] [celery.utils.dispatch.signal] [ERROR] Signal handler <bound method CeleryInstrumentor._trace_postrun of <opentelemetry.instrumentation.celery.CeleryInstrumentor object at 0xffff44692110>> raised: TypeError("'NoneType' object is not subscriptable")
Traceback (most recent call last):
  File "/opt/.venv/web-tq7C0_9c-py3.10/lib/python3.10/site-packages/celery/utils/dispatch/signal.py", line 276, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/opt/.venv/web-tq7C0_9c-py3.10/lib/python3.10/site-packages/opentelemetry/instrumentation/celery/__init__.py", line 195, in _trace_postrun
    self._record_histograms(task_id, labels)
  File "/opt/.venv/web-tq7C0_9c-py3.10/lib/python3.10/site-packages/opentelemetry/instrumentation/celery/__init__.py", line 309, in _record_histograms
    self.metrics["flower.task.runtime.seconds"].record(
TypeError: 'NoneType' object is not subscriptable
```
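From the traceback, the exception is a plain subscript on `None`: presumably `self.metrics` is only populated when `_instrument()` runs, and it is still unset when the postrun handler fires in the worker process. A minimal illustration of the failure mode (my own sketch, not the library code):

```python
# Stand-in for CeleryInstrumentor.metrics before _instrument() populates it.
metrics = None

# Subscripting it reproduces the exact error from the traceback above:
# TypeError: 'NoneType' object is not subscriptable
metrics["flower.task.runtime.seconds"].record(0.013)
```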
**Additional context**

It looks like this functionality was introduced fairly recently in #1679 by @shalevr and @Akochavi.
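As a stopgap until this is fixed upstream, the recording step can be guarded so it becomes a no-op when the metrics dict was never initialized. The wrapper below is a hypothetical workaround of my own, relying only on the `_record_histograms(self, task_id, labels)` method visible in the traceback above:

```python
# Hypothetical stopgap: skip histogram recording while self.metrics is None.
from opentelemetry.instrumentation.celery import CeleryInstrumentor

_original_record_histograms = CeleryInstrumentor._record_histograms

def _safe_record_histograms(self, task_id, labels):
    if getattr(self, "metrics", None) is None:
        return  # instrument() never populated the metrics dict in this process
    _original_record_histograms(self, task_id, labels)

CeleryInstrumentor._record_histograms = _safe_record_histograms
```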