
Memory leak from writing in a loop to a broken rabbitmq heartbeat connection #5047

@mgeens

Checklist

Verified in both the released 4.2.1 and master (which reports itself as 4.2.0).

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
software -> celery:4.2.1 (windowlicker) kombu:4.2.1 py:3.6.5
            billiard:3.5.0.4 py-amqp:2.3.2
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:disabled

broker_url: 'amqp://guest:********@172.17.0.1:5672//'
  • I have verified that the issue exists against the master branch of Celery.

Version: https://github.com/celery/celery/tree/ed17e12b14051bac51b84593cf7c8cf21c77e010

software -> celery:4.2.0 (windowlicker) kombu:4.2.1 py:3.6.5
            billiard:3.5.0.4 py-amqp:2.3.2
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:disabled
broker_url: 'amqp://guest:********@172.17.0.1:5672//'

Description

The Celery workers of several different applications I work on would sporadically start leaking memory continuously, until the machine ran out of memory. We initially tried to work around this by using the flag that automatically restarts worker child processes after a set number of completed tasks. However, the leak turned out to be in the main worker process that manages the children, so this did not help.

I've found three issues that together cause this to happen. One of them clearly seems to be a celery/kombu issue, which is what this report is about. For the other two it's not yet clear, but I'll include them for context and reproducibility.

The first issue is that a high CPU (or maybe IO) load can cause Celery workers to lose connections. There's been mention of this in #3377 and #3932 . I've been able to reproduce that by using the stress tool with cpu and io flags enabled. I see celery worker connection resets and missed heartbeats appearing in the logs when I do that. In particular I see the connection resets in the worker log for application A when restarting the worker for application B, while under that load. While I saw similar log messages right before memory leaking started to happen, the leaking didn't happen consistently. I believe this is because the connections that are killed are somewhat random, and some of them can recover from that while others don't.

The second issue, for which the reproduction steps are listed below, is that if the connection the Celery main worker process uses for sending heartbeats to the RabbitMQ broker is broken, it stays broken. In an strace you can see it repeatedly getting SIGPIPE / broken pipe errors. The process keeps one or two other connections open to the broker: one consistently (I think for receiving the status of other workers, and maybe incoming jobs) and another intermittently. If I kill the consistent one, another is immediately recreated. I've also tried swapping out RabbitMQ for Redis as the broker; when the same connections are killed there, they are also recreated right away. Only the RabbitMQ connection that sends heartbeats appears to be affected. If the --without-heartbeat flag is used, the connection is still opened, but no heartbeat is sent over it anymore. #3921, #3773, #3377 and #3932 appear to be related.
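For illustration, here is a minimal sketch of the recovery behavior that appears to be missing: on EPIPE, tear down and re-establish the connection instead of retrying the same dead socket forever. The send/reconnect callables below are hypothetical stand-ins for the transport layer, not Celery or kombu APIs:

```python
import errno

def send_with_reconnect(send, reconnect, payload, max_retries=3):
    """On a broken pipe, replace the dead connection and retry,
    instead of writing to the same dead socket forever."""
    for attempt in range(max_retries):
        try:
            return send(payload)
        except OSError as exc:
            if exc.errno != errno.EPIPE:
                raise
            reconnect()  # re-establish before the next attempt
    raise ConnectionError("heartbeat connection could not be re-established")

# Toy transport: fails twice with EPIPE, then succeeds once "reconnected".
state = {"broken": True, "fails": 0}

def send(payload):
    if state["broken"]:
        state["fails"] += 1
        raise OSError(errno.EPIPE, "Broken pipe")
    return len(payload)

def reconnect():
    if state["fails"] >= 2:
        state["broken"] = False

result = send_with_reconnect(send, reconnect, b"heartbeat")
print(result)  # 9 (bytes "sent" after the connection recovers)
```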

The third issue is that while the repeated write attempts to the broken connection happen, the main worker process starts leaking memory. I'm not sure yet whether this is at the Python object level or something lower.
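To make the suspected mechanism concrete, here is a toy Python model (not Celery's actual code) of how repeatedly retrying sends on a dead connection can retain memory, if each failed payload is kept in an outbound buffer that is never drained:

```python
class LeakyHeartbeat:
    """Toy model: every failed send leaves the serialized payload in an
    outbound buffer, and nothing ever removes entries on failure."""

    def __init__(self):
        self.outbound = []  # grows without bound while sends keep failing

    def send_heartbeat(self, payload: bytes) -> bool:
        self.outbound.append(payload)
        # Simulate the socket write failing with EPIPE every time;
        # the buffered entry is never removed.
        return False

hb = LeakyHeartbeat()
for _ in range(1000):
    hb.send_heartbeat(b'{"type": "worker-heartbeat"}')

print(len(hb.outbound))  # 1000 retained payloads
```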

Potential workarounds appear to be using a different broker or the --without-heartbeat flag (although I'm not sure whether the connection that still gets opened in that case is also used for other things).
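As a configuration-level sketch of that workaround: broker_heartbeat is the standard Celery setting controlling AMQP heartbeats, and setting it to 0 disables them (hedged: check the docs for your version, as the exact semantics relative to --without-heartbeat may differ).

```python
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest:guest@172.17.0.1//')

# Disable AMQP heartbeats entirely; similar in effect to passing
# --without-heartbeat to the worker, though as noted above a broker
# connection may still be opened without heartbeats on it.
app.conf.broker_heartbeat = 0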

Steps to reproduce

Assuming a Ubuntu 18.04 system, you'll need to have the strace and dsniff (for tcpkill) packages installed.

  1. Create a simple tasks.py that uses RabbitMQ as the broker. Set the appropriate credentials for your broker. Make sure the IP address used is an external interface, not 127.0.0.1, because we want an Internet socket to be used, not a Unix one.
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest:guest@172.17.0.1//')


@app.task
def add(x, y):
    return x + y
  2. Run python -m celery worker -A tasks

  3. Run sudo ss -a -p | grep amq. Assuming your celery test worker is the only thing connected to rabbitmq, you should see something like:

tcp  ESTAB   0  0    172.17.0.1:57522             172.17.0.1:amqp              users:(("python",pid=6051,fd=46))
tcp  ESTAB   0  0    172.17.0.1:57518             172.17.0.1:amqp              users:(("python",pid=6051,fd=44))
tcp  ESTAB   0  0    172.17.0.1:57520             172.17.0.1:amqp              users:(("python",pid=6051,fd=45))
tcp  LISTEN  0  128  *:amqp                       *:*                          users:(("beam.smp",pid=1741,fd=56))
tcp  ESTAB   0  0    [::ffff:172.17.0.1]:amqp     [::ffff:192.168.1.59]:57518  users:(("beam.smp",pid=1741,fd=14))
tcp  ESTAB   0  0    [::ffff:172.17.0.1]:amqp     [::ffff:192.168.1.59]:57520  users:(("beam.smp",pid=1741,fd=15))
tcp  ESTAB   0  0    [::ffff:172.17.0.1]:amqp     [::ffff:192.168.1.59]:57522  users:(("beam.smp",pid=1741,fd=16))

Your worker will be the python process. If you have more processes running you'll need to narrow it down to the pid of the main worker process using ps aux or similar. There will be 2-3 file descriptors that the process has open towards rabbitmq, which increase linearly from some starting value. We're interested in the second one, fd=45 in this example. I'll refer to the pid of the main worker process as $PID in subsequent commands. Also make note of the port associated with that file descriptor, 57520 in this example. I'll refer to it as $PORT subsequently.

  4. Run sudo strace -tt -f -s 1000 -p $PID -e trace=sendto. You'll see celery sending heartbeat messages over the mentioned second file descriptor (45 in this example).
10:40:20.011043 sendto(45, "\1\0\1\0\0\0!\0<\0(\0\0\10celeryev\20worker.heartbeat\0\316\2\0\1\0\0\0H\0<\0\0\0\0\0\0\0\0\1\20\370\0\20application/json\5utf-8\0\0\0\35\10hostnameS\0\0\0\17celery@amaranth\1\0\316\3\0\1\0\0\1\20{\"hostname\": \"celery@amaranth\", \"utcoffset\": -2, \"pid\": 6051, \"clock\": 481, \"freq\": 2.0, \"active\": 0, \"processed\": 0, \"loadavg\": [0.56, 0.68, 0.61], \"sw_ident\": \"py-celery\", \"sw_ver\": \"4.2.1\", \"sw_sys\": \"Linux\", \"timestamp\": 1536741620.0103393, \"type\": \"worker-heartbeat\"}\316", 401, 0, NULL, 0) = 401
  5. Now we're going to use tcpkill to simulate the issue where celery workers can lose connections under high load. Run sudo tcpkill -i lo host 172.17.0.1 and port $PORT, substituting the network interface passed to -i and the IP address with the ones you're using. tcpkill will run continuously, but you can close it after it's printed its initial output. You can verify with ss that the connection is gone.

  6. Run sudo strace -tt -f -s 1000 -p $PID -e trace=sendto again. There will be repeated SIGPIPE errors.

11:14:53.978952 sendto(45, "\1\0\1\0\0\0!\0<\0(\0\0\10celeryev\20worker.heartbeat\0\316\2\0\1\0\0\0H\0<\0\0\0\0\0\0\0\0\1\17\370\0\20application/json\5utf-8\0\0\0\35\10hostnameS\0\0\0\17celery@amaranth\1\0\316\3\0\1\0\0\1\17{\"hostname\": \"celery@amaranth\", \"utcoffset\": -2, \"pid\": 6051, \"clock\": 2476, \"freq\": 2.0, \"active\": 0, \"processed\": 0, \"loadavg\": [0.79, 0.7, 0.73], \"sw_ident\": \"py-celery\", \"sw_ver\": \"4.2.1\", \"sw_sys\": \"Linux\", \"timestamp\": 1536743693.978148, \"type\": \"worker-heartbeat\"}\316", 400, 0, NULL, 0) = -1 EPIPE (Broken pipe)
11:14:53.979275 --- SIGPIPE {si_signo=SIGPIPE, si_code=SI_USER, si_pid=6051, si_uid=1000} ---
  7. Monitor the resident memory size of the main worker process over a longer period, say half an hour, for example using pidstat -r -p $PID 1. In my case the memory size keeps increasing, by about 20 megabytes per hour.
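As an alternative to pidstat for step 7, a small Linux-only Python helper can sample the worker's resident set size from /proc. The use of os.getpid() below is just to keep the example self-contained; substitute the worker's $PID when monitoring for real:

```python
import os

def rss_kib(pid: int) -> int:
    """Read resident set size (VmRSS, in KiB) from /proc/<pid>/status.
    Linux only; raises if the field is missing."""
    with open(f"/proc/{pid}/status") as f:
        for line in f:
            if line.startswith("VmRSS:"):
                return int(line.split()[1])
    raise RuntimeError("VmRSS not found")

# Demo: sample our own process a few times; in practice you would
# sleep between samples and log the values over half an hour or more.
samples = [rss_kib(os.getpid()) for _ in range(3)]
print(all(s > 0 for s in samples))  # True on Linux
```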

Expected behavior

  1. The connection reappears after killing it
  2. There are no SIGPIPE errors
  3. Ideally no leaking, but the leak itself might not be caused by Celery

Example log messages when issue 1 (losing connections under load) happens:

This is the log from the worker for app1. app2 is a different application that has its own workers.

[2018-08-30 09:19:02,621: INFO/MainProcess] [2018-08-30 09:19:02Z] missed heartbeat from app2@worker-1
[2018-08-30 09:19:08,588: INFO/MainProcess] [2018-08-30 09:19:08Z] sync with app2@worker-2
[2018-08-30 09:19:08,589: ERROR/MainProcess] [2018-08-30 09:19:08Z] Control command error: ConnectionResetError(104, 'Connection reset by peer')
[2018-08-30 09:19:10,349: INFO/MainProcess] [2018-08-30 09:19:10Z] sync with app2@worker-1
