Skip to content

Celery Instrumentation TypeError #2029

@dacox

Description

@dacox

Describe your environment

celery = "5.3.4"
opentelemetry-sdk = "1.20"
opentelemetry-propagator-b3 = "1.20"
opentelemetry-exporter-jaeger = "1.20"
opentelemetry-instrumentation-django = "0.41b0"
opentelemetry-instrumentation-asgi = "0.41b0"
opentelemetry-instrumentation-celery = "0.41b0"

Steps to reproduce

$ cat web2/__init__.py

import logging
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mycoolproject.settings")  # NOQA

from kombu import Queue
from celery import Celery, signals
from django.conf import settings
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider


logger = logging.getLogger(__name__)


# Tracing settings, read once from the process environment at import time.
# DISABLE_TRACING is True only for the exact, lowercase value 'true'.
DISABLE_TRACING = os.environ.get('DISABLE_TRACING') == 'true'
# Host and port of the collector; both stay strings because they are only
# ever interpolated into a URL.
TRACING_COLLECTOR_ENDPOINT = os.environ.get(
    'TRACING_COLLECTOR_ENDPOINT',
    'jaeger-collector.monitoring',
)
TRACING_COLLECTOR_PORT = os.environ.get('TRACING_COLLECTOR_PORT', '14268')


@signals.worker_process_init.connect(weak=False)
def worker_process_init(*args, **kwargs):
    """Receiver for Celery's ``worker_process_init`` signal.

    Connected with ``weak=False`` so the signal keeps a strong reference to
    this function. Whatever the signal passes is ignored; the receiver only
    sets up tracing via ``init_celery_tracing``.
    """
    del args, kwargs  # the signal payload is not needed
    init_celery_tracing()


def init_celery_tracing():
    """Instrument Celery and attach a batching span processor.

    Steps, in order:

    1. Enable the OpenTelemetry Celery instrumentation.
    2. Install a fresh ``TracerProvider`` as the global tracer provider.
    3. Pick an exporter: ``ConsoleSpanExporter`` when ``DISABLE_TRACING`` is
       true, otherwise a ``JaegerExporter`` pointed at the collector built
       from ``TRACING_COLLECTOR_ENDPOINT`` / ``TRACING_COLLECTOR_PORT``.
    4. Wrap the exporter in a ``BatchSpanProcessor`` and add it to whatever
       provider is currently registered globally.
    """
    CeleryInstrumentor().instrument()
    trace.set_tracer_provider(TracerProvider())

    # NOTE(review): with DISABLE_TRACING set, spans are still produced and
    # written to the console rather than being dropped -- confirm intent.
    if not DISABLE_TRACING:
        exporter = JaegerExporter(
            collector_endpoint=f'http://{TRACING_COLLECTOR_ENDPOINT}:{TRACING_COLLECTOR_PORT}/api/traces?format=jaeger.thrift',
        )
    else:
        exporter = ConsoleSpanExporter()

    processor = BatchSpanProcessor(exporter)
    # Look the provider up again instead of reusing a local reference, so the
    # processor lands on the provider that is actually registered.
    trace.get_tracer_provider().add_span_processor(processor)


@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
    """Deliberately empty receiver for Celery's ``setup_logging`` signal.

    Having a receiver connected stops Celery from hijacking the Django
    logging configuration, so this function intentionally does nothing.
    """
    return None


# Celery application; the broker URL is taken from the Django settings.
tasks_app = Celery('web', broker=settings.CELERY_BROKER_URL)

# One queue, bound with the wildcard routing key 'task.#'.
_TASK_QUEUES = (Queue('default2', routing_key='task.#'),)

# Per-task routing overrides, keyed by fully qualified task name.
_TASK_ROUTES = {
    'web2.tasks.itsatest': {
        'queue': 'default2',
        'routing_key': 'task.default2.itsatest',
    },
}

