Conversation

@Izzette (Contributor) commented Oct 20, 2025

Description

Current problem

When binding queues for native delayed delivery in AMQP, a failure to bind one queue prevents all subsequent queues from being bound. This is particularly problematic when multiple heterogeneous worker deployments are consuming from different queues, all needing delayed delivery setup.

Take, for example, an application with 3 task_queues (queue-a, queue-b, and queue-c) and two workers (worker-a and worker-c). worker-a consumes from queue-a and worker-c consumes from queue-c. No worker consumes from queue-b. The application is starting on a fresh RabbitMQ vhost with no pre-existing queues or exchanges. worker-a starts up, declares queue-a and the celery delayed delivery exchanges/queues, and begins the delayed delivery binding process by calling DelayedDelivery._bind_queues. First it tries to bind queue-a to the delayed delivery exchange; this goes smoothly and the binding is set up for queue-a. Then worker-a tries to bind queue-b, which doesn't exist (worker-a won't declare it, as it is not a consumer of queue-b), so an exception is raised. worker-a never tries to bind queue-c to the delayed delivery exchange, because the loop broke before reaching queue-c.
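
For concreteness, a minimal sketch of an app matching this scenario (the queue and worker names come from the description above; the exchange names and config style are assumptions for illustration):

    # Hypothetical config reproducing the scenario; exchange names are invented.
    from celery import Celery
    from kombu import Exchange, Queue

    app = Celery('example', broker='amqp://')
    app.conf.task_queues = [
        Queue('queue-a', Exchange('exchange-a', type='topic'), routing_key='queue-a'),
        Queue('queue-b', Exchange('exchange-b', type='topic'), routing_key='queue-b'),
        Queue('queue-c', Exchange('exchange-c', type='topic'), routing_key='queue-c'),
    ]

    # worker-a:  celery -A example worker -Q queue-a
    # worker-c:  celery -A example worker -Q queue-c
    # No worker consumes from queue-b, so no worker ever declares it.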

Next, worker-c starts up. It declares queue-c and the celery delayed delivery exchanges/queues, and calls DelayedDelivery._bind_queues. First it tries to bind queue-a, which works fine as it's already bound, but it fails on queue-b for the same reason worker-a did. queue-c is never bound by this worker either.

Both workers finish their startup and keep consuming. They log the message Failed to bind queues for …. When ETA tasks are sent for queue-a, they trickle down through the delayed delivery exchanges until eventually being delivered to queue-a and processed by worker-a. However, when ETA tasks are sent for queue-c, after trickling through the delayed delivery exchange, no binding is found for *.queue-c and the tasks are discarded. Non-ETA tasks sent to queue-c are delivered just fine and processed by worker-c.

This can happen even when all queues have a worker, depending on the order in which the workers are started. If, for example, worker-b existed and was consuming from queue-b, but the startup order was worker-b, worker-c, and finally worker-a, then both queue-b and queue-c would be missing bindings. Once worker-b is restarted, it would successfully bind queue-b to the delayed delivery exchanges. If worker-c is then restarted after worker-b has restarted, it would be able to bind queue-c as well.

I'm currently facing this issue in a production environment. My workaround is to manually declare all queues and restart the workers when new queues are added that don't yet have any workers.

Solution

The solution I'm proposing here is to capture the exceptions raised when binding each queue and, instead of immediately re-raising, group all of them in an ExceptionGroup. This lets every queue attempt to bind, with the errors still being raised after all queues have been attempted.

In order to allow ConnectionRefusedError and OSError to still be caught by retry_over_time, we need to raise these errors immediately, as kombu.utils.functional.retry_over_time is not using the except* clause.

See:

    retry_over_time(
        self._setup_delayed_delivery,
        args=(c, broker_url),
        catch=(ConnectionRefusedError, OSError),
        errback=self._on_retry,
        interval_start=RETRY_INTERVAL,
        max_retries=MAX_RETRIES,
    )

See: https://github.com/celery/kombu/blob/1bef09386e936be9757e713682bdf38bc1c0b737/kombu/utils/functional.py#L319-L319
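
For illustration, a minimal sketch of the proposed _bind_queues behavior, assuming Python 3.11+ for ExceptionGroup (the RETRIED_EXCEPTIONS name follows the description in this PR, but the body below is a sketch rather than the merged code):

    # Connection-level errors must propagate unchanged so retry_over_time
    # (which uses a plain `except` clause, not `except*`) can retry them.
    RETRIED_EXCEPTIONS = (ConnectionRefusedError, OSError)

    def _bind_queues(self, app, connection):
        exceptions = []
        for queue in app.amqp.queues.values():
            try:
                bind_queue_to_native_delayed_delivery_exchange(connection, queue)
            except RETRIED_EXCEPTIONS:
                # Re-raise immediately to preserve the existing retry logic.
                raise
            except Exception as exc:
                # Collect the failure, but keep attempting the remaining queues.
                exceptions.append(exc)
        if exceptions:
            raise ExceptionGroup('Failed to bind queues', exceptions)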

Alternative solutions

Some other solutions could be considered, such as:

  1. Creating the queues in DelayedDelivery._bind_queues before attempting kombu.transport.native_delayed_delivery.bind_queue_to_native_delayed_delivery_exchange.
  2. Only iterating over the queues this worker is consuming (app.amqp._consume_from) in DelayedDelivery._bind_queues rather than all defined task_queues (app.amqp.queues); see the sketch after the excerpt below.

See:

celery/celery/app/amqp.py, lines 157 to 166 at c4e4bab:

    def select_add(self, queue, **kwargs):
        """Add new task queue that'll be consumed from.

        The queue will be active even when a subset has been selected
        using the :option:`celery worker -Q` option.
        """
        q = self.add(queue, **kwargs)
        if self._consume_from is not None:
            self._consume_from[q.name] = q
        return q
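
For illustration, alternative 2 would amount to something like this hypothetical loop (the attribute names are taken from the description above and may not match the actual code):

    # Hypothetical: bind only the queues this worker consumes from,
    # falling back to all defined task_queues when no subset is selected.
    queues = consumer.app.amqp.queues._consume_from or consumer.app.amqp.queues
    for queue in queues.values():
        bind_queue_to_native_delayed_delivery_exchange(connection, queue)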

I chose this solution because, even if we only iterated over the queues this worker is consuming, as long as the worker is expected to continue running after encountering errors when binding the delayed delivery exchange, I believe it ought to at least attempt to bind all queues rather than stop on the first error.


Fixes #9960

@Nusnus Nusnus force-pushed the native-delayed-binding-failures-keep-going branch from dbef707 to af31b45 on October 20, 2025 11:31
@Nusnus Nusnus requested review from Nusnus and Copilot October 20, 2025 11:31
Copilot AI (Contributor) left a comment

Pull Request Overview

This PR fixes a critical issue where native delayed delivery queue binding would halt on the first error, preventing subsequent queues from being bound. The fix allows all queues to attempt binding while collecting errors into an ExceptionGroup, with special handling to preserve retry behavior for connection-related exceptions.

Key changes:

  • Queue binding now continues after individual failures instead of stopping at the first error
  • Non-retryable exceptions are collected and raised as an ExceptionGroup after all binding attempts
  • Connection-related exceptions (ConnectionRefusedError, OSError) are immediately re-raised to maintain existing retry logic

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

Changed files:

  • celery/worker/consumer/delayed_delivery.py: implements exception collection logic in _bind_queues to continue binding after failures; adds a RETRIED_EXCEPTIONS constant for shared use between the retry mechanism and the binding logic.
  • t/unit/worker/test_native_delayed_delivery.py: adds comprehensive test coverage for the new behavior: continued binding after failures, ExceptionGroup raising, and retry mechanism preservation.

@auvipy auvipy self-requested a review October 20, 2025 11:52
@auvipy (Member) left a comment

Can you point to a relevant issue that might be related to this fix?

@auvipy auvipy added this to the 5.6.0 milestone Oct 20, 2025
codecov bot commented Oct 20, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 78.68%. Comparing base (2f60642) to head (f486566).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #9959   +/-   ##
=======================================
  Coverage   78.67%   78.68%           
=======================================
  Files         153      153           
  Lines       19304    19311    +7     
  Branches     2212     2214    +2     
=======================================
+ Hits        15188    15195    +7     
  Misses       3817     3817           
  Partials      299      299           
Flag: unittests. Coverage: 78.66% <100.00%> (+<0.01%) ⬆️


@Izzette (Contributor, Author) commented Oct 20, 2025

Can you point to a relevant issue that might be related to this fix?

No, but I can create one if it would be useful for you.


It seems that ExceptionGroup was added in 3.11. I'm not sure we need to keep the sub-exceptions; I'll just include their messages.

@auvipy (Member) commented Oct 20, 2025

Can you also check why the Python 3.9 and 3.10 builds are failing with the following tests?

=================================== FAILURES ===================================
___ test_DelayedDelivery.test_bind_queues_raises_exception_group_on_failures ___

self = <t.unit.worker.test_native_delayed_delivery.test_DelayedDelivery object at 0x7f99dee542b0>
mock_bind =

  @patch('celery.worker.consumer.delayed_delivery.bind_queue_to_native_delayed_delivery_exchange')
  def test_bind_queues_raises_exception_group_on_failures(self, mock_bind):
      """Test that ExceptionGroup is raised with all binding failures."""
      consumer_mock = MagicMock()
      consumer_mock.app.conf.broker_native_delayed_delivery_queue_type = \
          'classic'
      consumer_mock.app.conf.broker_url = 'amqp://'
  
      # Create three queues
      queue1 = Queue('queue1', exchange=Exchange('exchange1', type='topic'))
      queue2 = Queue('queue2', exchange=Exchange('exchange2', type='topic'))
      queue3 = Queue('queue3', exchange=Exchange('exchange3', type='topic'))
  
      consumer_mock.app.amqp.queues = {
          'queue1': queue1,
          'queue2': queue2,
          'queue3': queue3,
      }
  
      # Make queue1 and queue3 fail with different errors
      def bind_side_effect(connection, queue):
          if queue.name == 'queue1':
              raise ValueError("Queue1 binding failed")
          elif queue.name == 'queue3':
              raise RuntimeError("Queue3 binding failed")
  
      mock_bind.side_effect = bind_side_effect
  
      delayed_delivery = DelayedDelivery(consumer_mock)
  
      # Should raise ExceptionGroup containing both exceptions
>     with pytest.raises(ExceptionGroup) as exc_info:
E     NameError: name 'ExceptionGroup' is not defined

t/unit/worker/test_native_delayed_delivery.py:386: NameError
=============================== warnings summary ===============================
/home/runner/_work/celery/celery/celery/worker/consumer/consumer.py:517: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to refrain from retrying connections on startup,
you should set broker_connection_retry_on_startup to False instead.
warnings.warn(

t/unit/worker/test_request.py::test_Request::test_from_message_invalid_kwargs
/home/runner/_work/celery/celery/celery/app/trace.py:705: RuntimeWarning: Exception raised outside body: InvalidTaskError('Task keyword arguments is not a mapping'):
Traceback (most recent call last):
File "/home/runner/_work/celery/celery/celery/app/trace.py", line 429, in trace_task
kwargs.items
AttributeError: 'str' object has no attribute 'items'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/runner/_work/celery/celery/celery/app/trace.py", line 431, in trace_task
    raise InvalidTaskError(
celery.exceptions.InvalidTaskError: Task keyword arguments is not a mapping

  warn(RuntimeWarning(

@Izzette (Contributor, Author) commented Oct 20, 2025

Can you also check why the Python 3.9 and 3.10 builds are failing?

Yup! I'm already on it. It's because ExceptionGroup was added in Python 3.11. I forgot to check this and was running locally with the latest version of Python. See PEP 654.
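
A hedged sketch of one way to stay compatible with Python < 3.11, where ExceptionGroup does not exist (this helper is an invention for illustration; per the comment above, the actual fix may simply include the sub-exception messages):

    import sys

    def raise_binding_failures(exceptions):
        # ExceptionGroup is only available natively on Python 3.11+ (PEP 654).
        if sys.version_info >= (3, 11):
            raise ExceptionGroup('Failed to bind queues', exceptions)
        # On older interpreters, fall back to a plain Exception that
        # carries the collected error messages.
        messages = '; '.join(str(exc) for exc in exceptions)
        raise Exception(f'Failed to bind queues: {messages}')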

Izzette and others added 6 commits October 20, 2025 16:41
* Continue to attempt to bind other queues after a native delayed
  delivery binding failure has occurred.
* Native delayed delivery retries should bypass exception grouping and
  raise retriable exceptions immediately.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@Izzette Izzette force-pushed the native-delayed-binding-failures-keep-going branch from 96bee9b to 223decc on October 20, 2025 14:41
@Izzette (Contributor, Author) commented Oct 20, 2025

@auvipy I've created an issue here: #9960

@Izzette Izzette requested a review from auvipy October 20, 2025 15:29
@auvipy (Member) left a comment

Can you also explore the possibility of adding an integration test for this?

@Nusnus Nusnus requested a review from Copilot October 20, 2025 20:11
Copilot AI (Contributor) left a comment

Pull Request Overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

auvipy and others added 3 commits October 21, 2025 18:51
Co-authored-by: Isabelle COWAN-BERGMAN <Izzette@users.noreply.github.com>
Co-authored-by: Isabelle COWAN-BERGMAN <Izzette@users.noreply.github.com>
@Izzette (Contributor, Author) commented Oct 21, 2025

@auvipy:

Can you also explore the possibility of adding an integration test for this?

I've added an integration test that reproduces part of the issue described in #9960 (I hope this works in the CI 🤞; it's working locally). This test starts the worker with an extra queue in task_queues and checks the bindings using the RabbitMQ management API. This requires updating the RabbitMQ image from latest to management (which is the same as latest but with the management plugin enabled by default).

I could in theory test delayed delivery in addition to the bindings, but since many of the integration tests are marked as flaky, I think keeping the scope of the test to the bindings created is probably wise.
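
For reference, a hypothetical sketch of how bindings can be checked through the RabbitMQ management HTTP API (the endpoint is part of the management plugin; the helper name, credentials, and assertion are assumptions, not the actual test):

    import requests

    def queue_bindings(queue, host='localhost', port=15672, vhost='%2F'):
        # The management plugin exposes a queue's bindings at
        # /api/queues/{vhost}/{queue}/bindings.
        url = f'http://{host}:{port}/api/queues/{vhost}/{queue}/bindings'
        response = requests.get(url, auth=('guest', 'guest'), timeout=10)
        response.raise_for_status()
        return response.json()

    # Even a worker consuming only queue-a should leave queue-c bound to
    # the delayed delivery exchange:
    # assert any('delayed' in b['source'] for b in queue_bindings('queue-c'))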

@Izzette Izzette requested a review from auvipy October 21, 2025 15:31
@auvipy auvipy requested a review from Copilot October 22, 2025 15:46
Copilot AI (Contributor) left a comment

Pull Request Overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.

@auvipy (comment marked as resolved)

@Izzette (Contributor, Author) commented Oct 24, 2025

@auvipy:

Can you please look into this failing test?

It's really strange, as it was passing in this prior commit, and the diff between the two is only enabling the RabbitMQ management plugin, the exception message for the ExceptionGroup, and the addition of an integration test. The error message is also awfully strange: Exception("<class 't.integration.tasks.UnpickleableException'>(['foo'])") was raised, while we're expecting celery.utils.serialization.UnpickleableExceptionWrapper, which has the base class Exception and wraps a t.integration.tasks.UnpickleableException ... they look very similar.

The thing is, I get the same error on main despite it having passed previously. Other PRs such as #9962 are also failing with the same error. I think it may be due to a difference in the available package versions, as the packages aren't pinned to an exact version but rather to an interval allowing higher versions to be installed.

If I compare the pip freeze from the passing run and the pip freeze from the failing run, I see this:

--- passing.txt	2025-10-24 10:26:21.889142303 +0200
+++ failing.txt	2025-10-24 10:26:25.857341881 +0200
@@ -12,2 +12,2 @@
-boto3==1.40.55
-botocore==1.40.55
+boto3==1.40.56
+botocore==1.40.56
@@ -17,2 +17,2 @@
-cassandra-driver==3.29.2
-celery @ file:///home/runner/_work/celery/celery/.tox/.tmp/package/1/celery-5.6.0b2-0.editable-py3-none-any.whl#sha256=14d56489ca97ae53d64c3e6288c7c7e2cdcf46b09fa188d32a8a82647ffe644b
+cassandra-driver==3.29.3
+celery @ file:///home/runner/_work/celery/celery/.tox/.tmp/package/1/celery-5.6.0b2-0.editable-py3-none-any.whl#sha256=444fa6a249a4a54018ea7342007f53c7d5a42de8238e819449cd1bbc246e2eb2
@@ -46 +46 @@
-geomet==0.2.1.post1
+geomet==1.1.0
@@ -68 +68 @@
-kombu @ git+https://github.com/celery/kombu.git@1bef09386e936be9757e713682bdf38bc1c0b737
+kombu @ git+https://github.com/celery/kombu.git@b8cf1f68a9af3d129e57be52ce48d3f85ecc9a80
@@ -145 +145 @@
-tblib==3.1.0
+tblib==3.2.0

The tblib package (a serialization library for exceptions and tracebacks) looks suspect. If I pin tblib==3.1.0 in requirements/extras/tblib.txt on main, the tests pass again. I think if you re-run the integration tests on main, we'll install tblib 3.2.0 and the tests will fail. I'm not really familiar with this library or this code written by @ask 15 years ago, so I'm not sure what really needs to be fixed here. Maybe we should replace the requirement with tblib>=1.5.0,<3.2.0;python_version>='3.8.0' in requirements/extras/tblib.txt.

In any case, this failure doesn't seem to be related to my PR and we should probably fix it in a different PR.

@auvipy (Member) commented Oct 25, 2025

Yes, the failures don't seem to be related to your PR.

@auvipy auvipy merged commit 7d501ca into celery:main Oct 25, 2025
107 checks passed

Successfully merging this pull request may close this issue: Celery delayed delivery not bound when undeclared task queues (#9960).