Skip to content

Commit e56bf5b

Browse files
authored
fix: remove draining from Flush callback (#7420)
* fix: remove draining from Flush callback
* test: add test for query while FLUSHALL
* improve testing with workers
* improve test
1 parent 99f7302 commit e56bf5b

2 files changed

Lines changed: 143 additions & 1 deletion

File tree

src/spec.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3543,7 +3543,6 @@ static void onFlush(RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent
35433543
return;
35443544
}
35453545
Indexes_Free(specDict_g);
3546-
workersThreadPool_Drain(ctx, 0);
35473546
Dictionary_Clear();
35483547
RSGlobalStats.totalStats.used_dialects = 0;
35493548
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import threading
2+
import time
3+
from common import *
4+
5+
def test_query_while_flush():
    """
    Test scenario:
    1. Create index1 with 100 documents
    2. Start threads that continuously query the index
    3. Call FLUSHALL command
    4. Create index2 and start querying it
    5. Verify that:
       - Before FLUSHALL: Errors=0, Successes>0
       - After FLUSHALL: Errors>0, Successes==0 (for old queries)
       - New index queries work properly
    """
    env = Env(moduleArgs='WORKERS 2')
    env.expect('FT.CREATE', 'index1', 'ON', 'HASH', 'SCHEMA', 'text', 'TEXT').ok()

    # Add 100 documents to index1
    for i in range(100):
        env.getClusterConnectionIfNeeded().execute_command('HSET', f'doc:{i}', 'text', f'hello world document {i}')

    # Wait for indexing to complete
    waitForIndex(env, 'index1')

    # Verify index1 is working
    result = env.cmd('FT.SEARCH', 'index1', 'hello')
    env.assertEqual(result[0], 100)  # Should find all 100 documents

    # Event marking the transition window around FLUSHALL: while it is set,
    # a query may race the flush, so its outcome is ambiguous and not counted.
    flushall_called = threading.Event()

    # Statistics shared with the worker threads. Plain dict-of-ints is good
    # enough here: CPython's GIL keeps the increments from corrupting state,
    # and the assertions only need ">0"/"==0", not exact counts.
    stats = {
        'before_flush_errors': 0,
        'before_flush_successes': 0,
        'after_flush_errors': 0,
        'after_flush_successes': 0,
        'stop_queries': False,
        'flush_completed': False,
    }

    def query_worker(stats):
        """Worker thread that continuously queries index1 and records outcomes."""
        local_conn = env.getClusterConnectionIfNeeded()

        while not stats['stop_queries']:
            try:
                # Query index1
                local_conn.execute_command('FT.SEARCH', 'index1', 'hello')

                # Don't count results that landed inside the FLUSHALL window
                if not flushall_called.is_set():
                    if not stats['flush_completed']:
                        stats['before_flush_successes'] += 1
                    else:
                        stats['after_flush_successes'] += 1

            except Exception:
                # Same windowing rule for failed queries
                if not flushall_called.is_set():
                    if not stats['flush_completed']:
                        stats['before_flush_errors'] += 1
                    else:
                        stats['after_flush_errors'] += 1

            # Small delay to avoid overwhelming the system
            time.sleep(0.001)

    # Start 5 query threads (flushall_called is captured by closure)
    num_threads = 5
    threads = []
    for _ in range(num_threads):
        thread = threading.Thread(
            target=query_worker,
            args=(stats,),
            daemon=True
        )
        threads.append(thread)
        thread.start()

    # Let queries run for a bit to accumulate some successes
    time.sleep(0.5)

    # Signal that flushall is about to be called
    flushall_called.set()
    # Sleep to minimize the risk of a thread counting a query that straddles
    # the set() call. A lock would be airtight, but Python has no native
    # read-write lock and holding one would stop queries from accumulating in
    # the workers queue, so this simpler approach is preferred.
    time.sleep(0.5)
    env.assertGreater(stats['before_flush_successes'], 0)
    env.assertEqual(stats['before_flush_errors'], 0)

    # Execute FLUSHALL
    env.cmd('FLUSHALL')

    # Mark flush as completed
    stats['flush_completed'] = True
    # Same synchronization-by-sleep rationale as above; without it, successes
    # could be attributed to "before flush" that should have been "after".
    time.sleep(0.5)
    flushall_called.clear()  # Close the window; counting resumes as "after flush"

    # Create index2 and verify it works properly
    env.expect('FT.CREATE', 'index2', 'ON', 'HASH', 'SCHEMA', 'text', 'TEXT').ok()

    # Add some documents to index2
    for i in range(10):
        env.getClusterConnectionIfNeeded().execute_command('HSET', f'newdoc:{i}', 'text', f'new document {i}')

    # Wait for indexing to complete
    waitForIndex(env, 'index2')

    # Verify index2 works properly
    result = env.cmd('FT.SEARCH', 'index2', 'new')
    env.assertEqual(result[0], 10)  # Should find all 10 new documents

    # Stop query threads
    stats['stop_queries'] = True

    # Wait for all threads to complete (daemon threads, so a stuck worker
    # cannot hang the test past the timeout)
    for thread in threads:
        thread.join(timeout=2.0)

    # Verify statistics before flush
    env.assertEqual(stats['before_flush_errors'], 0,
                    message="Should have no errors before FLUSHALL")
    env.assertGreater(stats['before_flush_successes'], 0,
                      message="Should have successes before FLUSHALL")

    # Verify statistics after flush
    env.assertGreater(stats['after_flush_errors'], 0,
                      message="Should have errors after FLUSHALL")
    env.assertEqual(stats['after_flush_successes'], 0,
                    message="Should have no successes after FLUSHALL for old index")

    # Verify old index1 is gone
    env.expect('FT.SEARCH', 'index1', 'hello').error().contains('No such index')
    env.debugPrint(f" Before FLUSHALL - Errors: {stats['before_flush_errors']}, Successes: {stats['before_flush_successes']}")
    env.debugPrint(f" After FLUSHALL - Errors: {stats['after_flush_errors']}, Successes: {stats['after_flush_successes']}")

0 commit comments

Comments
 (0)