-
Notifications
You must be signed in to change notification settings - Fork 24.4k
Closed
Description
Describe the bug
The code below causes Redis 7.2 to hang completely (100% CPU, impossible to connect to the server; the process has to be terminated with `kill -9`). With `nb_subscribers = 2` it works fine.
The same code works as expected with redis 7.0.
To reproduce
subscriber.py
#!/usr/bin/env python3
import time
from multiprocessing import Process
from redis import Redis
nb_subscribers = 3
def subscriber(user_id):
    """Consume entries from the 'tasks_queue' stream as a member of group 'test'.

    Ensures the consumer group exists (creating the stream as well, via
    ``mkstream=True``), then loops forever: each iteration issues a blocking
    ``xreadgroup`` for at most one new entry as consumer
    ``testuser-<user_id>`` and prints whatever comes back.

    Args:
        user_id: Value interpolated into this consumer's name.

    Raises:
        redis.exceptions.ResponseError: If creating the group fails for any
            reason other than the group already existing.
    """
    # Local import keeps this block self-contained; redis is already a
    # dependency of this script.
    from redis.exceptions import ResponseError

    r = Redis(unix_socket_path='cache.sock')
    try:
        r.xgroup_create(name='tasks_queue', groupname='test', mkstream=True)
    except ResponseError as exc:
        # Only a BUSYGROUP reply means the group is already there. Any other
        # server error must not be reported as "already exists".
        if 'BUSYGROUP' not in str(exc):
            raise
        print('group already exists')
    while True:
        new_stream = r.xreadgroup(
            groupname='test', consumername=f'testuser-{user_id}',
            streams={'tasks_queue': '>'}, block=2000, count=1)
        if not new_stream:
            # The blocking read came back empty: pause before polling again.
            time.sleep(5)
            continue
        print(new_stream)
# Start one subscriber process per configured consumer, passing each its index.
processes = []
for consumer_index in range(nb_subscribers):
    worker = Process(target=subscriber, args=(consumer_index,))
    worker.start()
    processes.append(worker)

# Every 5 seconds drop the workers that have exited; the loop ends once
# none are left alive.
while processes:
    processes = [worker for worker in processes if worker.is_alive()]
    time.sleep(5)
print('all processes dead')
feeder.py
#!/usr/bin/env python3
import time
import uuid
from multiprocessing import Process
from redis import Redis
nb_feeders = 1
def feeder():
    """Append a task entry to the 'tasks_queue' stream every 0.1 s, forever.

    Each entry carries a single ``task_uuid`` field holding a freshly
    generated UUID4 string; ``maxlen=5000`` is passed on every ``xadd``.
    """
    client = Redis(unix_socket_path='cache.sock')
    while True:
        client.xadd(
            name='tasks_queue',
            fields={'task_uuid': str(uuid.uuid4())},
            id='*',
            maxlen=5000,
        )
        time.sleep(.1)
# Start the configured number of feeder processes.
processes = []
for _ in range(nb_feeders):
    worker = Process(target=feeder)
    worker.start()
    processes.append(worker)

# Every 5 seconds drop the workers that have exited; the loop ends once
# none are left alive.
while processes:
    processes = [worker for worker in processes if worker.is_alive()]
    time.sleep(5)
print('all processes dead')
Metadata
Metadata
Assignees
Labels
No labels