Contributor

@rbehal rbehal commented Jul 17, 2025

Description

This PR adds an option to completely disable prefetching without needing to use acks_late.

Currently in Celery, if your tasks are not idempotent, or you otherwise need to acknowledge a task right away rather than after it returns, you cannot completely disable prefetching. The most you can do is set the prefetch multiplier to 1.

Per the documentation:

If you want to disable “prefetching of tasks” without using ack_late (because your tasks are not idempotent) that’s impossible right now and you can join the discussion here #7106

This is problematic for many reasons and does not meet the needs of many use cases, as outlined in:

Specifically for us, we're running Celery in Kubernetes and need tasks to be picked up and processed right away. We were running into situations where tasks were needlessly held up because a Celery worker whose processes were already saturated with tasks was holding more tasks in reserve (because of prefetch), when they could have been delivered to another pod that was free.

Disabling prefetch makes it so that tasks are NOT reserved unless they're actively being worked on, preventing this issue for us.
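To make the scenario above concrete, here is a toy simulation in plain Python. It is only a sketch: `Worker`, `extra_reserved`, and `dispatch` are hypothetical names, not Celery or kombu classes, and the "first willing worker" delivery order is an assumption chosen to make the worst case visible, not a description of how a real broker routes messages.

```python
from dataclasses import dataclass, field


@dataclass
class Worker:
    """Toy stand-in for a worker pod (hypothetical, not a Celery class)."""
    name: str
    processes: int       # size of the process pool
    extra_reserved: int  # messages the worker may hold beyond busy processes
    running: list = field(default_factory=list)
    reserved: list = field(default_factory=list)

    def can_take(self) -> bool:
        capacity = self.processes + self.extra_reserved
        return len(self.running) + len(self.reserved) < capacity

    def take(self, task: str) -> None:
        if len(self.running) < self.processes:
            self.running.append(task)
        else:
            self.reserved.append(task)  # waits behind a busy process


def dispatch(tasks, workers):
    """Hand each task to the first worker willing to take it (assumed order)."""
    for task in tasks:
        for worker in workers:
            if worker.can_take():
                worker.take(task)
                break
    return workers


# One extra reserved message per worker: the second task queues behind pod a.
a, b = dispatch(["t1", "t2"], [Worker("a", 1, 1), Worker("b", 1, 1)])
print(a.reserved, b.running)  # ['t2'] []

# No extra reservation: the second task goes to the idle pod b.
a, b = dispatch(["t1", "t2"], [Worker("a", 1, 0), Worker("b", 1, 0)])
print(a.reserved, b.running)  # [] ['t2']
```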

Different approaches have been discussed for how to go about doing this -- the primary issue is that this behaviour actually stems from kombu, specifically:

kombu.transport.virtual.base.QoS.can_consume which limits the number of tasks that a worker will reserve in addition to tasks that have been ack-ed 1.

The reality is that this option should probably exist at the kombu level, and then upstream changes should be made in celery to accommodate this. Frankly, we did not really have time for this, and this was a bug affecting us in production -- so we just needed a simple solution that worked.

That is what the PR implements: a simple patch to can_consume on the celery side that allows this functionality to work. I understand if it's not merged because of the approach, but I still want to post it here for others that just need something that works quickly.
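The patching idea can be sketched in isolation. The snippet below is a minimal illustration, not the code from this PR: `FakeQoS`, `FakePool`, and `disable_prefetch` are hypothetical stand-ins, and the real QoS object, pool, and reserved-request set live in kombu and Celery.

```python
from types import MethodType


class FakeQoS:
    """Stand-in for the channel QoS object; only can_consume is modelled."""

    def can_consume(self):
        return True  # pretend the prefetch window always has room


class FakePool:
    """Stand-in for the worker pool (hypothetical)."""
    num_processes = 2


def disable_prefetch(qos, pool, reserved_requests):
    """Wrap qos.can_consume so it refuses work once every process is busy."""
    original_can_consume = qos.can_consume  # bound method, saved before patching

    def can_consume(self):
        if len(reserved_requests) >= pool.num_processes:
            return False
        return original_can_consume()

    # Rebind on this one instance only; the class itself is left untouched.
    qos.can_consume = MethodType(can_consume, qos)
    return qos


reserved = set()
qos = disable_prefetch(FakeQoS(), FakePool(), reserved)
print(qos.can_consume())  # True, nothing is reserved yet
reserved.update({"r1", "r2"})
print(qos.can_consume())  # False, both processes are busy
```

Patching the instance rather than the class keeps the change local to the channel being consumed from, which is why the approach works without touching kombu.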

It (along with some other additions) is implemented on my personal fork of celery as well as the repository we are using in production at Gumloop.

@auvipy auvipy requested review from auvipy and Copilot July 19, 2025 05:13
@auvipy auvipy added this to the 5.7.0 milestone Jul 19, 2025
Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds a new configuration option worker_disable_prefetch to completely disable task prefetching in Celery workers without requiring acks_late. This addresses a limitation where tasks could be held in reserve by busy workers when other workers were available, particularly problematic in Kubernetes environments.

Key changes:

  • Introduces a new configuration option that modifies the QoS can_consume method to only fetch tasks when worker processes are available
  • Updates documentation to describe the new functionality and remove outdated limitations
  • Adds comprehensive test coverage for the new feature including edge cases with autoscaling
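As a rough illustration of how the option might be switched on, here is a config-module fragment. The setting name `worker_disable_prefetch` is taken from this PR; whether it exists in the Celery version you run is an assumption, and the `celeryconfig.py` module name is only the conventional example.

```python
# celeryconfig.py (hypothetical project config module)

# Setting introduced by this PR; assumed to be available in your Celery build.
worker_disable_prefetch = True

# Keep early acknowledgement, since the point is to avoid relying on acks_late.
task_acks_late = False
```

The diff in this PR also adds a `--disable-prefetch` worker command-line option for the same purpose.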

Reviewed Changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| celery/app/defaults.py | Adds the new disable_prefetch configuration option |
| celery/worker/consumer/tasks.py | Implements the core logic to override can_consume when prefetch is disabled |
| docs/userguide/optimizing.rst | Updates optimization documentation to describe the new option |
| docs/userguide/configuration.rst | Documents the new worker_disable_prefetch setting |
| docs/faq.rst | Adds reference to the new disable prefetch functionality |
| t/unit/worker/test_consumer.py | Comprehensive test coverage for the disable prefetch feature |
| t/unit/worker/test_autoscale.py | Tests integration with autoscaling functionality |

Comment on lines +58 to +67
def can_consume(self):
    limit = getattr(c.controller, "max_concurrency", None)
    if limit is None:
        limit = c.pool.num_processes
    if len(state.reserved_requests) >= limit:
        return False
    return original_can_consume()

channel_qos.can_consume = MethodType(can_consume, channel_qos)

Copilot AI Jul 19, 2025

[nitpick] The nested function definition creates a closure that captures variables from the outer scope. Consider extracting this to a module-level function or class method to improve readability and testability.

Suggested change
- def can_consume(self):
-     limit = getattr(c.controller, "max_concurrency", None)
-     if limit is None:
-         limit = c.pool.num_processes
-     if len(state.reserved_requests) >= limit:
-         return False
-     return original_can_consume()
- channel_qos.can_consume = MethodType(can_consume, channel_qos)
+ channel_qos.can_consume = MethodType(
+     self._can_consume, channel_qos, c, state, original_can_consume
+ )

codecov bot commented Jul 19, 2025

Codecov Report

❌ Patch coverage is 93.75000% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 78.63%. Comparing base (bf1c98b) to head (7ddf7a5).
⚠️ Report is 83 commits behind head on main.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| celery/bin/worker.py | 66.66% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #9818      +/-   ##
==========================================
+ Coverage   78.62%   78.63%   +0.01%     
==========================================
  Files         153      153              
  Lines       19199    19215      +16     
  Branches     2547     2552       +5     
==========================================
+ Hits        15095    15110      +15     
  Misses       3811     3811              
- Partials      293      294       +1     
| Flag | Coverage Δ |
| --- | --- |
| unittests | 78.61% <93.75%> (+0.01%) ⬆️ |

              help_group="Worker Options",
              help="Set custom prefetch multiplier value "
                   "for this worker instance.")
@click.option('--disable-prefetch',
Member

This CLI code also needs unit test coverage. You can use pytest-click.

Comment on lines +59 to +61
limit = getattr(c.controller, "max_concurrency", None)
if limit is None:
    limit = c.pool.num_processes

Suggested change
- limit = getattr(c.controller, "max_concurrency", None)
- if limit is None:
-     limit = c.pool.num_processes
+ limit = getattr(c.controller, "max_concurrency", c.pool.num_processes)

nit
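Worth noting when weighing this nit: the two forms are not equivalent if the attribute exists but is set to `None`, because `getattr` only uses its default when the attribute is missing. The one-line form also evaluates the pool size eagerly. A small sketch with hypothetical `Controller` and `Pool` stand-ins (whether `max_concurrency` can actually be `None` in Celery is an assumption here):

```python
class Controller:
    """Stand-in whose attribute exists but is unset (hypothetical)."""
    max_concurrency = None


class Pool:
    """Stand-in for the worker pool (hypothetical)."""
    num_processes = 4


def limit_three_line(controller, pool):
    # Falls back when the attribute is missing OR is None.
    limit = getattr(controller, "max_concurrency", None)
    if limit is None:
        limit = pool.num_processes
    return limit


def limit_one_line(controller, pool):
    # Falls back only when the attribute is missing.
    return getattr(controller, "max_concurrency", pool.num_processes)


print(limit_three_line(Controller(), Pool()))  # 4
print(limit_one_line(Controller(), Pool()))    # None
print(limit_one_line(object(), Pool()))        # 4
```

With a `None` limit, the later `len(state.reserved_requests) >= limit` comparison would raise a `TypeError`, so the three-line form is the safer of the two under that assumption.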

Member

auvipy commented Aug 14, 2025

we got a new PR for this #9863

@auvipy auvipy closed this Aug 26, 2025
@auvipy auvipy modified the milestones: 5.7.0, 5.6.0 Aug 31, 2025