Skip to content

celery_worker pytest fixture timeouts since celery 5.0.3 #6521

@anlambert

Description

@anlambert

Since the 5.0.3 release of celery, the celery_worker pytest fixture leads to a timeout when performing ping check.

The issue can be reproduced using this simple test file:

pytest_plugins = ["celery.contrib.pytest"]

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16

Below is the pytest output:

$ pytest -sv test_celery_worker.py 
============================================================================================== test session starts ===============================================================================================
platform linux -- Python 3.7.3, pytest-6.1.2, py-1.9.0, pluggy-0.13.1 -- /home/anlambert/.virtualenvs/swh/bin/python3
cachedir: .pytest_cache
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/home/anlambert/tmp/.hypothesis/examples')
rootdir: /home/anlambert/tmp
plugins: postgresql-2.5.2, asyncio-0.14.0, mock-3.3.1, cov-2.10.1, django-4.1.0, requests-mock-1.8.0, hypothesis-5.41.3, forked-1.3.0, swh.core-0.9.2.dev4+g6f9779f, flask-1.1.0, xdist-2.1.0, dash-1.17.0, swh.journal-0.5.2.dev1+g12b31a2
collected 1 item                                                                                                                                                                                                 

test_celery_worker.py::test_create_task ERROR

===================================================================================================== ERRORS =====================================================================================================
_______________________________________________________________________________________ ERROR at setup of test_create_task _______________________________________________________________________________________

request = <SubRequest 'celery_worker' for <Function test_create_task>>, celery_app = <Celery celery.tests at 0x7f99b4b91d30>, celery_includes = (), celery_worker_pool = 'solo', celery_worker_parameters = {}

    @pytest.fixture()
    def celery_worker(request,
                      celery_app,
                      celery_includes,
                      celery_worker_pool,
                      celery_worker_parameters):
        # type: (Any, Celery, Sequence[str], str, Any) -> WorkController
        """Fixture: Start worker in a thread, stop it when the test returns."""
        if not NO_WORKER:
            for module in celery_includes:
                celery_app.loader.import_task_module(module)
            with worker.start_worker(celery_app,
                                     pool=celery_worker_pool,
>                                    **celery_worker_parameters) as w:

../dev/celery/celery/contrib/pytest.py:196: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python3.7/contextlib.py:112: in __enter__
    return next(self.gen)
../dev/celery/celery/contrib/testing/worker.py:82: in start_worker
    assert ping.delay().get(timeout=ping_task_timeout) == 'pong'
../dev/celery/celery/result.py:230: in get
    on_message=on_message,
../dev/celery/celery/backends/base.py:655: in wait_for_pending
    no_ack=no_ack,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <celery.backends.cache.CacheBackend object at 0x7f99b411fb00>, task_id = '98b047a2-2027-453c-a317-eb31f44a2547', timeout = 10.0, interval = 0.5, no_ack = True, on_interval = <promise@0x7f99b4a2adf0>

    def wait_for(self, task_id,
                 timeout=None, interval=0.5, no_ack=True, on_interval=None):
        """Wait for task and return its result.
    
        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.
    
        Raises:
            celery.exceptions.TimeoutError:
                If `timeout` is not :const:`None`, and the operation
                takes longer than `timeout` seconds.
        """
        self._ensure_not_eager()
    
        time_elapsed = 0.0
    
        while 1:
            meta = self.get_task_meta(task_id)
            if meta['status'] in states.READY_STATES:
                return meta
            if on_interval:
                on_interval()
            # avoid hammering the CPU checking status.
            time.sleep(interval)
            time_elapsed += interval
            if timeout and time_elapsed >= timeout:
>               raise TimeoutError('The operation timed out.')
E               celery.exceptions.TimeoutError: The operation timed out.

../dev/celery/celery/backends/base.py:687: TimeoutError
============================================================================================ short test summary info =============================================================================================
ERROR test_celery_worker.py::test_create_task - celery.exceptions.TimeoutError: The operation timed out.
=============================================================================================== 1 error in 10.41s ================================================================================================

After a quick git bisect session, I managed to identify the commit that introduced the issue: e203168

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions