Worker stops consuming tasks after Redis reconnection on Celery 5 #7276
-
I am experiencing an issue with celery==5.2.3 that I did not experience with celery 4.4.7, which I recently migrated from. I am using Redis (5.0.9) as the message broker. When I manually restart Redis, the Celery worker from time to time (after the Redis restart) stops consuming tasks indefinitely. Celery beat is able to publish tasks to the broker without any problem after Redis restarts. Once I force a restart of the worker, it picks up all the tasks that beat scheduled in the meantime. Only if I run the Celery 5 worker without heartbeat/gossip/mingle does this not happen, and I can restart Redis without the worker stopping consuming tasks after it reconnects. I am running the worker with the following options to "make it work":
When I try running Celery with RabbitMQ as the message broker and with mingle/gossip/heartbeat enabled, I cannot reproduce the bug (it only happens with Redis). But for my scenario I need to keep using Redis. I have two questions:
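For reference, the same workaround flags can also be passed when starting a worker programmatically via `app.worker_main()`. A minimal sketch of just the argv construction; this is illustrative, not the original poster's exact command:

```python
def worker_argv(loglevel: str = "INFO") -> list:
    """Build argv for Celery's programmatic worker entry point with the
    workaround flags discussed in this thread."""
    return [
        "worker",
        f"--loglevel={loglevel}",
        "--without-heartbeat",  # disable broker heartbeats
        "--without-gossip",     # don't subscribe to other workers' events
        "--without-mingle",     # skip startup sync with other workers
    ]

# With a Celery app instance this would be invoked as:
#   app.worker_main(argv=worker_argv())
print(worker_argv())
```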
Logs prior to when it gets stuck. I waited for half an hour and tasks (periodic tasks are scheduled every 5 minutes) were not consumed by the worker, then I hit Ctrl+C. There are no logs when it stops consuming messages; it just "freezes": Celery report
Replies: 51 comments 86 replies
-
Facing this exact issue.
-
I experience the same issue: the Redis service restarts, but the Celery worker won't consume messages from Redis afterwards.
-
Same issue with:
-
Same issue with:
-
Same issue, although the worker continued to consume tasks for ~20 minutes after Redis reconnected, then stopped.
-
Same issue:
-
I also want to know whether running Celery without heartbeat/gossip/mingle has any negative impact. Any ideas? Thanks!
-
Same issue here
-
Had the same issue, but after some random period of time those tasks get consumed. Sometimes it takes a long time (even hours), but they finally start to show up.
-
We had the same issue. For now we're "working around it" with --without-mingle and --without-gossip (I did not use --without-heartbeat), and the problem seems resolved. Hopefully we won't run into new issues from deactivating both features.
-
Same issue here, using a Redis cluster
-
Same here on
-
Same issue here with
-
Same issue here
-
I added a step-by-step process to reproduce the bug; please let me know if I can provide anything else to help get this resolved. Thanks a lot!
-
Celery v5.4.0rc1 is ready for testing!
-
Same problem here. This is my Celery config:
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
CELERY_CONFIG = config['celery']
broker_url = CELERY_CONFIG['broker']
result_backend = CELERY_CONFIG['backend']
worker_cancel_long_running_tasks_on_connection_loss = True
broker_connection_retry_on_startup = True
# FIXME: work-around for sudden connection drop on Redis.
# Track this problem on:
# - https://groups.google.com/g/celery-users/c/6yF34oA30Ys
# - https://github.com/celery/celery/discussions/7276
broker_connection_max_retries = None
broker_pool_limit = None
worker_deduplicate_successful_tasks = True
worker_concurrency = 1
worker_prefetch_multiplier = 1
worker_state_db = "state.db"
worker_send_task_events = True
worker_pool = 'prefork'
task_time_limit = 3600

My worker starting commands are: Running
After my worker stopped consuming tasks, I tried to
It seems like the worker couldn't close the connection and got stuck there. I haven't tried the
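The configparser pattern in the snippet above can be exercised in isolation. A minimal, self-contained sketch; the file contents below are assumptions, not the commenter's real config.ini:

```python
import configparser

# Simulate a config.ini with a [celery] section, as read in the comment above.
config = configparser.ConfigParser()
config.read_string("""
[celery]
broker = redis://localhost:6379/0
backend = redis://localhost:6379/1
""")

CELERY_CONFIG = config["celery"]
broker_url = CELERY_CONFIG["broker"]
result_backend = CELERY_CONFIG["backend"]
print(broker_url, result_backend)
```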
-
I am also experiencing this issue. Simple setup: a Celery worker, one Redis database as broker and one as results backend, both on the same Redis instance. When restarting Redis, meaning it comes back up within seconds or even less, this is logged: Sometimes tasks 'work' again, but more often they don't; whether they do seems random. When tasks are received, nothing is logged. Contrary to some other messages in this thread, Celery shuts down gracefully when stopping it. What I've tried: Set
... so it's no surprise it doesn't work, as restarting the single Redis instance causes the broker to become unavailable too. Set keepalive on the socket, according to the configuration at #7276 (reply in thread). That doesn't help either.
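For anyone wanting to try the socket keepalive approach mentioned above: the Redis transport accepts keepalive-related options via broker_transport_options, which Kombu passes through to the underlying redis-py connection. A hedged sketch; the specific values are assumptions, not a verified fix:

```python
# Celery configuration fragment (e.g. in a celeryconfig module).
broker_transport_options = {
    "socket_keepalive": True,     # enable TCP keepalive on broker sockets
    "socket_timeout": 30,         # fail reads/writes instead of hanging forever
    "retry_on_timeout": True,     # let redis-py retry once on a timeout
    "health_check_interval": 10,  # ping the connection if idle this long (seconds)
}
print(broker_transport_options["socket_keepalive"])
```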
-
Hi, we upgraded Celery to 5.3 to use the new connection parameters. It seems better, but the issue still happens sometimes instead of often. Do you have a solution to monitor when Celery loses its connection to the RabbitMQ broker? Thanks
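On monitoring: one broker-agnostic approach is to run a periodic ping (for example via `celery inspect ping` or `app.control.ping()`) and alert after several consecutive failures. A toy sketch of just the alerting logic, with the actual ping left abstract; this is illustrative, not a Celery API:

```python
from typing import Callable, List

def run_healthcheck(ping: Callable[[], bool], checks: int,
                    max_failures: int = 3) -> List[int]:
    """Run `checks` pings and return the indices at which an alert would
    fire, i.e. after `max_failures` consecutive failed pings."""
    alerts: List[int] = []
    failures = 0
    for i in range(checks):
        try:
            ok = ping()
        except Exception:
            ok = False  # treat a raised connection error as a failed ping
        failures = 0 if ok else failures + 1
        if failures == max_failures:
            alerts.append(i)
            failures = 0  # reset so a persistent outage keeps alerting
    return alerts

# Example: ping succeeds once, then the broker goes away for four checks.
results = iter([True, False, False, False, False])
print(run_healthcheck(lambda: next(results), checks=5))  # [3]
```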
-
Worker stops consuming tasks during a RabbitMQ cluster repave. We have added most of the Celery configurations, set the connection_timeout in the RabbitMQ configuration, and added an HA policy. Celery 5.3.6. Thanks :)
-
There’s a possibility that the bug was introduced in Kombu > v5.2.2, as it has been reported that Kombu v5.2.2 does not reproduce the bug with Celery 5.4.0 (or lower). A possible fix is in the making: celery/kombu#2007. We’ll update when there is more concrete news to share.
-
Hey everyone, the bug was finally fixed! Kombu v5.4.0rc1 is out with the fix included. It should work on the latest and recent Celery versions. Thank you! Here’s a quick sanity check showing multiple restarts and standard behavior, using Celery v5.4.0:

celery -A myapp worker -l INFO
[2024-06-22 16:57:15,807: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
-------------- celery@Tomers-MacBook-Pro.local v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- macOS-14.5-arm64-arm-64bit 2024-06-22 16:57:15
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: myapp:0x104de9340
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 10 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. myapp.add
[2024-06-22 16:57:15,966: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:57:15,977: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:15,977: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-22 16:57:15,977: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:57:15,981: INFO/MainProcess] mingle: searching for neighbors
[2024-06-22 16:57:15,983: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:16,996: INFO/MainProcess] mingle: all alone
[2024-06-22 16:57:17,023: INFO/MainProcess] celery@Tomers-MacBook-Pro.local ready.
[2024-06-22 16:57:23,627: INFO/MainProcess] Task myapp.add[5404a837-eece-499d-9127-54428129d7ab] received
[2024-06-22 16:57:23,632: INFO/ForkPoolWorker-8] Task myapp.add[5404a837-eece-499d-9127-54428129d7ab] succeeded in 0.0020746251102536917s: 3
[2024-06-22 16:57:24,641: INFO/MainProcess] Task myapp.add[c0c17e72-2d1d-4a73-95e9-3a5d92e392ac] received
[2024-06-22 16:57:24,642: INFO/ForkPoolWorker-8] Task myapp.add[c0c17e72-2d1d-4a73-95e9-3a5d92e392ac] succeeded in 0.00014429213479161263s: 3
[2024-06-22 16:57:25,068: INFO/MainProcess] Task myapp.add[af323875-1671-4aa1-b5b5-3301f465607c] received
[2024-06-22 16:57:25,069: INFO/ForkPoolWorker-8] Task myapp.add[af323875-1671-4aa1-b5b5-3301f465607c] succeeded in 0.00010225013829767704s: 3
[2024-06-22 16:57:25,414: INFO/MainProcess] Task myapp.add[f17f2d70-0870-4a2e-9f8a-c1a99603075d] received
[2024-06-22 16:57:25,415: INFO/ForkPoolWorker-8] Task myapp.add[f17f2d70-0870-4a2e-9f8a-c1a99603075d] succeeded in 9.058299474418163e-05s: 3
[2024-06-22 16:57:25,762: INFO/MainProcess] Task myapp.add[b448339a-7ae5-4370-8f0e-7feb65f2cfb6] received
[2024-06-22 16:57:25,764: INFO/ForkPoolWorker-8] Task myapp.add[b448339a-7ae5-4370-8f0e-7feb65f2cfb6] succeeded in 0.00022187503054738045s: 3
[2024-06-22 16:57:26,127: INFO/MainProcess] Task myapp.add[e5354309-0a9b-409b-906d-0ec366c06415] received
[2024-06-22 16:57:26,129: INFO/ForkPoolWorker-8] Task myapp.add[e5354309-0a9b-409b-906d-0ec366c06415] succeeded in 0.00011062505654990673s: 3
[2024-06-22 16:57:28,244: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
c.loop(*c.loop_args())
File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
next(loop)
File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
cb(*cbargs)
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
self.cycle.on_readable(fileno)
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
chan.handlers[type]()
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 965, in _brpop_read
dest__item = self.client.parse_response(self.client.connection,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 562, in parse_response
response = connection.read_response()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
response = self._parser.read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
result = self._read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
raw = self._buffer.readline()
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
self._read_from_socket()
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-22 16:57:28,258: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2024-06-22 16:57:28,262: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:57:28,263: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:28,263: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)
[2024-06-22 16:57:30,274: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:30,275: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 4.00 seconds... (2/100)
[2024-06-22 16:57:34,307: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:34,307: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-22 16:57:34,308: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:57:34,321: INFO/MainProcess] mingle: searching for neighbors
[2024-06-22 16:57:35,329: INFO/MainProcess] mingle: all alone
[2024-06-22 16:57:37,733: INFO/MainProcess] Task myapp.add[a6385ab7-de2c-41f0-afc0-f3debabf18ae] received
[2024-06-22 16:57:37,734: INFO/ForkPoolWorker-8] Task myapp.add[a6385ab7-de2c-41f0-afc0-f3debabf18ae] succeeded in 8.958298712968826e-05s: 3
[2024-06-22 16:57:38,244: INFO/MainProcess] Task myapp.add[1f2d621c-073f-4622-bf0a-0dfb28d2bb90] received
[2024-06-22 16:57:38,245: INFO/ForkPoolWorker-8] Task myapp.add[1f2d621c-073f-4622-bf0a-0dfb28d2bb90] succeeded in 0.0001180002000182867s: 3
[2024-06-22 16:57:38,578: INFO/MainProcess] Task myapp.add[30ffef19-1a40-411a-bf13-eea3bfc99fa9] received
[2024-06-22 16:57:38,582: INFO/ForkPoolWorker-8] Task myapp.add[30ffef19-1a40-411a-bf13-eea3bfc99fa9] succeeded in 0.0001004999503493309s: 3
[2024-06-22 16:57:38,874: INFO/MainProcess] Task myapp.add[ed8bbc9f-76cd-466e-8c63-588c62a80ec5] received
[2024-06-22 16:57:38,876: INFO/ForkPoolWorker-8] Task myapp.add[ed8bbc9f-76cd-466e-8c63-588c62a80ec5] succeeded in 0.00014774990268051624s: 3
[2024-06-22 16:57:39,136: INFO/MainProcess] Task myapp.add[e5010380-eeca-4a76-bb1a-f32a040e3806] received
[2024-06-22 16:57:39,138: INFO/ForkPoolWorker-8] Task myapp.add[e5010380-eeca-4a76-bb1a-f32a040e3806] succeeded in 0.00010820780880749226s: 3
[2024-06-22 16:57:41,376: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
c.loop(*c.loop_args())
File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
next(loop)
File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
cb(*cbargs)
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
self.cycle.on_readable(fileno)
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
chan.handlers[type]()
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 965, in _brpop_read
dest__item = self.client.parse_response(self.client.connection,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 562, in parse_response
response = connection.read_response()
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
response = self._parser.read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
result = self._read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
raw = self._buffer.readline()
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
self._read_from_socket()
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-22 16:57:41,379: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2024-06-22 16:57:41,382: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:57:41,385: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:41,385: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)
[2024-06-22 16:57:43,394: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:43,394: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 4.00 seconds... (2/100)
[2024-06-22 16:57:47,411: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:47,412: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 61 connecting to 127.0.0.1:6379. Connection refused..
Trying again in 6.00 seconds... (3/100)
[2024-06-22 16:57:53,441: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:57:53,442: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-22 16:57:53,442: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:57:53,448: INFO/MainProcess] mingle: searching for neighbors
[2024-06-22 16:57:54,458: INFO/MainProcess] mingle: all alone
[2024-06-22 16:57:56,929: INFO/MainProcess] Task myapp.add[ea1609f7-d3e8-4829-8e7e-bd895ffb6f44] received
[2024-06-22 16:57:56,931: INFO/ForkPoolWorker-8] Task myapp.add[ea1609f7-d3e8-4829-8e7e-bd895ffb6f44] succeeded in 0.00030320906080305576s: 3
[2024-06-22 16:57:57,379: INFO/MainProcess] Task myapp.add[543bc4f5-74b9-432c-ab7b-6d5394b9dcb7] received
[2024-06-22 16:57:57,385: INFO/ForkPoolWorker-8] Task myapp.add[543bc4f5-74b9-432c-ab7b-6d5394b9dcb7] succeeded in 9.933393448591232e-05s: 3
[2024-06-22 16:57:57,734: INFO/MainProcess] Task myapp.add[1a620705-8f5b-4b83-852c-8e89bc701f7d] received
[2024-06-22 16:57:57,736: INFO/ForkPoolWorker-8] Task myapp.add[1a620705-8f5b-4b83-852c-8e89bc701f7d] succeeded in 0.0001194588840007782s: 3
[2024-06-22 16:57:58,039: INFO/MainProcess] Task myapp.add[e7e0f287-c805-4445-bdda-cdd81d7794c4] received
[2024-06-22 16:57:58,042: INFO/ForkPoolWorker-8] Task myapp.add[e7e0f287-c805-4445-bdda-cdd81d7794c4] succeeded in 0.0002472498454153538s: 3
[2024-06-22 16:58:05,481: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/Users/nusnus/dev/GitHub/celery/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py", line 746, in start
c.loop(*c.loop_args())
File "/Users/nusnus/dev/GitHub/celery/celery/worker/loops.py", line 97, in asynloop
next(loop)
File "/Users/nusnus/dev/GitHub/kombu/kombu/asynchronous/hub.py", line 373, in create_loop
cb(*cbargs)
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 1350, in on_readable
self.cycle.on_readable(fileno)
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 569, in on_readable
chan.handlers[type]()
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 916, in _receive
ret.append(self._receive_one(c))
^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/dev/GitHub/kombu/kombu/transport/redis.py", line 926, in _receive_one
response = c.parse_response()
^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 837, in parse_response
response = self._execute(conn, try_read)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 813, in _execute
return conn.retry.call_with_retry(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/retry.py", line 49, in call_with_retry
fail(error)
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 815, in <lambda>
lambda error: self._disconnect_raise_connect(conn, error),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 802, in _disconnect_raise_connect
raise error
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/retry.py", line 46, in call_with_retry
return do()
^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 814, in <lambda>
lambda: command(*args, **kwargs),
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/client.py", line 835, in try_read
return conn.read_response(disconnect_on_error=False, push_request=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/connection.py", line 512, in read_response
response = self._parser.read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 15, in read_response
result = self._read_response(disable_decoding=disable_decoding)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
raw = self._buffer.readline()
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 115, in readline
self._read_from_socket()
File "/Users/nusnus/.pyenv/versions/3.12.1/envs/celery_py312/lib/python3.12/site-packages/redis/_parsers/socket.py", line 68, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2024-06-22 16:58:05,485: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:391: CPendingDeprecationWarning:
In Celery 5.1 we introduced an optional breaking change which
on connection loss cancels all currently executed tasks with late acknowledgement enabled.
These tasks cannot be acknowledged as the connection is gone, and the tasks are automatically redelivered
back to the queue. You can enable this behavior using the worker_cancel_long_running_tasks_on_connection_loss
setting. In Celery 5.1 it is set to False by default. The setting will be set to True by default in Celery 6.0.
warnings.warn(CANCEL_TASKS_BY_DEFAULT, CPendingDeprecationWarning)
[2024-06-22 16:58:05,494: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:58:05,495: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:58:05,495: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error while reading from 127.0.0.1:6379 : (54, 'Connection reset by peer').
Trying again in 2.00 seconds... (1/100)
[2024-06-22 16:58:07,518: WARNING/MainProcess] No hostname was supplied. Reverting to default 'localhost'
[2024-06-22 16:58:07,518: INFO/MainProcess] Connected to redis://localhost:6379//
[2024-06-22 16:58:07,519: WARNING/MainProcess] /Users/nusnus/dev/GitHub/celery/celery/worker/consumer/consumer.py:508: CPendingDeprecationWarning: The broker_connection_retry configuration setting will no longer determine
whether broker connection retries are made during startup in Celery 6.0 and above.
If you wish to retain the existing behavior for retrying connections on startup,
you should set broker_connection_retry_on_startup to True.
warnings.warn(
[2024-06-22 16:58:07,536: INFO/MainProcess] mingle: searching for neighbors
[2024-06-22 16:58:08,552: INFO/MainProcess] mingle: all alone
[2024-06-22 16:58:15,652: INFO/MainProcess] Task myapp.add[ad5a9cb1-7976-4f02-a9e9-a193b2c3f68e] received
[2024-06-22 16:58:15,653: INFO/ForkPoolWorker-8] Task myapp.add[ad5a9cb1-7976-4f02-a9e9-a193b2c3f68e] succeeded in 8.908403106033802e-05s: 3
[2024-06-22 16:58:16,051: INFO/MainProcess] Task myapp.add[769968b9-4343-4ee5-8e72-0aac92303f94] received
[2024-06-22 16:58:16,053: INFO/ForkPoolWorker-8] Task myapp.add[769968b9-4343-4ee5-8e72-0aac92303f94] succeeded in 0.00015416601672768593s: 3
[2024-06-22 16:58:16,367: INFO/MainProcess] Task myapp.add[b5c49695-ee56-4f2a-bfd8-edb4d4eead6a] received
[2024-06-22 16:58:16,369: INFO/ForkPoolWorker-8] Task myapp.add[b5c49695-ee56-4f2a-bfd8-edb4d4eead6a] succeeded in 0.00014933408237993717s: 3
[2024-06-22 16:58:16,662: INFO/MainProcess] Task myapp.add[7dc9d221-6b11-4ddd-b1b3-05ea257d8e84] received
[2024-06-22 16:58:16,664: INFO/ForkPoolWorker-8] Task myapp.add[7dc9d221-6b11-4ddd-b1b3-05ea257d8e84] succeeded in 8.983304724097252e-05s: 3
^C
worker: Hitting Ctrl+C again will terminate all running tasks!
worker: Warm shutdown (MainProcess)
-
Celery v5.5.0b1 was released, including a fix for this issue.
-
Has the issue been resolved for Redis only, or does it also apply to RabbitMQ?
-
I am still experiencing this on
-
We’re in the middle of a release cycle for a new version, so I’ll reopen this issue as requested, but it would be helpful to report further cases using the latest pre-release to ensure the bug persists and is not already resolved in the upcoming version 🙏 |
-
Was investigating this on Apache Airflow with Celery 5.5.3. One of the reasons it is happening is incorrect behavior of
Local Docker Compose Environment:
The statement below seems not to be true:
The prefetch count will be gradually restored, not by the tasks that were running before the connection was lost, but by new tasks on ack/reject. So when there are 16 tasks running on connection loss, we need another 16 tasks after the connection loss for the prefetch to be restored. _restore_prefetch_count_after_connection_restart will never be called for tasks from before the connection loss: when there is a connection loss, the pool is flushed, so neither of
We can comment out the pool flush on connection loss (I've been fixing #7515 by doing that; still happening in 5.5.3): celery/celery/worker/consumer/consumer.py line 468 in afb6730. Then ack_log_error_promise will be called, but it would need to fail for the prefetch to be restored, and with Redis it doesn't. So there is another statement that I think is not entirely correct, i.e. that on connection loss tasks cannot be acknowledged; at least not with Redis. I was testing this locally, and on connection loss the task was acked correctly:
How to replicate
Scenario 1:
Scenario 2:
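The gradual-restore behavior described above can be modeled with a toy counter. This mirrors the behavior as described in the comment, not Celery's actual QoS implementation:

```python
class PrefetchModel:
    """Toy model: on connection loss the prefetch count is reduced by the
    number of in-flight tasks, and is credited back one-by-one only as
    *new* tasks are acked/rejected after reconnection."""

    def __init__(self, target: int):
        self.target = target
        self.current = target

    def on_connection_loss(self, in_flight: int) -> None:
        # Prefetch drops by however many tasks were running when the link died.
        self.current = max(self.target - in_flight, 0)

    def on_new_task_acked(self) -> None:
        # Each ack/reject of a *new* task restores one unit of prefetch.
        if self.current < self.target:
            self.current += 1

# 16 tasks in flight when the connection drops: prefetch collapses to 0,
# and 16 new acks are needed before it is fully restored.
qos = PrefetchModel(target=16)
qos.on_connection_loss(in_flight=16)
print(qos.current)  # 0
for _ in range(16):
    qos.on_new_task_acked()
print(qos.current)  # 16
```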
-
Hi @Nusnus, I encountered this issue, but the fix you released in 5.5.0b1 solved it for us a while ago. However, today I ran into an edge case in Celery 5.5.3 where the worker fails to consume tasks from the scheduler if it got disconnected from Redis before it can consume any tasks. Reproduction steps:
Typically, this occurred in our Kube environment when both the Redis pod and the worker pod were destroyed simultaneously. If the worker restarts before Redis, then after Redis becomes available and the worker reconnects, the consumer doesn't properly resume consuming tasks. Minimal reproduction:
from celery import Celery, shared_task
from celery.schedules import schedule
from datetime import timedelta
app = Celery("reproduction")
@shared_task
def debug_task():
print("Debug task running")
app.conf.beat_schedule = {
"debug-task-every-five-seconds": {
"task": debug_task.name,
"schedule": schedule(timedelta(seconds=5)),
},
}
Steps to reproduce:
Environment:
Is this a known issue? I feel like it could be related to some comments here saying that this issue was not solved by 5.5.0b1, while it has been for most cases.
-
@ChickenBenny do you think this is related to one of your recent fixes?





Yes, the symptoms are exactly the same, but the underlying root causes are slightly different depending on the Celery version.
To break it down:
The original #7276 issue: This was a deep-rooted issue in how Kombu handled Pub/Sub and event loops upon reconnection. As @Nusnus mentioned earlier in this thread, that specific root cause was fixed in Celery v5.5.0b1 / Kombu 5.4.0.
The 5.6.x Regression: Unfortunately, the exact same symptoms (worker freezing, stuck file descriptors, and no longer consuming tasks after Redis failover) were accidentally reintroduced in Celery 5.6.x. This was caused by regressions from two recent PRs (#9986 and #7273) which removed hub.reset() from the error pa…