
Issues reconnecting after redis failover #10096

@bdrosen96

Description


Checklist

  • I have verified that the issue exists against the main branch of Celery.
  • This has already been asked to the discussions forum first.
  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the main branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).
  • I have tried to reproduce the issue with pytest-celery and added the reproduction script below.

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the main branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

  • None

Environment & Settings

5.6.2:

software -> celery:5.6.2 (recovery) kombu:5.6.2 py:3.12.11
            billiard:4.2.1 redis:6.0.0
platform -> system:Linux arch:64bit, ELF
            kernel version:6.5.0-26-generic imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis-sentinel results:mongodb-replica

deprecated_settings: None
task_queues:
(<unbound Queue default -> <unbound Exchange ''(direct)> -> low.#>,)
CELERY_DEFAULT_EXCHANGE: 'default'
worker_consumer: 'ds.worker.CustomConsumer'


Steps to Reproduce

When running Celery with Redis Sentinel as the broker, if I submit a job and then trigger a Redis failover while the job is running, the already-running job completes, but no new jobs get run and the in-progress job still shows up as active even after completion. This test case worked fine in 5.2.7 but stopped working after I updated to the latest release (5.6.2). There appear to be two issues that led to this change.

#7273
#9986

Applying the following diff to 5.6.2 restored the original behavior

--- celery/worker/loops.py	2025-12-29 14:07:33.000000000 -0500
+++ celery/worker/loops.py	2025-07-23 17:49:42.000000000 -0400
@@ -81,21 +81,28 @@
     hub.propagate_errors = errors
     loop = hub.create_loop()
 
-    while blueprint.state == RUN and obj.connection:
-        state.maybe_shutdown()
-        if heartbeat_error[0] is not None:
-            raise heartbeat_error[0]
+    try:
+        while blueprint.state == RUN and obj.connection:
+            state.maybe_shutdown()
+            if heartbeat_error[0] is not None:
+                raise heartbeat_error[0]
 
-        # We only update QoS when there's no more messages to read.
-        # This groups together qos calls, and makes sure that remote
-        # control commands will be prioritized over task messages.
-        if qos.prev != qos.value:
-            update_qos()
+            # We only update QoS when there's no more messages to read.
+            # This groups together qos calls, and makes sure that remote
+            # control commands will be prioritized over task messages.
+            if qos.prev != qos.value:
+                update_qos()
 
+            try:
+                next(loop)
+            except StopIteration:
+                loop = hub.create_loop()
+    finally:
         try:
-            next(loop)
-        except StopIteration:
-            loop = hub.create_loop()
+            hub.reset()
+        except Exception as exc:  # pylint: disable=broad-except
+            logger.exception(
+                'Error cleaning up after event loop: %r', exc)
 
 
 def synloop(obj, connection, consumer, blueprint, hub, qos,
--- celery/concurrency/asynpool.py.orig	2026-02-05 14:16:39.953652467 -0500
+++ celery/concurrency/asynpool.py	2026-02-05 14:18:09.955363218 -0500
@@ -1000,10 +1000,12 @@
             return
         # cancel all tasks that haven't been accepted so that NACK is sent
         # if synack is enabled.
-        if self.synack:
-            for job in self._cache.values():
-                if not job._accepted:
+        for job in tuple(self._cache.values()):
+            if not job._accepted:
+                if self.synack:
                     job._cancel()
+                else:
+                    job.discard()
 
         # clear the outgoing buffer as the tasks will be redelivered by
         # the broker anyway.
@@ -1028,36 +1030,32 @@
                     if writer is not None:
                         owned_by[writer] = job
 
-                if not self._active_writers:
-                    self._cache.clear()
-                else:
-                    while self._active_writers:
-                        writers = list(self._active_writers)
-                        for gen in writers:
-                            if (gen.__name__ == '_write_job' and
-                                    gen_not_started(gen)):
-                                # hasn't started writing the job so can
-                                # discard the task, but we must also remove
-                                # it from the Pool._cache.
-                                try:
-                                    job = owned_by[gen]
-                                except KeyError:
-                                    pass
-                                else:
-                                    # removes from Pool._cache
-                                    job.discard()
-                                self._active_writers.discard(gen)
+                while self._active_writers:
+                    writers = list(self._active_writers)
+                    for gen in writers:
+                        if (gen.__name__ == '_write_job' and
+                                gen_not_started(gen)):
+                            # hasn't started writing the job so can
+                            # discard the task, but we must also remove
+                            # it from the Pool._cache.
+                            try:
+                                job = owned_by[gen]
+                            except KeyError:
+                                pass
+                            else:
+                                # removes from Pool._cache
+                                job.discard()
+                            self._active_writers.discard(gen)
+                        else:
+                            try:
+                                job = owned_by[gen]
+                            except KeyError:
+                                pass
                             else:
-                                try:
-                                    job = owned_by[gen]
-                                except KeyError:
-                                    pass
-                                else:
-                                    job_proc = job._write_to
-                                    if job_proc._is_alive():
-                                        self._flush_writer(job_proc, gen)
+                                job_proc = job._write_to
+                                if job_proc._is_alive():
+                                    self._flush_writer(job_proc, gen)
 
-                                    job.discard()
                     # workers may have exited in the meantime.
                     self.maintain_pool()
                     sleep(next(intervals))  # don't busyloop

In loops.py, I think the issue is the loss of the call to hub.reset().
In asynpool.py, I think the issue has to do with changes in the calls to job.discard() or job._cancel(). This may be due to synack always being False, the extra call to discard after _flush_writer, or something else. Note that for some versions prior to 5.6.1 I only needed the asynpool.py change to restore the expected behavior, while from 5.6.1 onwards I needed the loops.py fix as well. I think there are two distinct issues here:

1. The worker has a concurrency of 1, so while the job still appears active (celery inspect active) it cannot pick up a new job to run. (Versions prior to 5.6.1 have this issue, but the worker still shows up via celery inspect ping or active_queues.)
2. From 5.6.1 onwards the worker does not show up at all in ping or active_queues, likely because hub.reset() is no longer called.
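The two symptoms can be told apart from the shape of the replies returned by `app.control.inspect().ping()` and `.active()`. A hedged sketch of that check (the helper and worker names are mine for illustration, not part of Celery's API; the reply shapes follow what inspect() returns):

```python
# Sketch: distinguish the two failure modes from inspect() replies.
# ping()/active() each return a dict mapping worker name -> reply,
# or None when no worker answered at all.

def classify_failover_state(ping_replies, active_replies, worker):
    ping_replies = ping_replies or {}
    active_replies = active_replies or {}
    if worker not in ping_replies:
        # Symptom 2 (5.6.1 onwards): the worker is invisible to
        # ping/active_queues entirely.
        return 'worker-missing'
    if active_replies.get(worker):
        # Symptom 1: a finished task is still listed as active,
        # blocking the single concurrency slot.
        return 'stale-active-task'
    return 'healthy'
```

With concurrency 1, a single stale entry in the active list is enough to starve the worker.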

This is with

broker_connection_retry = True
worker_concurrency = 1
worker_prefetch_multiplier = 1
task_track_started = True
task_acks_late = False

Note that all dependencies and config settings are the same with 5.2.7 as with 5.6.2, except that I added exceptiongroup to get 5.6.0 working prior to that import being made conditional.

Required Dependencies

  • Minimal Python Version: N/A or Unknown
  • Minimal Celery Version: N/A or Unknown
  • Minimal Kombu Version: N/A or Unknown
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: N/A or Unknown
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: N/A or Unknown

Python Packages

aiobotocore==2.12.4 aioftp==0.18.1 aiohappyeyeballs==2.6.1 aiohttp==3.13.3.post1+dr aioitertools==0.11.0 aiosignal==1.2.0 alembic==1.13.3 amqp==5.3.1 async-timeout==4.0.2 atomicwrites==1.4.0 attrs==19.3.0 Authlib==1.6.6 azure-core==1.38.0 azure-identity==1.24.0 azure-storage-blob==12.19.0 azure-storage-file-datalake==12.14.0 billiard==4.2.1 boto==2.48.0.post4+dr boto3==1.34.106 botocore==1.34.106 cached-property==1.5.2 cachetools==3.1.1.post1+dr celery==5.6.2.post4+dr certifi==2024.8.30 cffi==1.16.0 chardet==2.3.0 charset-normalizer==2.1.0 click==8.1.2 click-didyoumean==0.3.0 click-plugins==1.1.1 click-repl==0.2.0 cligj==0.7.2 colorama==0.3.9.post1+dr coverage==7.6.9 cramjam==2.11.0 cryptography==44.0.2 Cython==0.29.36.post1+dr databricks-sdk==0.19.0 databricks-sql-connector==4.2.4 datarobot-errors==0.109.post1+dr datarobot-oss-gmp==6.2.1.post2+dr datarobot-oss-java-jdk11==1.11.0.25.post1+dr datarobot-oss-libgcc==11.2.0.post5+dr datarobot-oss-libgfortran==11.2.0.post5+dr datarobot-oss-mfpr==4.1.0.post2+dr datarobot-oss-mpc==1.2.1.post2+dr datarobot-oss-readline==8.1.post4+dr db-dtypes==1.1.1 decorator==4.4.1 dnspython==2.7.0 docker==7.1.0 docx2txt==0.8 dr-jdbc==1.5.107.post1+dr dr-secrets==1.8.0 drfaster==9.0.0.post4+dr drprotobuf-browser-service==0.2.0 dss.extensions.csv==0.0.6 dss.extensions.duplicates==0.0.6 dss.extensions.meminfo==0.0.6 et_xmlfile==2.0.0 exceptiongroup==1.3.1 fiona==1.10.0 flake8==2.5.1 frozenlist==1.5.0 geopandas==1.1.2 google-api-core==2.29.0 google-auth==2.38.0 google-cloud-bigquery==3.17.2 google-cloud-bigquery-storage==2.29.0 google-cloud-core==2.4.1 google-cloud-storage==2.19.0 google-crc32c==1.3.0 google-resumable-media==2.7.2 googleapis-common-protos==1.72.0 greenlet==3.1.1 grpcio==1.76.0 grpcio-reflection==1.59.3 grpcio-status==1.76.0 gssapi==1.9.0.post1+dr idna==3.7 iniconfig==2.3.0 injections==0.2.1 IPy==1.1 isodate==0.6.1 janus==1.0.0.post2+dr jmespath==0.9.4 jpype1==1.5.1 kazoo==2.6.1 kombu==5.6.2.post1+dr langid==1.1.6.post6+dr 
logstash-formatter==0.5.8.post3+dr lz4==4.3.3 Mako==1.3.5 Markdown==3.4.3 MarkupSafe==2.0.1 mccabe==0.3.1 mmh3==5.0.1 motor==3.6.0 msal==1.33.0 msal-extensions==1.2.0 msrest==0.7.1 multidict==6.6.3 nest-asyncio==1.5.6 numexpr==2.10.2 numpy==1.26.4 oauthlib==3.2.2 openpyxl==3.1.2 packaging==24.0 pandas==2.2.3.post4+dr pdfium-bindings==0.0.17.post4+dr pep8==1.7.1 pillow==10.4.0 pluggy==1.5.0 portalocker==1.7.1 prompt_toolkit==3.0.52 propcache==0.3.2 proto-plus==1.26.1 protobuf==6.33.5 psutil==6.1.1 pyarrow==19.0.1 pyasn1==0.4.8 pyasn1-modules==0.2.7 pybreaker==1.4.1 pycparser==2.19.post3+dr pycryptodome==3.21.0 pydantic==1.10.22 pyflakes==1.0.0.post1+dr PyICU==2.14.post2+dr PyJWT==2.10.1 pymongo==4.9 pyogrio==0.12.1 pyOpenSSL==25.0.0 pyparsing==3.0.9 pyproj==3.7.0 pytest==8.3.4 pytest-cov==6.0.0 pytest-rerunfailures==15.0 pytest-timeout==2.3.1 python-dateutil==2.9.0 python-magic==0.4.27 python-snappy==0.7.3 pytz==2022.4 PyYAML==6.0.2 quantum-env==3.2.0 quantum-native-atlas==3.8.4.post2+gf11 quantum-native-bzip2==1.0.8.post5+dr quantum-native-cyrus-sasl==2.1.27.post5+openssl3 quantum-native-expat==2.4.9.post1+dr quantum-native-icu4c==65.1.post2+dr quantum-native-isl==0.24.post1+dr quantum-native-krb5==1.15.3.post1+openssl3 quantum-native-libffi==3.4.2.post2+dr quantum-native-lzma==5.2.5.post2+dr quantum-native-mpdecimal==2.5.1.post3+dr quantum-native-ncurses==6.3.post1+dr quantum-native-openldap==2.4.57.post1+openssl3 quantum-native-openssl==3.2.6.post1+dr quantum-native-pdfium==120.0.6070.0.post1+dr quantum-native-protobuf==3.19.6.post2+dr quantum-native-python3==3.12.11.post1+dr quantum-native-sqlite3==3.37.2.post4+dr quantum-native-zlib==1.2.11.post13+dr quantum-native-zstd==1.5.2.post1+dr redis==6.0.0 redis-entraid==1.1.0 remote-pdb==1.2.0 requests==2.32.4 requests-oauthlib==1.3.1 retrying==1.3.3.post1+dr rsa==4.7 s3transfer==0.10.4 sas7bdat==2.0.15.post9+dss scipy==1.14.1 service-library==0.8.2 setuptools==80.10.2 shapely==2.0.6 six==1.17.0 SQLAlchemy==1.4.51 
sqlparse==0.5.0 thrift==0.16.0.post1+dr trafaret==0.6.1.post3+dr types-cryptography==3.3.23.2 typing_extensions==4.12.2 tzdata==2025.3 tzlocal==5.3.1 urllib3==2.6.3 vine==5.1.0 watchdog==4.0.2 wcwidth==0.2.13 wrapt==1.17.0 xlrd==1.2.0.post1+dr yarl==1.17.2 zake==0.2.2

Other Dependencies

Details

N/A

Minimally Reproducible Test Case

Details

Expected Behavior

Actual Behavior