tasks_app.conf.task_queues = _TASK_QUEUES
tasks_app.conf.task_default_queue = 'default2'
tasks_app.conf.task_default_exchange_type = 'topic'
tasks_app.conf.task_default_routing_key = 'task.default2'
tasks_app.conf.result_backend = 'redis://redis:6379/0'

tasks_app.conf.update(
    # task_routes is given as a one-element tuple wrapping the routes dict.
    task_routes=(_TASK_ROUTES,),
    imports=('web2.tasks',),
    worker_prefetch_multiplier=5,
    task_soft_time_limit=300,
    task_time_limit=360,
    worker_max_tasks_per_child=100,
    broker_transport_options={'confirm_publish': True},
)
$ cat web2/tasks.py

from web2 import tasks_app


@tasks_app.task(ignore_result=True)
def itsatest():
    """Minimal reproduction task: print ``'itsatest'`` and return it.

    Registered with ``ignore_result=True``.
    """
    marker = 'itsatest'
    print(marker)
    return marker
$ poetry run celery --app web2.tasks worker
from web2.tasks import *

itsatest.delay()

What is the expected behavior?

No errors when executing tasks

What is the actual behavior?

[2023-11-01 20:14:58 +0000] [416] [celery.worker.strategy] [INFO] Task web2.tasks.itsatest[50b55ccd-be08-4f5c-82ee-538b363498ec] received
[2023-11-01 20:14:58 +0000] [416] [celery.pool] [DEBUG] TaskPool: Apply <function fast_trace_task at 0xffff8c758a60> (args:('web2.tasks.itsatest', '50b55ccd-be08-4f5c-82ee-538b363498ec', {'lang': 'py', 'task': 'web2.tasks.itsatest', 'id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen1251@4676bacd79b5', 'ignore_result': True, 'stamped_headers': None, 'stamps': {}, 'properties': {'content_type': 'application/json', 'content_encoding': 'utf-8', 'application_headers': {'lang': 'py', 'task': 'web2.tasks.itsatest', 'id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '50b55ccd-be08-4f5c-82ee-538b363498ec', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen1251@4676bacd79b5', 'ignore_result': True, 'stamped_headers': None, 'stamps': {}}, 'delivery_mode':... kwargs:{})
[2023-11-01 20:14:58 +0000] [518] [opentelemetry.instrumentation.celery] [DEBUG] prerun signal start task_id=50b55ccd-be08-4f5c-82ee-538b363498ec
itsatest
[2023-11-01 20:14:58 +0000] [518] [celery.app.trace] [INFO] Task web2.tasks.itsatest[50b55ccd-be08-4f5c-82ee-538b363498ec] succeeded in 0.013544000015826896s: 'itsatest'
[2023-11-01 20:14:58 +0000] [518] [opentelemetry.instrumentation.celery] [DEBUG] postrun signal task_id=50b55ccd-be08-4f5c-82ee-538b363498ec
[2023-11-01 20:14:58 +0000] [518] [celery.utils.dispatch.signal] [ERROR] Signal handler <bound method CeleryInstrumentor._trace_postrun of <opentelemetry.instrumentation.celery.CeleryInstrumentor object at 0xffff44692110>> raised: TypeError("'NoneType' object is not subscriptable")
Traceback (most recent call last):
  File "/opt/.venv/web-tq7C0_9c-py3.10/lib/python3.10/site-packages/celery/utils/dispatch/signal.py", line 276, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/opt/.venv/web-tq7C0_9c-py3.10/lib/python3.10/site-packages/opentelemetry/instrumentation/celery/__init__.py", line 195, in _trace_postrun
    self._record_histograms(task_id, labels)
  File "/opt/.venv/web-tq7C0_9c-py3.10/lib/python3.10/site-packages/opentelemetry/instrumentation/celery/__init__.py", line 309, in _record_histograms
    self.metrics["flower.task.runtime.seconds"].record(
TypeError: 'NoneType' object is not subscriptable

Additional context

It looks like this functionality was introduced fairly recently in #1679 by @shalevr and @Akochavi

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions