Celery delayed delivery not bound when undeclared task queues #9960

@Izzette

Description

Checklist

  • I have verified that the issue exists against the main branch of Celery.
  • This has already been asked to the discussions forum first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the main branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).
  • I have tried to reproduce the issue with pytest-celery and added the reproduction script below.

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the main branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

  • None

Environment & Settings

Celery version: 5.6.0b2 (recovery)

celery report Output:

$ poetry run celery -A celery_app report

software -> celery:5.6.0b2 (recovery) kombu:5.6.0rc2 py:3.13.5
            billiard:4.2.2 py-amqp:5.3.1
platform -> system:Darwin arch:64bit, Mach-O
            kernel version:24.6.0 imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:amqp results:disabled

deprecated_settings: None
broker_url: 'amqp://guest:********@localhost:5672//'
result_backend: None
task_queues: [<unbound Queue queue-a -> <unbound Exchange celery.topic(topic)> -> queue-a>,
 <unbound Queue queue-b -> <unbound Exchange celery.topic(topic)> -> queue-b>,
 <unbound Queue queue-c -> <unbound Exchange celery.topic(topic)> -> queue-c>]
task_routes: {
    'celery_app.task_for_queue_a': {'queue': 'queue-a'},
    'celery_app.task_for_queue_c': {'queue': 'queue-c'}}

Steps to Reproduce

Required Dependencies

  • Minimal Python Version: 3.9+
  • Minimal Celery Version: 5.5.0b1+
  • Minimal Kombu Version: N/A
  • Minimal Broker Version: RabbitMQ supporting x-message-ttl (3.10+ for x-message-ttl on Celery delayed delivery using quorum queues)
  • Minimal Result Backend Version: N/A
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A
  • Minimal Result Backend Client Version: N/A

Python Packages

pip freeze Output:

$ poetry run pip freeze
amqp==5.3.1
billiard==4.2.2
celery==5.6.0b2
click==8.3.0
click-didyoumean==0.3.1
click-plugins==1.1.1.2
click-repl==0.3.0
kombu==5.6.0rc2
packaging==25.0
prompt_toolkit==3.0.52
python-dateutil==2.9.0.post0
six==1.17.0
tzdata==2025.2
tzlocal==5.3.1
vine==5.1.0
wcwidth==0.2.14

Minimally Reproducible Test Case

"""
Celery application configuration for reproducing native delayed delivery bug.
"""
from celery import Celery
from kombu import Exchange, Queue

app = Celery()

# Configure broker
app.conf.broker_url = 'amqp://guest:guest@localhost:5672//'
app.conf.result_backend = None

# Single exchange for all queues
default_exchange = Exchange('celery.topic', type='topic')

# Define multiple task queues with quorum type triggering native delayed
# delivery
app.conf.task_queues = [
    Queue('queue-a', exchange=default_exchange, routing_key='queue-a',
          queue_arguments={'x-queue-type': 'quorum'}),
    Queue('queue-b', exchange=default_exchange, routing_key='queue-b',
          queue_arguments={'x-queue-type': 'quorum'}),
    Queue('queue-c', exchange=default_exchange, routing_key='queue-c',
          queue_arguments={'x-queue-type': 'quorum'}),
]

# Task routing
app.conf.task_routes = {
    'celery_app.task_for_queue_a': {'queue': 'queue-a'},
    'celery_app.task_for_queue_c': {'queue': 'queue-c'},
}


@app.task
def task_for_queue_a(message):
    """Task that should be routed to queue-a."""
    print(f"[queue-a] Received: {message}")


@app.task
def task_for_queue_c(message):
    """Task that should be routed to queue-c."""
    print(f"[queue-c] Received: {message}")

Scenario:

  1. Start a fresh RabbitMQ vhost with no queues.
  2. Start worker-a consuming only from queue-a:
    celery -A celery_app worker -Q queue-a
  3. Worker-a declares queue-a and tries to bind all queues.
  4. Binding fails on queue-b (it doesn't exist yet), so the loop stops before reaching queue-c.
  5. Start worker-c consuming only from queue-c:
    celery -A celery_app worker -Q queue-c
  6. Worker-c also fails to bind queue-b, so queue-c is never bound, for the same reason as with worker-a.
  7. Send task_for_queue_c as an ETA task to queue-c. It is discarded once the ETA is reached because no binding exists.
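
The scenario can be walked through with a toy in-memory model. This is only a sketch: FakeBroker, start_worker, and deliver_after_eta are hypothetical names invented for illustration and are not part of Celery, Kombu, or RabbitMQ. It assumes each worker declares only the queues it consumes and then tries to bind every configured queue, stopping at the first failure.

```python
class FakeBroker:
    """Toy stand-in for a RabbitMQ vhost (hypothetical, for illustration only)."""

    def __init__(self):
        self.declared = set()          # queues that exist in the vhost
        self.delayed_bindings = set()  # queues bound for delayed delivery

    def declare(self, name):
        self.declared.add(name)

    def bind_delayed(self, name):
        # Binding a queue that does not exist fails, like the 404 in the logs.
        if name not in self.declared:
            raise LookupError(f"(404) NOT_FOUND - no queue '{name}'")
        self.delayed_bindings.add(name)

    def deliver_after_eta(self, routing_key):
        # Without a matching binding the message cannot be routed and is dropped.
        return routing_key in self.delayed_bindings


ALL_QUEUES = ["queue-a", "queue-b", "queue-c"]


def start_worker(broker, consumed):
    """Declare the consumed queues, then bind all queues, failing fast."""
    for name in consumed:
        broker.declare(name)
    try:
        for name in ALL_QUEUES:
            broker.bind_delayed(name)
    except LookupError as exc:
        print(f"delayed delivery setup failed: {exc}")


broker = FakeBroker()
start_worker(broker, ["queue-a"])  # worker-a
start_worker(broker, ["queue-c"])  # worker-c

print(sorted(broker.delayed_bindings))       # only queue-a ends up bound
print(broker.deliver_after_eta("queue-c"))   # False: the ETA task is dropped
```

In this model queue-c exists after worker-c starts, yet it never gets a delayed delivery binding because both workers stop at queue-b.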

Expected Behavior

When workers start up and attempt to bind queues for native delayed delivery, each worker should bind all the queues it is consuming from to the delayed delivery exchange.

If binding fails for a queue (e.g., because it doesn't exist yet), the worker should either:

  • Log the error for that specific queue, continue attempting to bind the remaining queues, and collect all errors to report at the end.
  • Raise an exception to crash the worker.
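
A minimal sketch of the first option, assuming a caller-supplied bind_one callable that binds a single queue by name. bind_queues_collecting_errors and fake_bind are hypothetical names used only for this illustration; this is not Celery's actual implementation or a proposed patch.

```python
import logging

logger = logging.getLogger(__name__)


def bind_queues_collecting_errors(queues, bind_one):
    """Try to bind every queue; return (queue_name, exception) pairs for failures."""
    failures = []
    for name in queues:
        try:
            bind_one(name)
        except Exception as exc:
            # Log this queue's failure, then keep going with the remaining queues.
            logger.error("Failed to bind queue %r: %s", name, exc)
            failures.append((name, exc))
    return failures


# Demo with a fake bind function: queue-b does not exist yet.
existing = {"queue-a", "queue-c"}
bound = []


def fake_bind(name):
    if name not in existing:
        raise LookupError(f"no queue '{name}'")
    bound.append(name)


failures = bind_queues_collecting_errors(["queue-a", "queue-b", "queue-c"], fake_bind)
print(bound)                              # queue-a and queue-c are both bound
print([name for name, _ in failures])     # only queue-b is reported as failed
```

The second option fits the same shape: the caller can raise once the loop has finished if the returned list is non-empty, so the worker crashes only after every queue has been attempted.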

Actual Behavior

When workers start up and attempt to bind queues for native delayed delivery, the binding process stops at the first error:

  1. Worker attempts to bind queues in order
  2. When binding fails for a queue (e.g., queue doesn't exist), an exception is raised immediately
  3. The loop breaks and remaining queues are never attempted
  4. This leaves some queues without delayed delivery bindings

In the example scenario:

  • Worker-a successfully binds queue-a, fails on queue-b, and never attempts queue-c
  • Worker-c has the same issue and also never binds queue-c
  • ETA tasks sent to queue-c are silently discarded: they reach the delayed delivery exchange, but no binding exists to route them to the queue

Worker logs:

[2025-10-20 17:08:33,062: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2025-10-20 17:08:33,068: INFO/MainProcess] mingle: searching for neighbors
[2025-10-20 17:08:34,104: INFO/MainProcess] mingle: all alone
[2025-10-20 17:08:34,123: INFO/MainProcess] Global QoS is disabled. Prefetch count in now static.
[2025-10-20 17:08:34,263: ERROR/MainProcess] Failed to bind queue 'queue-b': Queue.bind: (404) NOT_FOUND - no queue 'queue-b' in vhost '/'
[2025-10-20 17:08:34,263: WARNING/MainProcess] Failed to bind queues for 'amqp://guest:guest@localhost:5672//': Queue.bind: (404) NOT_FOUND - no queue 'queue-b' in vhost '/'
[2025-10-20 17:08:34,265: WARNING/MainProcess] Failed to setup delayed delivery for 'amqp://guest:guest@localhost:5672//': Queue.bind: (404) NOT_FOUND - no queue 'queue-b' in vhost '/'
[2025-10-20 17:08:34,266: CRITICAL/MainProcess] Failed to setup delayed delivery for all broker URLs. Native delayed delivery will not be available.
[2025-10-20 17:08:34,274: INFO/MainProcess] worker-c@BM-003963 ready.
^C
worker: Hitting Ctrl+C again will initiate cold shutdown, terminating all running tasks!

worker: Warm shutdown (MainProcess)

In celery/worker/consumer/delayed_delivery.py, the _bind_queues method raises immediately on the first exception, preventing subsequent queues from being bound:

for queue in queues:
    try:
        logger.debug("Binding queue %r to delayed delivery exchange", queue.name)
        bind_queue_to_native_delayed_delivery_exchange(connection, queue)
    except Exception as e:
        logger.error(
            "Failed to bind queue %r: %s",
            queue.name, str(e)
        )
        raise
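
The effect of the bare raise can be pinned down in isolation with unittest.mock. The sketch below copies only the control flow of the loop above into a hypothetical helper, bind_queues_fail_fast, and replaces the real bind call with a mock; it does not import or exercise Celery itself.

```python
from unittest.mock import Mock


def bind_queues_fail_fast(queues, bind_one):
    """Same control flow as the quoted loop: re-raise on the first failure."""
    for name in queues:
        try:
            bind_one(name)
        except Exception:
            raise


def bind_side_effect(name):
    # Simulate the broker rejecting the bind for the undeclared queue.
    if name == "queue-b":
        raise LookupError("(404) NOT_FOUND - no queue 'queue-b'")


bind_one = Mock(side_effect=bind_side_effect)

try:
    bind_queues_fail_fast(["queue-a", "queue-b", "queue-c"], bind_one)
except LookupError:
    pass

# Record which queues the loop actually tried to bind.
attempted = [c.args[0] for c in bind_one.call_args_list]
print(attempted)  # queue-c is never attempted
```

A regression test for a fix could use the same mock and assert that queue-c does appear in the list of attempted binds.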
