Worker stops consuming tasks after Redis reconnection on Celery 5 #8091

@ashwin-metripping

Description

Discussed in #7276

Originally posted by fcovatti February 3, 2022
I am experiencing an issue with celery==5.2.3 that I did not have with celery 4.4.7, which I recently migrated from.

I am using Redis (5.0.9) as the message broker. When I manually restart Redis, the Celery worker sometimes stops consuming tasks indefinitely once Redis is back up. Celery beat keeps publishing tasks to the broker without any problem after the Redis restart. Once I force a restart of the worker, it picks up all the tasks that beat scheduled in the meantime.
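To tell this hang apart from a merely slow worker, it can help to watch the broker backlog directly. With the default queue name, the Redis transport keeps pending messages in a Redis list called `celery` (the traceback below shows the consumer blocked in `_brpop_read`), so `redis-cli LLEN celery` reports the backlog. The sketch below is only the decision logic and assumes you already collect periodic length samples; `QueueSample`, `queue_is_stalled` and the 10-minute window are hypothetical choices, not part of Celery.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class QueueSample:
    """One observation of the broker backlog (hypothetical monitoring record)."""
    timestamp: float  # seconds since the epoch
    length: int       # e.g. the value returned by `LLEN celery`


def queue_is_stalled(samples: Sequence[QueueSample], window_s: float = 600.0) -> bool:
    """Return True if the backlog never shrank during the last `window_s` seconds
    and ended larger than it started, i.e. tasks are published but not consumed.

    `samples` must be sorted by timestamp.
    """
    if len(samples) < 2:
        return False
    end = samples[-1].timestamp
    if samples[0].timestamp > end - window_s:
        return False  # not enough history to cover the window yet
    recent = [s for s in samples if s.timestamp >= end - window_s]
    if len(recent) < 2:
        return False
    never_shrank = all(b.length >= a.length for a, b in zip(recent, recent[1:]))
    return never_shrank and recent[-1].length > recent[0].length
```

With beat publishing every 5 minutes, sampling at the same interval is enough: a healthy worker drains the list between samples, while a stuck one lets it grow by one per run.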

The problem goes away only when I run the Celery 5 worker without heartbeat/gossip/mingle; in that case I can restart Redis and the worker resumes consuming tasks after it reconnects.

I am running the worker with the following options to "make it work":

celery -A proj worker -l info --without-heartbeat --without-gossip --without-mingle
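If the worker is started from a Python process manager rather than a shell, the same workaround can be kept in one place as an argv list. This is a minimal sketch; `worker_argv` and `DISABLED_FEATURES` are hypothetical names, and only the `-A`, `-l`, `--without-heartbeat`, `--without-gossip` and `--without-mingle` options from the command above are used.

```python
import shlex
from typing import Iterable, List

# Worker features disabled in the workaround command above.
DISABLED_FEATURES = ("heartbeat", "gossip", "mingle")


def worker_argv(app: str, loglevel: str = "info",
                without: Iterable[str] = DISABLED_FEATURES) -> List[str]:
    """Build the argv for `celery -A <app> worker` plus one --without-<feature> flag each."""
    argv = ["celery", "-A", app, "worker", "-l", loglevel]
    argv += [f"--without-{feature}" for feature in without]
    return argv


if __name__ == "__main__":
    # Prints the same command line as in the issue.
    print(shlex.join(worker_argv("proj")))
```

Passing the list to `subprocess.run(...)` launches the worker without going through a shell, so no quoting is involved.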

When I run Celery with RabbitMQ as the message broker, with mingle/gossip/heartbeat enabled, I cannot reproduce the bug; it only happens with Redis. However, my setup requires Redis.

I have 2 questions:

  1. Is it okay to run Celery with heartbeat/gossip/mingle disabled?
  2. Is this a bug I should report?

Logs leading up to the point where it gets stuck. I waited half an hour (the periodic task is scheduled every 5 minutes) and no tasks were consumed by the worker, then I hit Ctrl+C. Nothing is logged when it stops consuming messages; it just "freezes":

[2022-02-01 22:50:13,323: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 326, in start
    blueprint.start(self)
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 618, in start
    c.loop(*c.loop_args())
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/celery/worker/loops.py", line 97, in asynloop
    next(loop)
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/kombu/transport/redis.py", line 1266, in on_readable
    self.cycle.on_readable(fileno)
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/kombu/transport/redis.py", line 504, in on_readable
    chan.handlers[type]()
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/kombu/transport/redis.py", line 896, in _brpop_read
    dest__item = self.client.parse_response(self.client.connection,
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/redis/client.py", line 1192, in parse_response
    response = connection.read_response()
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/redis/connection.py", line 814, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/redis/connection.py", line 320, in read_response
    raw = self._buffer.readline()
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/redis/connection.py", line 251, in readline
    self._read_from_socket()
  File "/home/covatti/.local/share/virtualenvs/souq-p_Vlgosd/lib/python3.8/site-packages/redis/connection.py", line 197, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
[2022-02-01 22:50:13,327: DEBUG/MainProcess] | Consumer: Restarting event loop...
[2022-02-01 22:50:13,327: DEBUG/MainProcess] | Consumer: Restarting Control...
[2022-02-01 22:50:13,328: DEBUG/MainProcess] | Consumer: Restarting Tasks...
[2022-02-01 22:50:13,328: DEBUG/MainProcess] Canceling task consumer...
[2022-02-01 22:50:13,328: DEBUG/MainProcess] | Consumer: Restarting Heart...
[2022-02-01 22:50:13,330: DEBUG/MainProcess] | Consumer: Restarting DriverConsumerStep...
[2022-02-01 22:50:13,330: DEBUG/MainProcess] | Consumer: Restarting Mingle...
[2022-02-01 22:50:13,330: DEBUG/MainProcess] | Consumer: Restarting Events...
[2022-02-01 22:50:13,330: DEBUG/MainProcess] | Consumer: Restarting Connection...
[2022-02-01 22:50:13,331: DEBUG/MainProcess] | Consumer: Starting Connection
[2022-02-01 22:50:13,333: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Connection closed by server..
Trying again in 2.00 seconds... (1/100)

[2022-02-01 22:50:15,337: ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379//: Error 111 connecting to localhost:6379. Connection refused..
Trying again in 4.00 seconds... (2/100)

[2022-02-01 22:50:19,345: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-02-01 22:50:19,346: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:19,346: DEBUG/MainProcess] | Consumer: Starting Events
[2022-02-01 22:50:19,350: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:19,351: DEBUG/MainProcess] | Consumer: Starting Mingle
[2022-02-01 22:50:19,351: INFO/MainProcess] mingle: searching for neighbors
[2022-02-01 22:50:20,357: INFO/MainProcess] mingle: all alone
[2022-02-01 22:50:20,357: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:20,357: DEBUG/MainProcess] | Consumer: Starting DriverConsumerStep
[2022-02-01 22:50:20,361: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:20,361: DEBUG/MainProcess] | Consumer: Starting Heart
[2022-02-01 22:50:20,363: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:20,363: DEBUG/MainProcess] | Consumer: Starting Tasks
[2022-02-01 22:50:20,368: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:20,368: DEBUG/MainProcess] | Consumer: Starting Control
[2022-02-01 22:50:20,372: DEBUG/MainProcess] ^-- substep ok
[2022-02-01 22:50:20,373: DEBUG/MainProcess] | Consumer: Starting event loop
[2022-02-01 22:50:20,373: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2022-02-01 22:50:20,373: DEBUG/MainProcess] basic.qos: prefetch_count->16
^C
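When comparing runs, a small parser can pull the relevant facts out of a log like the one above: the reconnection retry delays, the consumer bootsteps started after the last `Connected to ...` line, and whether any task was received afterwards. This sketch assumes the default `[timestamp: LEVEL/Process] message` format shown here and Celery's `Task name[id] received` INFO message; `summarize_reconnect` is a hypothetical helper. For the excerpt above it reports both retries (2 s and 4 s), every bootstep up to `event loop`, and no received tasks.

```python
import re
from typing import Dict, Iterable, List

# Default worker log line: "[2022-02-01 22:50:19,345: INFO/MainProcess] message"
LINE_RE = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}): "
    r"(?P<level>[A-Z]+)/(?P<proc>[\w-]+)\] (?P<msg>.*)$"
)
# Continuation line printed after a failed connection attempt.
RETRY_RE = re.compile(
    r"Trying again in (?P<delay>[\d.]+) seconds\.\.\. \((?P<n>\d+)/(?P<max>\d+)\)"
)
# Assumed format of Celery's INFO message for an incoming task.
RECEIVED_RE = re.compile(r"^Task \S+\[[^\]]+\] received$")
STEP_PREFIX = "| Consumer: Starting "


def summarize_reconnect(lines: Iterable[str]) -> Dict[str, object]:
    """Summarize what the consumer did after its most recent successful connection."""
    retry_delays: List[float] = []
    steps: List[str] = []
    received = 0
    connected = False
    for raw in lines:
        line = raw.rstrip("\n")
        retry = RETRY_RE.search(line)
        if retry:
            retry_delays.append(float(retry.group("delay")))
            continue
        m = LINE_RE.match(line)
        if not m:
            continue  # traceback frames and other continuation lines
        msg = m.group("msg")
        if msg.startswith("Connected to "):
            # Start over: only the latest (re)connection matters.
            connected, steps, received = True, [], 0
        elif connected and msg.startswith(STEP_PREFIX):
            steps.append(msg[len(STEP_PREFIX):])
        elif connected and RECEIVED_RE.match(msg):
            received += 1
    return {
        "retry_delays": retry_delays,
        "reconnected": connected,
        "steps_after_connect": steps,
        "reached_event_loop": "event loop" in steps,
        "tasks_received_after_connect": received,
    }
```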

Celery report

(proj) user@desktop:~/app$ celery -A proj report

software -> celery:5.2.3 (dawn-chorus) kombu:5.2.3 py:3.8.0
            billiard:3.6.4.0 redis:4.1.2
platform -> system:Linux arch:64bit, ELF
            kernel version:5.4.0-97-generic imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://localhost:6379/

ABSOLUTE_URL_OVERRIDES: {
 }
ADMINS: []
ADMIN_PERMISSION_CLASS: <class 'rest_framework_jwt.permissions.create_custom_permission_class.<locals>.CustomPermission'>
ALLOWED_HOSTS: '*'
APPEND_SLASH: True
AUTHENTICATION_BACKENDS: ['django.contrib.auth.backends.ModelBackend']
AUTH_PASSWORD_VALIDATORS: '********'
AUTH_USER_MODEL: 'auth.User'
CACHES: {
 'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}
CACHE_MIDDLEWARE_ALIAS: 'default'
CACHE_MIDDLEWARE_KEY_PREFIX: '********'
CACHE_MIDDLEWARE_SECONDS: 600
CELERY_ACCEPT_CONTENT: ['application/json']
CELERY_BROKER_URL: 'redis://localhost:6379//'
CELERY_RESULT_BACKEND: 'redis://localhost:6379/'
CELERY_RESULT_SERIALIZER: 'json'
CELERY_TASK_SERIALIZER: 'json'
CSRF_COOKIE_AGE: 31449600
CSRF_COOKIE_DOMAIN: None
CSRF_COOKIE_HTTPONLY: False
CSRF_COOKIE_NAME: 'csrftoken'
CSRF_COOKIE_PATH: '/'
CSRF_COOKIE_SAMESITE: 'Lax'
CSRF_COOKIE_SECURE: False
CSRF_FAILURE_VIEW: 'django.views.csrf.csrf_failure'
CSRF_HEADER_NAME: 'HTTP_X_CSRFTOKEN'
CSRF_TRUSTED_ORIGINS: []
CSRF_USE_SESSIONS: False
DATABASES: {
    'default': {   'ATOMIC_REQUESTS': False,
                   'AUTOCOMMIT': True,
                   'CONN_MAX_AGE': 0,
                   'ENGINE': 'django.db.backends.postgresql',
                   'HOST': 'localhost',
                   'NAME': 'proj',
                   'OPTIONS': {},
                   'PASSWORD': '********',
                   'PORT': '5432',
                   'TEST': {   'CHARSET': None,
                               'COLLATION': None,
                               'MIGRATE': True,
                               'MIRROR': None,
                               'NAME': None},
                   'TIME_ZONE': None,
                   'USER': 'proj'}}
DATABASE_ROUTERS: '********'
DATA_UPLOAD_MAX_MEMORY_SIZE: 2621440
DATA_UPLOAD_MAX_NUMBER_FIELDS: 1000
DATETIME_FORMAT: 'N j, Y, P'
DATETIME_INPUT_FORMATS: ['%Y-%m-%d %H:%M:%S',
 '%Y-%m-%d %H:%M:%S.%f',
 '%Y-%m-%d %H:%M',
 '%m/%d/%Y %H:%M:%S',
 '%m/%d/%Y %H:%M:%S.%f',
 '%m/%d/%Y %H:%M',
 '%m/%d/%y %H:%M:%S',
 '%m/%d/%y %H:%M:%S.%f',
 '%m/%d/%y %H:%M']
DATE_FORMAT: 'N j, Y'
DATE_INPUT_FORMATS: ['%Y-%m-%d',
 '%m/%d/%Y',
 '%m/%d/%y',
 '%b %d %Y',
 '%b %d, %Y',
 '%d %b %Y',
 '%d %b, %Y',
 '%B %d %Y',
 '%B %d, %Y',
 '%d %B %Y',
 '%d %B, %Y']
DEBUG: True
DEBUG_PROPAGATE_EXCEPTIONS: False
DECIMAL_SEPARATOR: '.'
DEFAULT_AUTO_FIELD: 'django.db.models.AutoField'
DEFAULT_CHARSET: 'utf-8'
DEFAULT_EXCEPTION_REPORTER: 'django.views.debug.ExceptionReporter'
DEFAULT_EXCEPTION_REPORTER_FILTER: 'django.views.debug.SafeExceptionReporterFilter'
DEFAULT_FILE_STORAGE: 'django.core.files.storage.FileSystemStorage'
DEFAULT_FROM_EMAIL: 'webmaster@localhost'
DEFAULT_HASHING_ALGORITHM: 'sha256'
DEFAULT_INDEX_TABLESPACE: ''
DEFAULT_TABLESPACE: ''
DISALLOWED_USER_AGENTS: []
FILE_UPLOAD_DIRECTORY_PERMISSIONS: None
FILE_UPLOAD_HANDLERS: ['django.core.files.uploadhandler.MemoryFileUploadHandler',
 'django.core.files.uploadhandler.TemporaryFileUploadHandler']
FILE_UPLOAD_MAX_MEMORY_SIZE: 2621440
FILE_UPLOAD_PERMISSIONS: 420
FILE_UPLOAD_TEMP_DIR: None
FIRST_DAY_OF_WEEK: 0
FIXTURE_DIRS: []
FORCE_SCRIPT_NAME: None
FORMAT_MODULE_PATH: None
FORM_RENDERER: 'django.forms.renderers.DjangoTemplates'
GRAPH: 'True'
HTTPS_ON: 'False'
IGNORABLE_404_URLS: []
INSTALLED_APPS: ['baton',
 'django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'django.contrib.sites',
 'django_celery_beat',
 'rest_framework',
 'proj',
 'container_handler',
 'debug_toolbar',
 'nested_admin',
 'django_extensions',
 'baton.autodiscover']
INTERNAL_IPS: ['127.0.0.1']
LOCALE_PATHS: []
LOGGING: {
    'disable_existing_loggers': False,
    'filters': {   'require_debug_false': {   '()': 'django.utils.log.RequireDebugFalse'},
                   'require_debug_true': {   '()': 'django.utils.log.RequireDebugTrue'}},
    'handlers': {   'console': {   'class': 'logging.StreamHandler',
                                   'filters': ['require_debug_true'],
                                   'level': 'DEBUG',
                                   'stream': <_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>},
                    'console_on_not_debug': {   'class': 'logging.StreamHandler',
                                                'filters': [   'require_debug_false'],
                                                'level': 'WARNING'}},
    'loggers': {   'django': {   'handlers': [   'console',
                                                 'console_on_not_debug'],
                                 'level': 'INFO',
                                 'propagate': True},
                 },
    'version': 1}
LOGGING_CONFIG: 'logging.config.dictConfig'
LOGIN_REDIRECT_URL: '/accounts/profile/'
LOGIN_URL: '/accounts/login/'
LOGOUT_REDIRECT_URL: None
MANAGERS: []
MEDIA_ROOT: '/home/proj/app/csv'
MEDIA_URL: '/csv/'
MESSAGE_STORAGE: 'django.contrib.messages.storage.fallback.FallbackStorage'
MIDDLEWARE: ['django.middleware.security.SecurityMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'django.middleware.csrf.CsrfViewMiddleware',
 'django.contrib.auth.middleware.AuthenticationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware',
 'debug_toolbar.middleware.DebugToolbarMiddleware']
MIGRATION_MODULES: {
 }
MOCK_DATA_GENERATION: {
    'CONFIG_FILE': None,
    'DIG_INDEX': '0',
    'DIG_INDEX_FILE': None,
    'ENABLED': False,
    'MODULE_ENABLED': False,
    'PARAM_LIMITS': '../mock-data-gen-params.csv',
    'SCHEDULE_CRON': '*/5',
    'SCHEDULE_SECONDS': None,
    'STEP_MULTIPLIER_MAX': 15}
MONTH_DAY_FORMAT: 'F j'
NUMBER_GROUPING: 0
PASSWORD_HASHERS: '********'
PASSWORD_RESET_TIMEOUT: '********'
PASSWORD_RESET_TIMEOUT_DAYS: '********'
PREPEND_WWW: False
READ_ADMIN_PERMISSION_CLASS: <class 'rest_framework_jwt.permissions.create_custom_permission_class.<locals>.CustomPermission'>
RECORDS_MAX_DISPLAYED: 999999
REST_FRAMEWORK: {
    'DEFAULT_AUTHENTICATION_CLASSES': (   'rest_framework_jwt.authentication.JSONWebTokenAuthentication',
                                          'rest_framework.authentication.SessionAuthentication',
                                          'rest_framework.authentication.BasicAuthentication'),
    'DEFAULT_PERMISSION_CLASSES': (   'rest_framework.permissions.IsAuthenticated',)}
ROOT_URLCONF: 'proj.urls'
SECURE_BROWSER_XSS_FILTER: False
SECURE_CONTENT_TYPE_NOSNIFF: True
SECURE_HSTS_INCLUDE_SUBDOMAINS: False
SECURE_HSTS_PRELOAD: False
SECURE_HSTS_SECONDS: 0
SECURE_PROXY_SSL_HEADER: None
SECURE_REDIRECT_EXEMPT: []
SECURE_REFERRER_POLICY: 'same-origin'
SECURE_SSL_HOST: None
SECURE_SSL_REDIRECT: False
SERVER_EMAIL: 'root@localhost'
SESSION_CACHE_ALIAS: 'default'
SESSION_COOKIE_AGE: 1209600
SESSION_COOKIE_DOMAIN: None
SESSION_COOKIE_HTTPONLY: True
SESSION_COOKIE_NAME: 'sessionid'
SESSION_COOKIE_PATH: '/'
SESSION_COOKIE_SAMESITE: 'Lax'
SESSION_COOKIE_SECURE: False
SESSION_ENGINE: 'django.contrib.sessions.backends.db'
SESSION_EXPIRE_AT_BROWSER_CLOSE: False
SESSION_FILE_PATH: None
SESSION_SAVE_EVERY_REQUEST: False
SESSION_SERIALIZER: 'django.contrib.sessions.serializers.JSONSerializer'
SETTINGS_MODULE: 'proj.settings'
SHORT_DATETIME_FORMAT: 'm/d/Y P'
SHORT_DATE_FORMAT: 'm/d/Y'
SIGNING_BACKEND: 'django.core.signing.TimestampSigner'
SILENCED_SYSTEM_CHECKS: []
SIMSUITE_INTEGRATION: 'False'
STATICFILES_DIRS: []
STATICFILES_FINDERS: ['django.contrib.staticfiles.finders.FileSystemFinder',
 'django.contrib.staticfiles.finders.AppDirectoriesFinder']
STATICFILES_STORAGE: 'django.contrib.staticfiles.storage.StaticFilesStorage'
STATIC_ROOT: '/home/proj/app/static'
STATIC_URL: '/static/'
TEMPLATES: [{'APP_DIRS': True,
  'BACKEND': 'django.template.backends.django.DjangoTemplates',
  'DIRS': ['/home/proj/app/templates'],
  'OPTIONS': {'context_processors': ['django.template.context_processors.debug',
                                     'django.template.context_processors.request',
                                     'django.contrib.auth.context_processors.auth',
                                     'django.contrib.messages.context_processors.messages']}}]
TEST_NON_SERIALIZED_APPS: []
TEST_RUNNER: 'django.test.runner.DiscoverRunner'
THOUSAND_SEPARATOR: ','
TIME_FORMAT: 'P'
TIME_INPUT_FORMATS: ['%H:%M:%S', '%H:%M:%S.%f', '%H:%M']
TIME_ZONE: 'UTC'
USE_I18N: True
USE_L10N: True
USE_THOUSAND_SEPARATOR: False
USE_TZ: True
USE_X_FORWARDED_HOST: False
USE_X_FORWARDED_PORT: False
WSGI_APPLICATION: 'proj.wsgi.APPLICATION'
X_FRAME_OPTIONS: 'DENY'
YEAR_MONTH_FORMAT: 'F Y'
is_overridden: <bound method Settings.is_overridden of <Settings "proj.settings">>
deprecated_settings: None
beat_schedule: {
    'read_data': {   'schedule': <crontab: */5 * * * * (m/h/d/dM/MY)>,
                           'task': 'data.tasks.read_data'}}
timezone: 'UTC'
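The report lists the Celery options as Django settings with a `CELERY_` prefix. Assuming the project uses the usual Django integration, `app.config_from_object('django.conf:settings', namespace='CELERY')`, Celery reads `CELERY_BROKER_URL` as `broker_url`, `CELERY_RESULT_BACKEND` as `result_backend`, and so on. The stdlib sketch below only illustrates that naming convention, to make the report easier to compare with Celery's configuration reference; it is not Celery's own code, and `celery_settings_from_django` is a hypothetical name.

```python
from typing import Any, Dict, Mapping


def celery_settings_from_django(settings: Mapping[str, Any],
                                namespace: str = "CELERY") -> Dict[str, Any]:
    """Map Django-style NAMESPACE_OPTION keys to lowercase Celery option names."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)  # non-Celery Django settings are ignored
    }
```

For example, `CELERY_ACCEPT_CONTENT` from the report maps to `accept_content`.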