Fix/10096 worker fails to reconnect after redis failover #10151

Merged: auvipy merged 10 commits into celery:main from
ChickenBenny:fix/10096-worker-fails-to-reconnect-after-redis-failover
Mar 1, 2026
Conversation

@ChickenBenny
Contributor

Fix: Worker fails to reconnect after Redis failover (regression from #9986 and #7273)

Fixes #10096

Description

After upgrading to Celery 5.6.x, workers using Redis Sentinel as the broker fail to reconnect after a Redis failover. Running tasks appear permanently stuck in active state, no new tasks are consumed, and the worker becomes invisible to celery inspect ping / active_queues.

The root cause is a combination of regressions introduced by #9986 and #7273 in asynloop() and AsynPool.flush(), plus a latent bug in the latter. This PR fixes all three bugs while preserving the intended behavior of both original PRs.

Bug 1 — hub.reset() removed from asynloop() error path (regression from #9986)

PR #9986 removed the hub.reset() call from asynloop() to keep timers alive during graceful shutdown. However, this also removed the reset on the error path (e.g., broker connection loss), which means stale file descriptors and callbacks from the old connection remain registered in the hub. When the consumer restarts and re-registers with the event loop, these stale descriptors cause EBADF errors or 100% CPU poll loops, preventing the worker from reconnecting.

Fix: Wrap the event loop in try/except Exception and call hub.reset() only on error. This preserves the #9986 fix because:

  • On graceful shutdown, the loop exits normally (no exception), so hub.reset() is not called — timers keep firing while the pool drains, exactly as #9986 ("Fix: Broker heartbeats not sent during graceful shutdown") intended.
  • On error (connection loss, etc.), hub.reset() is called — stale fds and callbacks are cleaned up before the consumer restarts.
  • WorkerShutdown / WorkerTerminate extend SystemExit (not Exception), so they pass through without triggering the reset.

Note: The issue reporter's patch used finally: (unconditional reset), which would re-break #9986 / #5998. Using except Exception: is the correct approach — it targets only error-driven reconnection while leaving graceful shutdown intact.
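The error-path-only reset pattern can be sketched in a self-contained way. This is not Celery's actual asynloop; Hub, run_event_loop, and the poll iterable are illustrative stand-ins used only to show why except Exception behaves differently from finally for SystemExit-derived shutdown signals:

```python
class Hub:
    """Stand-in for the event loop hub holding fd -> callback registrations."""

    def __init__(self):
        self.readers = {"old-connection-fd": "stale-callback"}
        self.reset_called = False

    def reset(self):
        # Drop stale file descriptors/callbacks left over from a dead
        # connection so the restarted consumer can re-register cleanly.
        self.readers.clear()
        self.reset_called = True


def run_event_loop(hub, poll_iter):
    try:
        for _event in poll_iter:  # stand-in for the poll/tick loop
            pass                  # graceful exit: the loop simply ends
    except Exception:
        # Error path only (connection loss, EBADF, ...): clean the hub,
        # then let the consumer's restart machinery see the error.
        hub.reset()
        raise
    # WorkerShutdown/WorkerTerminate extend SystemExit, which is not a
    # subclass of Exception, so they propagate without triggering reset()
    # and timers stay alive during graceful shutdown.
```

A finally: block here would call hub.reset() on every exit, including graceful shutdown, which is exactly what #9986 removed.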

Bug 2 — Unaccepted jobs stuck in cache when synack=False (regression from #7273)

PR #7273 added logic in AsynPool.flush() to clear the _cache when there are no active writers. However, the cleanup of individual unaccepted jobs was only done when synack=True:

# Before (only handles synack=True)
if self.synack:
    for job in self._cache.values():
        if not job._accepted:
            job._cancel()

When synack=False (the default), unaccepted jobs were never removed from _cache. After a broker reconnection, these orphaned entries make the worker think it still has active tasks, blocking new task consumption (especially with worker_concurrency=1).

Fix: Iterate over all unaccepted jobs regardless of synack. When synack=True, call job._cancel() (sends NACK). When synack=False, call job.discard() (removes from cache; broker will redeliver). Uses tuple() to safely copy the dict during iteration.
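The fixed cleanup can be sketched as follows. The Job class and clean_unaccepted helper are illustrative stand-ins for the AsynPool internals, showing only the branch structure described above:

```python
class Job:
    """Stand-in for a pool job entry in AsynPool._cache."""

    def __init__(self, accepted):
        self._accepted = accepted
        self.cancelled = False
        self.discarded = False

    def _cancel(self):
        self.cancelled = True   # real code sends a NACK to the process

    def discard(self):
        self.discarded = True   # real code removes the entry from _cache


def clean_unaccepted(cache, synack):
    # tuple() snapshots the values so the dict may shrink while iterating.
    for job in tuple(cache.values()):
        if not job._accepted:
            if synack:
                job._cancel()   # synack=True: NACK the unaccepted job
            else:
                job.discard()   # synack=False: drop it; broker redelivers
```

With synack=False (the default), unaccepted jobs are now removed instead of lingering in the cache after reconnection.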

Bug 3 — Infinite loop in flush() when worker process is dead (latent bug in #7273)

In the while self._active_writers loop of flush(), when a writer generator had already started but its target process was dead:

  1. _flush_writer() was skipped (process not alive)
  2. job.discard() was called
  3. But the generator was never removed from _active_writers

This caused the while loop to spin forever, hanging the worker on connection loss.

Additionally, the original code had a shortcut if not self._active_writers: self._cache.clear() that aggressively cleared the entire cache, including jobs that were already accepted and actively executing in worker processes.

Fix:

  • Always call self._active_writers.discard(gen) in the else branch, regardless of whether the process is alive or dead.
  • Remove the self._cache.clear() shortcut — jobs should only be discarded individually based on their actual state.
  • When a process is dead, explicitly discard the job from cache since it will never complete.
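The termination argument can be shown with a reduced sketch of the draining loop. The function and its parameters (is_alive, flush_writer, the jobs mapping) are hypothetical simplifications, not the real flush() signature; the point is that the generator is removed from the set on every path:

```python
def drain_active_writers(active_writers, is_alive, flush_writer, jobs, cache):
    """Drain writer generators; terminates even if a target process died."""
    while active_writers:
        gen = next(iter(active_writers))
        if is_alive(gen):
            flush_writer(gen)            # let the writer finish sending
        else:
            job = jobs.get(gen)
            if job is not None:
                cache.pop(job, None)     # dead process: job never completes
        # The fix: always remove the generator, alive or dead; leaving a
        # dead-process writer in the set made this loop spin forever.
        active_writers.discard(gen)
```

Note there is no blanket cache.clear() here: only jobs whose target process is dead are dropped, so accepted jobs still executing elsewhere survive the flush.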

How these bugs interact (the #10096 scenario)

The issue reporter's environment: Redis Sentinel broker, worker_concurrency=1, task_acks_late=False.

  1. A task is running when Redis fails over → connection to old master is lost.
  2. asynloop() gets an exception from the broken connection.
  3. Without Bug 1 fix: hub.reset() is never called → stale fds remain → consumer cannot cleanly re-register with the event loop → worker becomes invisible (inspect ping fails).
  4. Without Bug 2 fix: The in-flight task completes, but its job entry remains in _cache → worker thinks it still has 1 active task → with concurrency=1, no new tasks are accepted.
  5. Without Bug 3 fix: If flush() is called while a writer targets a dead process, the worker hangs in an infinite loop.

All three fixes are needed for full recovery. The reporter confirmed that versions prior to 5.6.1 only needed the asynpool.py fix (Bugs 2+3), and from 5.6.1 onwards the loops.py fix (Bug 1) became necessary as well — matching the timeline of #9986 being merged.

Changes

File Change
celery/worker/loops.py Add try/except Exception around event loop; call hub.reset() on error only
celery/concurrency/asynpool.py Discard unaccepted jobs when synack=False; fix infinite loop by always removing dead-process writers from _active_writers; remove unsafe _cache.clear() shortcut
t/unit/worker/test_loops.py 5 new tests for hub reset behavior
t/unit/concurrency/test_prefork.py 4 new tests for flush() behavior

Test coverage

asynloop hub reset tests (t/unit/worker/test_loops.py)

Test Verifies
test_hub_reset_on_connection_error hub.reset() called when poll raises socket.error
test_hub_not_reset_on_graceful_shutdown hub.reset() not called on normal CLOSE transition
test_hub_not_reset_on_worker_shutdown hub.reset() not called for WorkerShutdown (extends SystemExit)
test_hub_not_reset_on_worker_terminate hub.reset() not called for WorkerTerminate (extends SystemExit)
test_hub_reset_error_still_reraises_original Original exception propagates even if hub.reset() itself fails

AsynPool.flush() tests (t/unit/concurrency/test_prefork.py)

Test Verifies
test_flush_no_synack_discards_unaccepted_jobs job.discard() called for unaccepted jobs when synack=False
test_flush_synack_cancels_unaccepted_jobs job._cancel() called for unaccepted jobs when synack=True
test_flush_dead_process_discards_active_writer Generator removed from _active_writers when process is dead (no infinite loop)
test_flush_alive_process_flushes_writer _flush_writer() called when process is still alive

Backwards compatibility

@codecov

codecov bot commented Feb 28, 2026

Codecov Report

❌ Patch coverage is 95.12195% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.77%. Comparing base (40c2349) to head (fd4a652).
⚠️ Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
celery/concurrency/asynpool.py 92.00% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #10151      +/-   ##
==========================================
+ Coverage   87.55%   87.77%   +0.22%     
==========================================
  Files         153      153              
  Lines       19422    19432      +10     
  Branches     2234     2233       -1     
==========================================
+ Hits        17005    17057      +52     
+ Misses       2121     2076      -45     
- Partials      296      299       +3     
Flag Coverage Δ
unittests 87.76% <95.12%> (+0.22%) ⬆️



@ChickenBenny ChickenBenny force-pushed the fix/10096-worker-fails-to-reconnect-after-redis-failover branch from e2ebfbb to b58780c Compare February 28, 2026 05:38
@auvipy auvipy requested a review from Copilot February 28, 2026 09:30
Member

@auvipy auvipy left a comment


please check the failing tests

Contributor

Copilot AI left a comment


Pull request overview

Fixes a regression where Celery workers using Redis Sentinel as the broker fail to recover after a Redis failover, by ensuring the event loop and prefork async pool properly clean up state on connection errors.

Changes:

  • Update asynloop() to hub.reset() only on error paths (preserving graceful shutdown timer behavior).
  • Fix AsynPool.flush() to properly discard/cancel unaccepted jobs regardless of synack, and prevent infinite loops when writers target dead processes.
  • Add unit tests covering hub reset behavior and flush() job/writer handling.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

File Description
celery/worker/loops.py Reset hub on exception to avoid stale fds/callbacks blocking reconnection.
celery/concurrency/asynpool.py Fix cache cleanup and active-writer draining logic during flush on connection loss.
t/unit/worker/test_loops.py Add tests asserting hub reset happens only on error, not on graceful/system-exit shutdown.
t/unit/concurrency/test_prefork.py Add tests for flush() behavior across synack modes and writer/process liveness.

@ChickenBenny ChickenBenny force-pushed the fix/10096-worker-fails-to-reconnect-after-redis-failover branch from 9847585 to 296ec7e Compare February 28, 2026 11:58
@ChickenBenny ChickenBenny force-pushed the fix/10096-worker-fails-to-reconnect-after-redis-failover branch from a32dc5b to ca8caba Compare February 28, 2026 12:38
@ChickenBenny
Contributor Author

I've fixed the failing unit tests in 296ec7e

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

@bdrosen96

FYI, I locally tested this patch against my external regression test case and everything appears to pass.

@auvipy auvipy added this to the 5.7.0 milestone Mar 1, 2026
@auvipy auvipy merged commit 2b3c6fa into celery:main Mar 1, 2026
320 checks passed
Nusnus pushed a commit that referenced this pull request Mar 26, 2026
* fix: trigger the hub reset on error

* fix: jobs stuck in cache when synack is disabled

* fix: prevent infinite loop when worker process is dead

* fix: pass the synack to pool

* fix: advance _write_ack generators in flush() instead of dropping them

* test: add missing coverage for flush() _write_ack and gen_not_started paths

* fix: switch the synack to True

---------

Co-authored-by: Asif Saif Uddin {"Auvi":"অভি"} <auvipy@gmail.com>

Development

Successfully merging this pull request may close these issues.

Issues reconnecting after redis failover
