Description
Checklist
- I have verified that the issue exists against the master branch of Celery.
- This has already been asked to the discussions forum first.
- I have read the relevant section in the contribution guide on reporting bugs.
- I have checked the issues list for similar or identical bug reports.
- I have checked the pull requests list for existing proposed fixes.
- I have checked the commit log to find out if the bug was already fixed in the master branch.
- I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
Mandatory Debugging Information
- I have included the output of celery -A proj report in the issue. (if you are not able to do this, then at least specify the Celery version affected).
- I have verified that the issue exists against the master branch of Celery.
- I have included the contents of pip freeze in the issue.
- I have included all the versions of all the external dependencies required to reproduce this bug.
Optional Debugging Information
- I have tried reproducing the issue on more than one Python version and/or implementation.
- I have tried reproducing the issue on more than one message broker and/or result backend.
- I have tried reproducing the issue on more than one version of the message broker and/or result backend.
- I have tried reproducing the issue on more than one operating system.
- I have tried reproducing the issue on more than one workers pool.
- I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
- I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
Possible Duplicates
- None
Environment & Settings
Celery version: 5.2.3
celery report Output:
software -> celery:5.2.3 (dawn-chorus) kombu:5.2.3 py:3.7.3
billiard:3.6.4.0 py-amqp:5.0.9
platform -> system:Linux arch:64bit, ELF
kernel version:4.19.0-18-amd64 imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:amqp results:rpc:///
deprecated_settings: None
broker_url: 'amqp://svcmgr:********@brocker:5672//'
result_backend: 'rpc:///'
broker_heartbeat: 10
broker_connection_timeout: 20
broker_connection_retry: True
broker_connection_max_retries: 100
result_backend_always_retry: True
task_reject_on_worker_lost: True
result_expires: 3600
Steps to Reproduce
Required Dependencies
- Minimal Python Version: 3.7.3
- Minimal Celery Version: 5.2.3
- Minimal Kombu Version: 5.2.3
- Minimal Broker Version: RabbitMQ version: 3.7.8
- Minimal Result Backend Version: N/A or Unknown
- Minimal OS and/or Kernel Version: N/A or Unknown
- Minimal Broker Client Version: N/A or Unknown
- Minimal Result Backend Client Version: N/A or Unknown
Python Packages
pip freeze Output:
Package Version
------------------------ ----------
amqp 5.0.9
attrs 21.4.0
awscli 1.20.25
axcient-cloud-clients 0.0.1
baseconvert 1.0.0a4
bcrypt 3.2.0
billiard 3.6.4.0
boto3 1.18.25
botocore 1.21.25
cached-property 1.5.2
celery 5.2.3
certifi 2021.10.8
cffi 1.14.6
chardet 3.0.4
click 8.0.3
click-didyoumean 0.3.0
click-plugins 1.1.1
click-repl 0.2.0
clickclick 20.10.2
collectd 1.0
colorama 0.4.3
connexion 2.7.0
cryptography 3.4.7
cymysql 0.9.18
docutils 0.15.2
elasticsearch 7.13.0
Flask 2.0.1
Flask-SQLAlchemy 2.4.4
idna 2.10
importlib-metadata 4.10.1
inflection 0.5.1
itsdangerous 2.0.1
Jinja2 3.0.3
jmespath 0.10.0
jsonschema 3.2.0
kombu 5.2.3
ldap3 2.9.1
libnacl 1.8.0
lockfile 0.12.2
MarkupSafe 2.0.1
netaddr 0.8.0
openapi-schema-validator 0.2.3
openapi-spec-validator 0.3.1
paramiko 2.7.2
pika 1.2.0
Pillow 9.0.0
pip 22.0.2
pkg_resources 0.0.0
ply 3.11
prompt-toolkit 3.0.26
py-zipkin 0.20.0
pyasn1 0.4.8
pycparser 2.21
pycryptodome 3.9.9
PyJWT 1.7.1
pymacaroons 0.13.0
pymongo 3.12.0
PyMySQL 1.0.2
PyNaCl 1.5.0
pyrsistent 0.18.1
python-daemon 2.3.0
python-dateutil 2.8.2
pytz 2021.3
PyYAML 5.4.1
requests 2.25.0
requests-aws4auth 1.1.1
rsa 4.7.2
s3transfer 0.5.0
servicemanager 4.17.0.310
setuptools 59.1.1
six 1.15.0
SQLAlchemy 1.3.24
sqlalchemy-collectd 0.0.6
tenacity 8.0.1
thriftpy2 0.4.14
typing_extensions 4.0.1
tzlocal 2.1
urllib3 1.26.8
vine 5.0.0
wcwidth 0.2.5
Werkzeug 2.0.1
wheel 0.37.1
zipp 3.7.0
Other Dependencies
N/A
Minimally Reproducible Test Case
Expected Behavior
Workers run all available tasks.
Actual Behavior
Working Celery Settings:
celery -A app.celery inspect stats:
{
"broker": {
"alternates": [],
"connect_timeout": 20,
"failover_strategy": "round-robin",
"heartbeat": 10.0,
"hostname": "brocker",
"insist": false,
"login_method": "PLAIN",
"port": 5672,
"ssl": false,
"transport": "amqp",
"transport_options": {},
"uri_prefix": null,
"userid": "user",
"virtual_host": "/"
},
"clock": "128604715",
"pid": 9580,
"pool": {
"max-concurrency": 72,
"max-tasks-per-child": "N/A",
"processes": [
9607,
9610,
9611,
9612,
9613,
9614,
9615,
9616,
9617,
9618,
9626,
9627,
9628,
9629,
9630,
9631,
9632,
9633,
9634,
9635,
9636,
9637,
9639,
9640,
9641,
9643,
9644,
9645,
9646,
9647,
9648,
9649,
9650,
9651,
9652,
9653,
9654,
9655,
9656,
9658,
9659,
9660,
9661,
9662,
9663,
9664,
9665,
9666,
9667,
9668,
9669,
9670,
9671,
9672,
9673,
9674,
9675,
9676,
9677,
9678,
9679,
9680,
9681,
9682,
9683,
9684,
9685,
9686,
9687,
9688,
9689,
9690
],
"put-guarded-by-semaphore": false,
"timeouts": [
0,
0
],
"writes": {
"all": "0.22, 0.19, 0.15, 0.07, 0.07, 0.11, 0.07, 0.07, 0.04",
"avg": "0.11",
"inqueues": {
"active": 0,
"total": 72
},
"raw": "6, 5, 4, 2, 2, 3, 2, 2, 1",
"strategy": "fair",
"total": 27
}
},
"prefetch_count": 72,
"rusage": {
"idrss": 0,
"inblock": 0,
"isrss": 0,
"ixrss": 0,
"majflt": 0,
"maxrss": 118236,
"minflt": 901798,
"msgrcv": 0,
"msgsnd": 0,
"nivcsw": 47351,
"nsignals": 0,
"nswap": 0,
"nvcsw": 228044,
"oublock": 4464,
"stime": 38.369105,
"utime": 305.13124
},
"total": {
"app.celery.start_task": 27
},
"uptime": 6107
}
The docs describe how to reserve one task at a time, or only as many tasks as there are worker processes:
Often users ask if disabling “prefetching of tasks” is possible, but what they really mean by that, is to have a worker only reserve as many tasks as there are worker processes (10 unacknowledged tasks for -c 10)
That’s possible, but not without also enabling late acknowledgment. Using this option over the default behavior means a task that’s already started executing will be retried in the event of a power failure or the worker instance being killed abruptly, so this also means the task must be idempotent ... You can enable this behavior by using the following configuration options:
task_acks_late = True
worker_prefetch_multiplier = 1
We use these settings to run Celery: the workers are started with "--prefetch-multiplier=1", and acks_late=True is set in code: @celery.task(name=TASK_NAME, queue='auto_task', acks_late=True).
Celery may receive 40 tasks but runs only 10-20 of them at the same time, and some tasks stay in the reserved state for a long time.
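As a sanity check on the numbers: as far as I understand, the prefetch limit the worker requests from the broker is the pool concurrency multiplied by worker_prefetch_multiplier. The helper below is a hypothetical illustration of that arithmetic, not a Celery API.

```python
def prefetch_limit(concurrency: int, multiplier: int) -> int:
    """Number of unacknowledged messages the worker asks the broker for.

    Hypothetical helper for illustration only. A result of 0 corresponds
    to "no limit", which is what a multiplier of 0 means.
    """
    return concurrency * multiplier


# Values from the stats output above: 72 pool processes, multiplier 1.
limit = prefetch_limit(concurrency=72, multiplier=1)
print(limit)

# For comparison, the default multiplier of 4 with the same pool size.
default_limit = prefetch_limit(concurrency=72, multiplier=4)
print(default_limit)
```

With 72 processes and a multiplier of 1 this gives 72, which matches "prefetch_count": 72 in the stats. So 40 received tasks fit within both the pool size and the prefetch limit, and neither number explains why only 10-20 run at once.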