Description
Checklist
- I have verified that the issue exists against the main branch of Celery.
- This has already been asked to the discussions forum first.
- I have read the relevant section in the contribution guide on reporting bugs.
- I have checked the issues list for similar or identical bug reports.
- I have checked the pull requests list for existing proposed fixes.
- I have checked the commit log to find out if the bug was already fixed in the main branch.
- I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
- I have tried to reproduce the issue with pytest-celery and added the reproduction script below.
Mandatory Debugging Information
- I have included the output of celery -A proj report in the issue. (If you are not able to do this, then at least specify the Celery version affected.)
- I have verified that the issue exists against the main branch of Celery.
- I have included the contents of pip freeze in the issue.
- I have included all the versions of all the external dependencies required to reproduce this bug.
Optional Debugging Information
- I have tried reproducing the issue on more than one Python version and/or implementation.
- I have tried reproducing the issue on more than one message broker and/or result backend.
- I have tried reproducing the issue on more than one version of the message broker and/or result backend.
- I have tried reproducing the issue on more than one operating system.
- I have tried reproducing the issue on more than one workers pool.
- I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
- I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
- None
Possible Duplicates
- None
Environment & Settings
Celery version: 5.6.0b2 (recovery)
celery report Output:
$ poetry run celery -A celery_app report
software -> celery:5.6.0b2 (recovery) kombu:5.6.0rc2 py:3.13.5
billiard:4.2.2 py-amqp:5.3.1
platform -> system:Darwin arch:64bit, Mach-O
kernel version:24.6.0 imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:amqp results:disabled
deprecated_settings: None
broker_url: 'amqp://guest:********@localhost:5672//'
result_backend: None
task_queues: [<unbound Queue queue-a -> <unbound Exchange celery.topic(topic)> -> queue-a>,
<unbound Queue queue-b -> <unbound Exchange celery.topic(topic)> -> queue-b>,
<unbound Queue queue-c -> <unbound Exchange celery.topic(topic)> -> queue-c>]
task_routes:
'celery_app.task_for_queue_a': {'queue': 'queue-a'},
'celery_app.task_for_queue_c': {'queue': 'queue-c'}}
Steps to Reproduce
Required Dependencies
- Minimal Python Version: 3.9+
- Minimal Celery Version: 5.5.0b1+
- Minimal Kombu Version: N/A
- Minimal Broker Version: RabbitMQ supporting x-message-ttl (3.10+ for x-message-ttl on Celery delayed delivery using quorum queues)
- Minimal Result Backend Version: N/A
- Minimal OS and/or Kernel Version: N/A or Unknown
- Minimal Broker Client Version: N/A
- Minimal Result Backend Client Version: N/A
Python Packages
pip freeze Output:
$ poetry run pip freeze
amqp==5.3.1
billiard==4.2.2
celery==5.6.0b2
click==8.3.0
click-didyoumean==0.3.1
click-plugins==1.1.1.2
click-repl==0.3.0
kombu==5.6.0rc2
packaging==25.0
prompt_toolkit==3.0.52
python-dateutil==2.9.0.post0
six==1.17.0
tzdata==2025.2
tzlocal==5.3.1
vine==5.1.0
wcwidth==0.2.14
Minimally Reproducible Test Case
"""
Celery application configuration for reproducing native delayed delivery bug.
"""
from celery import Celery
from kombu import Exchange, Queue

app = Celery()

# Configure broker
app.conf.broker_url = 'amqp://guest:guest@localhost:5672//'
app.conf.result_backend = None

# Single exchange for all queues
default_exchange = Exchange('celery.topic', type='topic')

# Define multiple task queues with quorum type triggering native delayed
# delivery
app.conf.task_queues = [
    Queue('queue-a', exchange=default_exchange, routing_key='queue-a',
          queue_arguments={'x-queue-type': 'quorum'}),
    Queue('queue-b', exchange=default_exchange, routing_key='queue-b',
          queue_arguments={'x-queue-type': 'quorum'}),
    Queue('queue-c', exchange=default_exchange, routing_key='queue-c',
          queue_arguments={'x-queue-type': 'quorum'}),
]

# Task routing
app.conf.task_routes = {
    'celery_app.task_for_queue_a': {'queue': 'queue-a'},
    'celery_app.task_for_queue_c': {'queue': 'queue-c'},
}


@app.task
def task_for_queue_a(message):
    """Task that should be routed to queue-a."""
    print(f"[queue-a] Received: {message}")


@app.task
def task_for_queue_c(message):
    """Task that should be routed to queue-c."""
    print(f"[queue-c] Received: {message}")

Scenario:
- Start with a fresh RabbitMQ vhost with no queues.
- Start worker-a consuming only from queue-a:
  celery -A app worker -Q queue-a
- Worker-a declares queue-a and tries to bind all queues.
- Binding fails on queue-b (doesn't exist) and stops before queue-c.
- Start worker-c consuming only from queue-c:
  celery -A app worker -Q queue-c
- Worker-c fails to bind queue-b, which prevents queue-c from being bound, for the same reason as worker-a.
- Send task_for_queue_c as an ETA task to queue-c. It gets discarded after the ETA is reached because no binding exists.
Expected Behavior
When workers start up and attempt to bind queues for native delayed delivery:
Each worker should bind every queue it consumes to the delayed delivery exchange.
If binding fails for a queue (e.g., because it doesn't exist yet), the worker should either:
- Log the error for that specific queue, continue attempting to bind the remaining queues, then collect all errors and report them at the end.
- Raise an exception to crash the worker.
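The first option could look roughly like the sketch below. It is not Celery's implementation: `bind_all_queues` and the `bind_one` callable are hypothetical names standing in for the real binding helper, and queues are represented by plain name strings for brevity.

```python
import logging
from typing import Callable, Dict, Iterable, List, Tuple

logger = logging.getLogger(__name__)


def bind_all_queues(
    queue_names: Iterable[str],
    bind_one: Callable[[str], None],
) -> Tuple[List[str], Dict[str, Exception]]:
    """Attempt to bind every queue and report failures only at the end.

    `bind_one` is a hypothetical stand-in for the call that performs the
    real broker binding. Returns (bound queue names, failures by name).
    """
    bound: List[str] = []
    failures: Dict[str, Exception] = {}
    for name in queue_names:
        try:
            bind_one(name)
        except Exception as exc:
            # Record the failure and keep going so later queues still get bound.
            logger.error("Failed to bind queue %r: %s", name, exc)
            failures[name] = exc
        else:
            bound.append(name)
    if failures:
        logger.warning(
            "Bound %d queue(s); %d failed: %s",
            len(bound), len(failures), ", ".join(failures),
        )
    return bound, failures
```

With this shape, a missing queue-b would be logged and reported, while queue-c would still be attempted. The caller can then decide whether a non-empty `failures` mapping should crash the worker (the second option).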
Actual Behavior
When workers start up and attempt to bind queues for native delayed delivery, the binding process stops at the first error:
- Worker attempts to bind queues in order
- When binding fails for a queue (e.g., queue doesn't exist), an exception is raised immediately
- The loop breaks and remaining queues are never attempted
- This leaves some queues without delayed delivery bindings
In the example scenario:
- Worker-a successfully binds queue-a, fails on queue-b, and never attempts queue-c
- Worker-c has the same issue and also never binds queue-c
- ETA tasks sent to queue-c are silently discarded - they reach the delayed delivery exchange but have no binding to route them
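The sequence in this scenario can be modelled with a toy in-memory broker. This is only an illustrative sketch: `ToyBroker`, `start_worker`, and `CONFIGURED` are hypothetical names, and the model ignores real RabbitMQ details such as topic routing keys and TTL queues. It only captures "bind in order, stop at the first failure".

```python
from typing import Optional, Set

# Queue order as configured in task_queues.
CONFIGURED = ["queue-a", "queue-b", "queue-c"]


class ToyBroker:
    """Toy in-memory stand-in for a broker; not RabbitMQ's real semantics."""

    def __init__(self) -> None:
        self.queues: Set[str] = set()
        self.delayed_bindings: Set[str] = set()

    def declare(self, name: str) -> None:
        self.queues.add(name)

    def bind_delayed(self, name: str) -> None:
        # Binding a queue that was never declared fails, like a 404 NOT_FOUND.
        if name not in self.queues:
            raise LookupError(f"NOT_FOUND - no queue {name!r}")
        self.delayed_bindings.add(name)

    def deliver_delayed(self, name: str) -> bool:
        # False means the delayed message has no route and is dropped.
        return name in self.delayed_bindings


def start_worker(broker: ToyBroker, consumed: str) -> Optional[str]:
    """Declare only the consumed queue, then bind all configured queues.

    The first binding failure aborts the loop, mirroring the reported behavior.
    """
    broker.declare(consumed)
    try:
        for name in CONFIGURED:
            broker.bind_delayed(name)
    except LookupError as exc:
        return str(exc)
    return None
```

Calling `start_worker(broker, "queue-a")` and then `start_worker(broker, "queue-c")` on a fresh `ToyBroker` leaves only queue-a bound, so `broker.deliver_delayed("queue-c")` reports that the message would be dropped even though queue-c exists.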
Worker logs:
[2025-10-20 17:08:33,062: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2025-10-20 17:08:33,068: INFO/MainProcess] mingle: searching for neighbors
[2025-10-20 17:08:34,104: INFO/MainProcess] mingle: all alone
[2025-10-20 17:08:34,123: INFO/MainProcess] Global QoS is disabled. Prefetch count in now static.
[2025-10-20 17:08:34,263: ERROR/MainProcess] Failed to bind queue 'queue-b': Queue.bind: (404) NOT_FOUND - no queue 'queue-b' in vhost '/'
[2025-10-20 17:08:34,263: WARNING/MainProcess] Failed to bind queues for 'amqp://guest:guest@localhost:5672//': Queue.bind: (404) NOT_FOUND - no queue 'queue-b' in vhost '/'
[2025-10-20 17:08:34,265: WARNING/MainProcess] Failed to setup delayed delivery for 'amqp://guest:guest@localhost:5672//': Queue.bind: (404) NOT_FOUND - no queue 'queue-b' in vhost '/'
[2025-10-20 17:08:34,266: CRITICAL/MainProcess] Failed to setup delayed delivery for all broker URLs. Native delayed delivery will not be available.
[2025-10-20 17:08:34,274: INFO/MainProcess] worker-c@BM-003963 ready.
^C
worker: Hitting Ctrl+C again will initiate cold shutdown, terminating all running tasks!
worker: Warm shutdown (MainProcess)
In celery/worker/consumer/delayed_delivery.py, the _bind_queues method raises immediately on the first exception, preventing subsequent queues from being bound:
celery/celery/worker/consumer/delayed_delivery.py
Lines 160 to 169 in 1ad3bd4
for queue in queues:
    try:
        logger.debug("Binding queue %r to delayed delivery exchange", queue.name)
        bind_queue_to_native_delayed_delivery_exchange(connection, queue)
    except Exception as e:
        logger.error(
            "Failed to bind queue %r: %s",
            queue.name, str(e)
        )
        raise