fix(worker): continue to attempt to bind other queues after a native delayed delivery binding failure has occurred #9959
Conversation
dbef707 to af31b45
Pull Request Overview
This PR fixes a critical issue where native delayed delivery queue binding would halt on the first error, preventing subsequent queues from being bound. The fix allows all queues to attempt binding while collecting errors into an ExceptionGroup, with special handling to preserve retry behavior for connection-related exceptions.
Key changes:
- Queue binding now continues after individual failures instead of stopping at the first error
- Non-retryable exceptions are collected and raised as an `ExceptionGroup` after all binding attempts (see the short illustration below)
- Connection-related exceptions (`ConnectionRefusedError`, `OSError`) are immediately re-raised to maintain existing retry logic
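To make the new failure surface concrete, here is a minimal, hypothetical illustration (Python 3.11+ only, not code from this PR) of how a caller can inspect errors raised as an `ExceptionGroup`; the messages are made up:

```python
# Hypothetical example: inspecting grouped binding errors on Python 3.11+.
errors = [ValueError("queue-b: NOT_FOUND"), ValueError("queue-d: NOT_FOUND")]

try:
    raise ExceptionGroup("Failed to bind some queues", errors)
except* ValueError as group:
    # `group` is itself an ExceptionGroup containing the matched exceptions.
    for sub in group.exceptions:
        print("binding failed:", sub)
```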
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `celery/worker/consumer/delayed_delivery.py` | Implements exception collection logic in `_bind_queues` to continue binding after failures; adds `RETRIED_EXCEPTIONS` constant for shared use between retry mechanism and binding logic |
| `t/unit/worker/test_native_delayed_delivery.py` | Adds comprehensive test coverage for the new behavior: continued binding after failures, `ExceptionGroup` raising, and retry mechanism preservation |
auvipy
left a comment
Can you point to the relevant issue that might be related to this fix?
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@ Coverage Diff @@
## main #9959 +/- ##
=======================================
Coverage 78.67% 78.68%
=======================================
Files 153 153
Lines 19304 19311 +7
Branches 2212 2214 +2
=======================================
+ Hits 15188 15195 +7
Misses 3817 3817
Partials 299 299
```

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.
No, but I can create one if it would be useful for you. It seems that ExceptionGroup was added in 3.11. I'm not sure we need to keep the sub-exceptions, I'll just include their messages.
Can you also check why the Python 3.9 and 3.10 builds are failing with the following tests?

```
=================================== FAILURES ===================================
self = <t.unit.worker.test_native_delayed_delivery.test_DelayedDelivery object at 0x7f99dee542b0>
E   NameError: name 'ExceptionGroup' is not defined

t/unit/worker/test_native_delayed_delivery.py:386: NameError
t/unit/worker/test_request.py::test_Request::test_from_message_invalid_kwargs
```
Yup! I'm already on it. It's because ExceptionGroup was added in Python 3.11. I forgot to check this and was running locally with the latest version of Python. See PEP 654.
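For reference, `ExceptionGroup` only exists as a builtin from Python 3.11 onwards (PEP 654). A common compatibility approach on older interpreters is the `exceptiongroup` backport package, sketched below; this is just an illustration of the version issue, not necessarily what this PR ends up doing (as noted above, including only the messages is another option):

```python
import sys

if sys.version_info < (3, 11):
    # Backport of PEP 654's ExceptionGroup for Python 3.9/3.10
    # (requires: pip install exceptiongroup).
    from exceptiongroup import ExceptionGroup  # noqa: F401

try:
    raise ExceptionGroup("example", [RuntimeError("sub-error")])
except Exception as exc:
    # Works on 3.9+ once the backport (or the 3.11+ builtin) is available.
    print(type(exc).__name__, "->", [str(e) for e in exc.exceptions])
```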
* Continue to attempt to bind other queues after a native delayed delivery binding failure has occurred.
for more information, see https://pre-commit.ci
* Native delayed delivery retries should bypass exception grouping and raise retriable exceptions immediately.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
96bee9b to 223decc
auvipy
left a comment
Can you also explore the possibility of adding an integration test for this?
Pull Request Overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
Co-authored-by: Isabelle COWAN-BERGMAN <Izzette@users.noreply.github.com>
Co-authored-by: Isabelle COWAN-BERGMAN <Izzette@users.noreply.github.com>
I've added an integration test which is able to reproduce part of the issue described in #9960 (I hope this works in the CI 🤞, it's working locally). This test starts the worker with an extra queue in `task_queues` and checks the bindings using the RabbitMQ management API. This requires updating the RabbitMQ image to one with the management plugin enabled. I could in theory test delayed delivery in addition to the bindings, but as I see many of the integration tests are marked as flaky, I think keeping the scope of the test to the bindings created is probably wise.
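For anyone curious what such a check can look like, here is a rough sketch of querying a queue's bindings through the RabbitMQ management HTTP API; the host, port, credentials, and the `"delay"` substring in the assertion are placeholder assumptions, and the actual integration test in this PR may be structured differently:

```python
import requests  # assumes the `requests` package is available


def queue_bindings(queue_name, vhost="/", host="localhost", port=15672,
                   auth=("guest", "guest")):
    """Return the bindings of a queue via the RabbitMQ management API."""
    vhost_enc = requests.utils.quote(vhost, safe="")  # "/" -> "%2F"
    url = f"http://{host}:{port}/api/queues/{vhost_enc}/{queue_name}/bindings"
    response = requests.get(url, auth=auth, timeout=10)
    response.raise_for_status()
    return response.json()


# Placeholder assertion: expect at least one binding whose source exchange
# looks like a delayed-delivery exchange (exact naming depends on kombu).
# bindings = queue_bindings("queue-c")
# assert any("delay" in binding["source"] for binding in bindings)
```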
Pull Request Overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
It's really strange, as it was passing in this prior commit, with the diff between the two being enabling the RabbitMQ management plugin, the exception message for the ExceptionGroup, and the addition of an integration test. The error message is also awfully strange.

The thing is, I get the same error on main despite having passed previously. Other PRs such as #9962 are also failing with the same error. I think it may be due to a difference in the available package versions, as the package versions aren't pinned to an exact version, but rather with an interval allowing higher versions to be installed. If I compare the package versions between the passing and the failing run:

```diff
--- passing.txt 2025-10-24 10:26:21.889142303 +0200
+++ failing.txt 2025-10-24 10:26:25.857341881 +0200
@@ -12,2 +12,2 @@
-boto3==1.40.55
-botocore==1.40.55
+boto3==1.40.56
+botocore==1.40.56
@@ -17,2 +17,2 @@
-cassandra-driver==3.29.2
-celery @ file:///home/runner/_work/celery/celery/.tox/.tmp/package/1/celery-5.6.0b2-0.editable-py3-none-any.whl#sha256=14d56489ca97ae53d64c3e6288c7c7e2cdcf46b09fa188d32a8a82647ffe644b
+cassandra-driver==3.29.3
+celery @ file:///home/runner/_work/celery/celery/.tox/.tmp/package/1/celery-5.6.0b2-0.editable-py3-none-any.whl#sha256=444fa6a249a4a54018ea7342007f53c7d5a42de8238e819449cd1bbc246e2eb2
@@ -46 +46 @@
-geomet==0.2.1.post1
+geomet==1.1.0
@@ -68 +68 @@
-kombu @ git+https://github.com/celery/kombu.git@1bef09386e936be9757e713682bdf38bc1c0b737
+kombu @ git+https://github.com/celery/kombu.git@b8cf1f68a9af3d129e57be52ce48d3f85ecc9a80
@@ -145 +145 @@
-tblib==3.1.0
+tblib==3.2.0
```

In any case, this failure doesn't seem to be related to my PR and we should probably fix it in a different PR.
Yes, the failures don't seem to be related to your PR.
Description
Current problem
When binding queues for native delayed delivery in AMQP, a failure in binding one queue prevents subsequent queues from being bound. This is particularly problematic when multiple heterogeneous worker deployments are consuming from different queues, all needing delayed delivery setup.
Let's take for example an application with 3 `task_queues` (`queue-a`, `queue-b`, and `queue-c`) and two workers (`worker-a` and `worker-c`). `worker-a` consumes from `queue-a` and `worker-c` consumes from `queue-c`. No worker is consuming from `queue-b`. The application is starting on a fresh RabbitMQ vhost with no queues or exchanges previously existing.

When `worker-a` starts up, it declares `queue-a` and the celery delayed delivery exchanges/queues, and it starts the delayed delivery binding process by calling `DelayedDelivery._bind_queues`. First it tries to bind `queue-a` to the delayed delivery exchange; this goes smoothly and the binding is set up for `queue-a`. Then `worker-a` proceeds to try to bind `queue-b`; because it doesn't exist (`worker-a` won't declare it, as it is not a consumer of `queue-b`), an exception is raised. `worker-a` never tries to bind `queue-c` to the delayed delivery exchange, as the loop broke before binding `queue-c`.

Next, `worker-c` starts up. It declares `queue-c` and the celery delayed delivery exchanges/queues, and it calls `DelayedDelivery._bind_queues`. First it tries to bind `queue-a`, which works fine as it's already bound, but it fails on `queue-b` for the same reason `worker-a` failed to bind that queue to the delayed delivery exchange. `queue-c` is never bound by this worker.

Both workers finish their startup and keep consuming. They log the message `Failed to bind queues for …`. When ETA tasks are sent for `queue-a`, they trickle down through the delayed delivery exchanges until eventually being delivered to `queue-a` and processed by `worker-a`. However, when ETA tasks are sent for `queue-c`, after trickling through the delayed delivery exchanges, no binding is found for `*.queue-c` and the tasks are discarded. Non-ETA tasks sent to `queue-c` are delivered just fine and processed by `worker-c`.

This can even happen when all queues have a worker, depending on the order in which the workers are started. If, for example, `worker-b` existed and was consuming from `queue-b`, but the startup order was `worker-b`, `worker-c`, and finally `worker-a`, then both `queue-b` and `queue-c` would be missing bindings. Once `worker-b` is restarted, it would successfully bind `queue-b` to the delayed delivery exchanges. If `worker-c` is then restarted after `worker-b` has restarted, it would be able to bind `queue-c` as well.

I'm currently facing this issue in a production environment. My workaround is to manually declare all queues and restart the workers when new queues are added that don't yet have any workers.
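To make the failure mode concrete, here is a deliberately simplified sketch (not the actual Celery source; `bind_queue` is a stand-in for the kombu helper) showing how a single failing bind stops the loop before later queues are attempted:

```python
# Stand-in for the real binding helper: fails for a queue nobody declared.
def bind_queue(connection, queue_name):
    if queue_name == "queue-b":  # no worker ever declares queue-b
        raise RuntimeError(f"NOT_FOUND - no queue '{queue_name}'")
    print(f"bound {queue_name} to the delayed delivery exchange")


def bind_queues_halting(connection, queue_names):
    # Pre-fix behaviour: the first exception escapes the loop,
    # so queues after the failing one are never attempted.
    for name in queue_names:
        bind_queue(connection, name)


# bind_queues_halting(None, ["queue-a", "queue-b", "queue-c"])
# -> binds queue-a, raises on queue-b, never reaches queue-c
```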
Solution
The solution I'm proposing here is to capture exceptions raised when binding each queue and, instead of immediately re-raising, group all the exceptions into an `ExceptionGroup`. This allows every queue to attempt to bind, with the errors still being raised after all queues have been attempted.
In order to allow `ConnectionRefusedError` and `OSError` to still be caught by `retry_over_time`, we need to re-raise these errors immediately, as `kombu.utils.functional.retry_over_time` does not use the `except*` clause.

See: `celery/worker/consumer/delayed_delivery.py`, lines 84 to 90 in c4e4bab
See: https://github.com/celery/kombu/blob/1bef09386e936be9757e713682bdf38bc1c0b737/kombu/utils/functional.py#L319-L319
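Roughly, the proposed loop looks like the following minimal sketch. It is simplified: the real change lives in `celery/worker/consumer/delayed_delivery.py`, the `bind_queues`/`bind_queue` signatures here are stand-ins, and the final implementation may include only the sub-exception messages rather than the exceptions themselves.

```python
# Exceptions that retry_over_time() is expected to handle; they must
# propagate immediately, because retry_over_time() does not use `except*`
# and would not catch them inside an ExceptionGroup.
RETRIED_EXCEPTIONS = (ConnectionRefusedError, OSError)


def bind_queues(connection, queues, bind_queue):
    """Attempt to bind every queue, collecting non-retryable failures."""
    errors = []
    for queue in queues:
        try:
            bind_queue(connection, queue)
        except RETRIED_EXCEPTIONS:
            raise  # let the surrounding retry logic see it directly
        except Exception as exc:
            errors.append(exc)  # keep going; other queues still get bound
    if errors:
        # Requires Python 3.11+ (or the exceptiongroup backport).
        raise ExceptionGroup(
            f"Failed to bind {len(errors)} queue(s)", errors
        )
```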
Alternative solutions
Some other solutions could be considered, such as:
- … in `DelayedDelivery._bind_queues` before attempting `kombu.transport.native_delayed_delivery.bind_queue_to_native_delayed_delivery_exchange`.
- Iterating only over the queues this worker is consuming from (`app.amqp._consume_from`) in `DelayedDelivery._bind_queues`, rather than all defined `task_queues` (`app.amqp.queues`).

See: `celery/app/amqp.py`, lines 157 to 166 in c4e4bab
I chose this solution because, even if we iterated only over the queues this worker is consuming from, as long as the worker is expected to continue running after encountering errors when binding to the delayed delivery exchange, I believe it ought to at least attempt to bind all queues and not stop on the first error.
Fixes #9960