Add option to completely disable prefetching of tasks without using acks_late #9818
Conversation
Pull Request Overview
This PR adds a new configuration option worker_disable_prefetch to completely disable task prefetching in Celery workers without requiring acks_late. This addresses a limitation where tasks could be held in reserve by busy workers when other workers were available, particularly problematic in Kubernetes environments.
Key changes:
- Introduces a new configuration option that modifies the QoS `can_consume` method to only fetch tasks when worker processes are available
- Updates documentation to describe the new functionality and remove outdated limitations
- Adds comprehensive test coverage for the new feature including edge cases with autoscaling
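The gating described in the key changes can be sketched in isolation. The following is a minimal model, not Celery's actual classes; `Pool`, `Controller`, `Consumer`, and `reserved_requests` below are illustrative stand-ins for the real worker objects:

```python
# Minimal sketch of the disable-prefetch gate: only allow fetching a new
# task when the number of reserved requests is below the worker's limit.
# All names here are stand-ins, not Celery's real objects.
class Pool:
    num_processes = 2

class Controller:
    max_concurrency = None  # an autoscaler may set an upper bound here

class Consumer:
    controller = Controller()
    pool = Pool()

reserved_requests = set()
c = Consumer()

def original_can_consume():
    return True  # pretend the underlying QoS always allows consumption

def can_consume():
    limit = getattr(c.controller, "max_concurrency", None)
    if limit is None:
        limit = c.pool.num_processes
    if len(reserved_requests) >= limit:
        return False
    return original_can_consume()
```

With two worker processes, the gate closes as soon as two requests are reserved and reopens when one is released.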
Reviewed Changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| celery/app/defaults.py | Adds the new disable_prefetch configuration option |
| celery/worker/consumer/tasks.py | Implements the core logic to override can_consume when prefetch is disabled |
| docs/userguide/optimizing.rst | Updates optimization documentation to describe the new option |
| docs/userguide/configuration.rst | Documents the new worker_disable_prefetch setting |
| docs/faq.rst | Adds reference to the new disable prefetch functionality |
| t/unit/worker/test_consumer.py | Comprehensive test coverage for the disable prefetch feature |
| t/unit/worker/test_autoscale.py | Tests integration with autoscaling functionality |
```python
def can_consume(self):
    limit = getattr(c.controller, "max_concurrency", None)
    if limit is None:
        limit = c.pool.num_processes
    if len(state.reserved_requests) >= limit:
        return False
    return original_can_consume()

channel_qos.can_consume = MethodType(can_consume, channel_qos)
```
Copilot AI · Jul 19, 2025
[nitpick] The nested function definition creates a closure that captures variables from the outer scope. Consider extracting this to a module-level function or class method to improve readability and testability.
```diff
-    def can_consume(self):
-        limit = getattr(c.controller, "max_concurrency", None)
-        if limit is None:
-            limit = c.pool.num_processes
-        if len(state.reserved_requests) >= limit:
-            return False
-        return original_can_consume()
-    channel_qos.can_consume = MethodType(can_consume, channel_qos)
+    channel_qos.can_consume = MethodType(
+        self._can_consume, channel_qos, c, state, original_can_consume
+    )
```
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #9818      +/-   ##
==========================================
+ Coverage   78.62%   78.63%   +0.01%
==========================================
  Files         153      153
  Lines       19199    19215      +16
  Branches     2547     2552       +5
==========================================
+ Hits        15095    15110      +15
  Misses       3811     3811
- Partials      293      294       +1
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
```python
              help_group="Worker Options",
              help="Set custom prefetch multiplier value "
                   "for this worker instance.")
@click.option('--disable-prefetch',
```
This CLI code also needs unit test coverage; you can use pytest-click.
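A minimal sketch of such a test using click's built-in `CliRunner` (pytest-click exposes the same runner as a fixture); the command and flag below are illustrative stand-ins, not Celery's actual CLI:

```python
# Hypothetical CLI test: invoke a command carrying a --disable-prefetch
# flag and assert on the parsed value. pytest-click provides this runner
# as a fixture; here click.testing.CliRunner is used directly.
import click
from click.testing import CliRunner

@click.command()
@click.option('--disable-prefetch', is_flag=True, default=False,
              help='Disable task prefetching for this worker instance.')
def worker(disable_prefetch):
    click.echo(f'disable_prefetch={disable_prefetch}')

def test_disable_prefetch_flag():
    runner = CliRunner()
    result = runner.invoke(worker, ['--disable-prefetch'])
    assert result.exit_code == 0
    assert 'disable_prefetch=True' in result.output
```

The same pattern covers the default case by invoking the command with no arguments and asserting `disable_prefetch=False`.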
```python
limit = getattr(c.controller, "max_concurrency", None)
if limit is None:
    limit = c.pool.num_processes
```

```diff
-limit = getattr(c.controller, "max_concurrency", None)
-if limit is None:
-    limit = c.pool.num_processes
+limit = getattr(c.controller, "max_concurrency", c.pool.num_processes)
```

nit
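The two forms are equivalent only when the attribute is absent; if `max_concurrency` exists but is set to `None`, `getattr` with a default still returns `None`. A quick illustration with toy classes (not Celery's):

```python
# Toy illustration of the suggested one-liner vs. the two-step fallback.
class Pool:
    num_processes = 4

class ControllerWithout:
    pass  # no max_concurrency attribute at all

class ControllerWithNone:
    max_concurrency = None  # attribute present but unset

pool = Pool()

def two_step(controller):
    limit = getattr(controller, "max_concurrency", None)
    if limit is None:
        limit = pool.num_processes
    return limit

def one_liner(controller):
    # getattr's default is used only when the attribute is missing,
    # not when it is present with the value None
    return getattr(controller, "max_concurrency", pool.num_processes)
```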
We got a new PR for this: #9863
Description
This PR adds an option to completely disable prefetching without needing to use `acks_late`.

The current situation in Celery is that if your tasks are not idempotent, or for whatever reason you need to acknowledge the task right away rather than when it returns, you cannot completely disable prefetching. The most you can do is set the prefetch multiplier to 1.
Per the documentation:
This is problematic for many reasons and doesn't meet the needs of many use cases as outlined in:

Specifically for us, we're running Celery in Kubernetes and need tasks to be picked up and processed right away. We were running into situations where tasks were needlessly held up because a worker whose processes were already saturated was holding tasks in reserve (because of prefetch), when those tasks could have been redirected to another pod that was free.
Disabling prefetch makes it so that tasks are NOT reserved unless they're actively being worked on, preventing this issue for us.
Different approaches have been discussed for how to go about doing this. The primary issue is that this behaviour actually stems from kombu, specifically `kombu.transport.virtual.base.QoS.can_consume`, which limits the number of tasks a worker will reserve in addition to the tasks it has already acked [1].
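The prefetch-count mechanics can be sketched roughly as follows. This is a simplified model of an AMQP-style QoS, not kombu's actual implementation:

```python
# Simplified model of prefetch-count QoS: a consumer may keep fetching
# while its unacked deliveries are below prefetch_count, so it can hold
# messages in reserve beyond the ones it is actively processing.
class QoS:
    def __init__(self, prefetch_count=0):
        self.prefetch_count = prefetch_count  # 0 means "no limit"
        self._delivered = {}  # delivery tag -> message, awaiting ack

    def can_consume(self):
        return (not self.prefetch_count
                or len(self._delivered) < self.prefetch_count)

    def append(self, message, delivery_tag):
        self._delivered[delivery_tag] = message

    def ack(self, delivery_tag):
        self._delivered.pop(delivery_tag, None)
```

In this model a worker with `prefetch_count=2` and one busy process still fetches a second message and holds it, which is exactly the reservation behaviour the PR disables.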
The reality is that this option should probably exist at the kombu level, with upstream changes then made in celery to accommodate it. Frankly, we did not really have time for that, and this was a bug affecting us in production, so we just needed a simple solution that worked.

That is what this PR implements: a simple patch to `can_consume` on the celery side that allows this functionality to work. I understand if it's not merged because of the approach, but I still want to post it here for others who just need something that works quickly.

It (along with some other additions) is implemented on my personal fork of celery as well as the repository we are using in production at Gumloop.