Fix/10096 worker fails to reconnect after Redis failover (#10151)
Codecov Report

❌ Patch coverage is

```
@@            Coverage Diff             @@
##             main   #10151      +/-   ##
==========================================
+ Coverage   87.55%   87.77%   +0.22%
==========================================
  Files         153      153
  Lines       19422    19432      +10
  Branches     2234     2233       -1
==========================================
+ Hits        17005    17057      +52
+ Misses       2121     2076      -45
- Partials      296      299       +3
```
auvipy
left a comment
please check the failing tests
Pull request overview
Fixes a regression where Celery workers using Redis Sentinel as the broker fail to recover after a Redis failover, by ensuring the event loop and prefork async pool properly clean up state on connection errors.
Changes:
- Update `asynloop()` to call `hub.reset()` only on error paths (preserving graceful-shutdown timer behavior).
- Fix `AsynPool.flush()` to properly discard/cancel unaccepted jobs regardless of `synack`, and prevent infinite loops when writers target dead processes.
- Add unit tests covering hub reset behavior and `flush()` job/writer handling.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `celery/worker/loops.py` | Reset hub on exception to avoid stale fds/callbacks blocking reconnection. |
| `celery/concurrency/asynpool.py` | Fix cache cleanup and active-writer draining logic during flush on connection loss. |
| `t/unit/worker/test_loops.py` | Add tests asserting hub reset happens only on error, not on graceful/system-exit shutdown. |
| `t/unit/concurrency/test_prefork.py` | Add tests for `flush()` behavior across synack modes and writer/process liveness. |
I've fixed the unit test in 296ec7e.

FYI, I locally tested this patch against my external regression test case and everything appears to pass.
* fix: trigger the hub reset on error
* fix: jobs stuck in cache when synack is disabled
* fix: prevent infinite loop when worker process is dead
* fix: pass the synack to pool
* fix: advance _write_ack generators in flush() instead of dropping them
* test: add missing coverage for flush() _write_ack and gen_not_started paths
* fix: switch the synack to True
---------
Co-authored-by: Asif Saif Uddin {"Auvi":"অভি"} <auvipy@gmail.com>
Fix: Worker fails to reconnect after Redis failover (regression from #9986 and #7273)
Fixes #10096
## Description
After upgrading to Celery 5.6.x, workers using Redis Sentinel as the broker fail to reconnect after a Redis failover. Running tasks appear permanently stuck in the `active` state, no new tasks are consumed, and the worker becomes invisible to `celery inspect ping`/`active_queues`.

The root cause is a combination of two regressions introduced by #9986 and #7273 in `AsynPool.flush()` and `asynloop()`. This PR fixes all three bugs while preserving the intended behavior of both original PRs.

### Bug 1 — `hub.reset()` removed from the `asynloop()` error path (regression from #9986)

PR #9986 removed the `hub.reset()` call from `asynloop()` to keep timers alive during graceful shutdown. However, this also removed the reset on the error path (e.g., broker connection loss), which means stale file descriptors and callbacks from the old connection remain registered in the hub. When the consumer restarts and re-registers with the event loop, these stale descriptors cause `EBADF` errors or 100% CPU poll loops, preventing the worker from reconnecting.

**Fix:** Wrap the event loop in `try/except Exception` and call `hub.reset()` only on error. This preserves the #9986 fix because:

- On graceful shutdown, `hub.reset()` is not called: timers keep firing while the pool drains, exactly as #9986 ("Fix: Broker heartbeats not sent during graceful shutdown") intended.
- On error, `hub.reset()` is called: stale fds and callbacks are cleaned up before the consumer restarts.
- `WorkerShutdown`/`WorkerTerminate` extend `SystemExit` (not `Exception`), so they pass through without triggering the reset.
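The error-path reset can be sketched as follows. This is a minimal illustration with a simplified stand-in `Hub`, not the actual code in `celery/worker/loops.py` or kombu:

```python
# A minimal sketch of the error-path reset, assuming a simplified Hub;
# the real hub and loop live in kombu and celery/worker/loops.py.

class Hub:
    """Simplified stand-in for kombu's event-loop hub."""

    def __init__(self):
        # Pretend a stale fd from the old broker connection is registered.
        self.readers = {"old-broker-fd": "callback"}
        self.was_reset = False

    def reset(self):
        # Drop stale file descriptors and callbacks.
        self.readers.clear()
        self.was_reset = True


def asynloop(hub, poll):
    """Run the event loop; reset the hub only when a real error escapes."""
    try:
        while True:
            poll()
    except Exception:
        # Broker connection loss etc.: clean up stale fds so the consumer
        # can re-register cleanly when it restarts.
        hub.reset()
        raise
    # WorkerShutdown/WorkerTerminate extend SystemExit, not Exception,
    # so graceful shutdown propagates without resetting the hub and
    # its timers keep firing while the pool drains.
```

Because `SystemExit` is not a subclass of `Exception`, the shutdown signals never enter the `except` block, which is what keeps the #9986 behavior intact.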
### Bug 2 — Unaccepted jobs stuck in cache when `synack=False` (regression from #7273)

PR #7273 added logic in `AsynPool.flush()` to clear the `_cache` when there are no active writers. However, the cleanup of individual unaccepted jobs was only done when `synack=True`. When `synack=False` (the default), unaccepted jobs were never removed from `_cache`. After a broker reconnection, these orphaned entries make the worker think it still has active tasks, blocking new task consumption (especially with `worker_concurrency=1`).

**Fix:** Iterate over all unaccepted jobs regardless of `synack`. When `synack=True`, call `job._cancel()` (sends a NACK). When `synack=False`, call `job.discard()` (removes the job from the cache; the broker will redeliver it). Uses `tuple()` to safely copy the dict during iteration.
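A minimal sketch of the fixed cleanup logic; the `Job` class and `clean_unaccepted` helper are hypothetical stand-ins for illustration, not the real `asynpool.py` internals:

```python
class Job:
    """Hypothetical stand-in for an in-flight job in the pool cache."""

    def __init__(self, accepted=False):
        self.accepted = accepted
        self.cancelled = False   # set by _cancel() (synack=True path)
        self.discarded = False   # set by discard() (synack=False path)

    def _cancel(self):
        # synack=True: notify the child process with a NACK.
        self.cancelled = True

    def discard(self):
        # synack=False: just drop it; the broker will redeliver.
        self.discarded = True


def clean_unaccepted(cache, synack):
    """Remove every unaccepted job from the cache, regardless of synack."""
    # tuple() snapshots the items so the dict can shrink while we iterate.
    for job_id, job in tuple(cache.items()):
        if not job.accepted:
            if synack:
                job._cancel()
            else:
                job.discard()
            cache.pop(job_id, None)
```

The `tuple()` snapshot matters: popping from a dict while iterating over it directly raises `RuntimeError` in Python.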
### Bug 3 — Infinite loop in `flush()` when worker process is dead (latent bug in #7273)

In the `while self._active_writers` loop of `flush()`, when a writer generator had already started but its target process was dead:

- `_flush_writer()` was skipped (process not alive)
- `job.discard()` was called
- but the generator was never removed from `_active_writers`

This caused the `while` loop to spin forever, hanging the worker on connection loss.

Additionally, the original code had a shortcut, `if not self._active_writers: self._cache.clear()`, that aggressively cleared the entire cache, including jobs that were already accepted and actively executing in worker processes.

**Fix:**

- Call `self._active_writers.discard(gen)` in the `else` branch, regardless of whether the process is alive or dead.
- Remove the `self._cache.clear()` shortcut: jobs should only be discarded individually based on their actual state.
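The termination fix can be illustrated with a simplified model of the writer-draining loop (the function and callback names here are illustrative, not the real `flush()` internals). The key point: a started writer leaves `active_writers` whether or not its target process is alive.

```python
def drain_active_writers(active_writers, is_started, is_alive, flush_writer):
    """Simplified model of flush() draining started writer generators.

    Before the fix, a started writer whose target process was dead was
    never discarded, so `while active_writers` spun forever.
    """
    while active_writers:
        for gen in list(active_writers):
            if is_started(gen) and is_alive(gen):
                flush_writer(gen)        # drain to the live process
            # Always discard, alive or dead: this is what guarantees
            # that the outer while-loop terminates.
            active_writers.discard(gen)
```

With the unconditional `discard`, every pass over the set shrinks it, so the loop terminates even when every writer targets a dead process.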
### How these bugs interact (the #10096 scenario)

The issue reporter's environment: Redis Sentinel broker, `worker_concurrency=1`, `task_acks_late=False`.

- On failover, `asynloop()` gets an exception from the broken connection. `hub.reset()` is never called → stale fds remain → the consumer cannot cleanly re-register with the event loop → the worker becomes invisible (`inspect ping` fails).
- The unaccepted job stays in `_cache` → the worker thinks it still has 1 active task → with `concurrency=1`, no new tasks are accepted.
- If `flush()` is called while a writer targets a dead process, the worker hangs in an infinite loop.

All three fixes are needed for full recovery. The reporter confirmed that versions prior to 5.6.1 only needed the `asynpool.py` fix (Bugs 2+3), and from 5.6.1 onwards the `loops.py` fix (Bug 1) became necessary as well, matching the timeline of #9986 being merged.

## Changes
- `celery/worker/loops.py`: add `try/except Exception` around the event loop; call `hub.reset()` on error only
- `celery/concurrency/asynpool.py`: clean up unaccepted jobs when `synack=False`; fix infinite loop by always removing dead-process writers from `_active_writers`; remove unsafe `_cache.clear()` shortcut
- `t/unit/worker/test_loops.py`: add tests for hub reset behavior
- `t/unit/concurrency/test_prefork.py`: add tests for `flush()` behavior

## Test coverage
`asynloop` hub reset tests (`t/unit/worker/test_loops.py`):

- `test_hub_reset_on_connection_error`: `hub.reset()` is called when poll raises `socket.error`
- `test_hub_not_reset_on_graceful_shutdown`: `hub.reset()` is not called on a normal CLOSE transition
- `test_hub_not_reset_on_worker_shutdown`: `hub.reset()` is not called for `WorkerShutdown` (extends `SystemExit`)
- `test_hub_not_reset_on_worker_terminate`: `hub.reset()` is not called for `WorkerTerminate` (extends `SystemExit`)
- `test_hub_reset_error_still_reraises_original`: the original error is still re-raised if `hub.reset()` itself fails

`AsynPool.flush()` tests (`t/unit/concurrency/test_prefork.py`):

- `test_flush_no_synack_discards_unaccepted_jobs`: `job.discard()` is called for unaccepted jobs when `synack=False`
- `test_flush_synack_cancels_unaccepted_jobs`: `job._cancel()` is called for unaccepted jobs when `synack=True`
- `test_flush_dead_process_discards_active_writer`: the writer is removed from `_active_writers` when the process is dead (no infinite loop)
- `test_flush_alive_process_flushes_writer`: `_flush_writer()` is called when the process is still alive

## Backwards compatibility
`hub.reset()` is now called on connection errors (restoring the pre-#9986 behavior for the error path only; #9986 is "Fix: Broker heartbeats not sent during graceful shutdown"), and unaccepted jobs are properly cleaned up regardless of the `synack` setting.