Celery with concurrency does not use all workers while multiple tasks reserved #7277

@mskogorevrmc

Description


Checklist

  • I have verified that the issue exists against the master branch of Celery.
  • This has already been asked on the discussions forum first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

Possible Duplicates

  • None

Environment & Settings

Celery version: 5.2.3

celery report Output:

software -> celery:5.2.3 (dawn-chorus) kombu:5.2.3 py:3.7.3
            billiard:3.6.4.0 py-amqp:5.0.9
platform -> system:Linux arch:64bit, ELF
            kernel version:4.19.0-18-amd64 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:rpc:///

deprecated_settings: None
broker_url: 'amqp://svcmgr:********@brocker:5672//'
result_backend: 'rpc:///'
broker_heartbeat: 10
broker_connection_timeout: 20
broker_connection_retry: True
broker_connection_max_retries: 100
result_backend_always_retry: True
task_reject_on_worker_lost: True
result_expires: 3600

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: 3.7.3
  • Minimal Celery Version: 5.2.3
  • Minimal Kombu Version: 5.2.3
  • Minimal Broker Version: RabbitMQ version: 3.7.8
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

pip freeze Output:

Package                  Version
------------------------ ----------
amqp                     5.0.9
attrs                    21.4.0
awscli                   1.20.25
axcient-cloud-clients    0.0.1
baseconvert              1.0.0a4
bcrypt                   3.2.0
billiard                 3.6.4.0
boto3                    1.18.25
botocore                 1.21.25
cached-property          1.5.2
celery                   5.2.3
certifi                  2021.10.8
cffi                     1.14.6
chardet                  3.0.4
click                    8.0.3
click-didyoumean         0.3.0
click-plugins            1.1.1
click-repl               0.2.0
clickclick               20.10.2
collectd                 1.0
colorama                 0.4.3
connexion                2.7.0
cryptography             3.4.7
cymysql                  0.9.18
docutils                 0.15.2
elasticsearch            7.13.0
Flask                    2.0.1
Flask-SQLAlchemy         2.4.4
idna                     2.10
importlib-metadata       4.10.1
inflection               0.5.1
itsdangerous             2.0.1
Jinja2                   3.0.3
jmespath                 0.10.0
jsonschema               3.2.0
kombu                    5.2.3
ldap3                    2.9.1
libnacl                  1.8.0
lockfile                 0.12.2
MarkupSafe               2.0.1
netaddr                  0.8.0
openapi-schema-validator 0.2.3
openapi-spec-validator   0.3.1
paramiko                 2.7.2
pika                     1.2.0
Pillow                   9.0.0
pip                      22.0.2
pkg_resources            0.0.0
ply                      3.11
prompt-toolkit           3.0.26
py-zipkin                0.20.0
pyasn1                   0.4.8
pycparser                2.21
pycryptodome             3.9.9
PyJWT                    1.7.1
pymacaroons              0.13.0
pymongo                  3.12.0
PyMySQL                  1.0.2
PyNaCl                   1.5.0
pyrsistent               0.18.1
python-daemon            2.3.0
python-dateutil          2.8.2
pytz                     2021.3
PyYAML                   5.4.1
requests                 2.25.0
requests-aws4auth        1.1.1
rsa                      4.7.2
s3transfer               0.5.0
servicemanager           4.17.0.310
setuptools               59.1.1
six                      1.15.0
SQLAlchemy               1.3.24
sqlalchemy-collectd      0.0.6
tenacity                 8.0.1
thriftpy2                0.4.14
typing_extensions        4.0.1
tzlocal                  2.1
urllib3                  1.26.8
vine                     5.0.0
wcwidth                  0.2.5
Werkzeug                 2.0.1
wheel                    0.37.1
zipp                     3.7.0

Other Dependencies

Details

N/A

Minimally Reproducible Test Case

Details

Expected Behavior

The worker runs all reserved tasks, using all available pool processes.

Actual Behavior

Celery settings in effect:

celery -A app.celery inspect stats:

    {
        "broker": {
            "alternates": [],
            "connect_timeout": 20,
            "failover_strategy": "round-robin",
            "heartbeat": 10.0,
            "hostname": "brocker",
            "insist": false,
            "login_method": "PLAIN",
            "port": 5672,
            "ssl": false,
            "transport": "amqp",
            "transport_options": {},
            "uri_prefix": null,
            "userid": "user",
            "virtual_host": "/"
        },
        "clock": "128604715",
        "pid": 9580,
        "pool": {
            "max-concurrency": 72,
            "max-tasks-per-child": "N/A",
            "processes": [
                9607,
                9610,
                9611,
                9612,
                9613,
                9614,
                9615,
                9616,
                9617,
                9618,
                9626,
                9627,
                9628,
                9629,
                9630,
                9631,
                9632,
                9633,
                9634,
                9635,
                9636,
                9637,
                9639,
                9640,
                9641,
                9643,
                9644,
                9645,
                9646,
                9647,
                9648,
                9649,
                9650,
                9651,
                9652,
                9653,
                9654,
                9655,
                9656,
                9658,
                9659,
                9660,
                9661,
                9662,
                9663,
                9664,
                9665,
                9666,
                9667,
                9668,
                9669,
                9670,
                9671,
                9672,
                9673,
                9674,
                9675,
                9676,
                9677,
                9678,
                9679,
                9680,
                9681,
                9682,
                9683,
                9684,
                9685,
                9686,
                9687,
                9688,
                9689,
                9690
            ],
            "put-guarded-by-semaphore": false,
            "timeouts": [
                0,
                0
            ],
            "writes": {
                "all": "0.22, 0.19, 0.15, 0.07, 0.07, 0.11, 0.07, 0.07, 0.04",
                "avg": "0.11",
                "inqueues": {
                    "active": 0,
                    "total": 72
                },
                "raw": "6, 5, 4, 2, 2, 3, 2, 2, 1",
                "strategy": "fair",
                "total": 27
            }
        },
        "prefetch_count": 72,
        "rusage": {
            "idrss": 0,
            "inblock": 0,
            "isrss": 0,
            "ixrss": 0,
            "majflt": 0,
            "maxrss": 118236,
            "minflt": 901798,
            "msgrcv": 0,
            "msgsnd": 0,
            "nivcsw": 47351,
            "nsignals": 0,
            "nswap": 0,
            "nvcsw": 228044,
            "oublock": 4464,
            "stime": 38.369105,
            "utime": 305.13124
        },
        "total": {
            "app.celery.start_task": 27
        },
        "uptime": 6107
    }

The docs mention here how to have a worker reserve only one task per process — i.e. only as many tasks as you have concurrency:

Often users ask if disabling “prefetching of tasks” is possible, but what they really mean by that, is to have a worker only reserve as many tasks as there are worker processes (10 unacknowledged tasks for -c 10)

That’s possible, but not without also enabling late acknowledgment. Using this option over the default behavior means a task that’s already started executing will be retried in the event of a power failure or the worker instance being killed abruptly, so this also means the task must be idempotent ... You can enable this behavior by using the following configuration options:

task_acks_late = True
worker_prefetch_multiplier = 1

We run Celery with these parameters: workers are started with "--prefetch-multiplier=1", and tasks are declared with acks_late=True in code: @celery.task(name=TASK_NAME, queue='auto_task', acks_late=True).

Celery may receive 40 tasks, but it only works on 10-20 of them at a time, and some tasks remain reserved for a long time.
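For reference, a minimal sketch of the setup described above. The app and task names are taken from this report's `inspect stats` output; the broker URL and the task body are placeholders, not the actual application code:

```python
# Sketch of the configuration described in this report (not the real app).
from celery import Celery

# Placeholder broker URL; the report uses amqp:// with an rpc:// backend.
celery = Celery('app', broker='amqp://user:password@localhost:5672//',
                backend='rpc://')

# Per the docs quoted above: reserve only as many tasks as there are
# pool processes (one unacknowledged task per process).
celery.conf.task_acks_late = True
celery.conf.worker_prefetch_multiplier = 1

TASK_NAME = 'app.celery.start_task'  # task name seen in `inspect stats`

@celery.task(name=TASK_NAME, queue='auto_task', acks_late=True)
def start_task():
    ...  # long-running work; must be idempotent when acks are late

# Worker started roughly as:
#   celery -A app.celery worker --prefetch-multiplier=1 --concurrency=72
```

With `worker_prefetch_multiplier = 1` and late acks, `prefetch_count` should equal the concurrency (72 in the stats above), so all reserved tasks are expected to be dispatched to idle pool processes — which is what does not happen here.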
