Skip to content

Commit b38f839

Browse files
authored
[2.10] [MOD-11011] Fix deadlock while RDB loading and RM_Yield (#6763) (#6785)
* [MOD-11011] Fix deadlock while RDB loading and RM_Yield (#6763) * add test file trying to reproduce lock * test RDB update * try the test changes * only keep test changes * remove some logs * fix: Yield to Redis before acquiring Write Lock * proper Lock initialization on RDB load * proper initialization * remove test file * remove comment from code * add numOps instead of reducing Yield frequency * move lock init * add DEBUG command to improve testing * fix failing tests * remove added tests by mistake (cherry picked from commit 4710f44) * fix compilation * do not print index name * add missing func * fix pytest * fix: remove not-needed pytest file * remove invalid option
1 parent 1e4a7a1 commit b38f839

8 files changed

Lines changed: 189 additions & 21 deletions

File tree

src/debug_commands.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1595,6 +1595,36 @@ DEBUG_COMMAND(bgScanController) {
15951595

15961596
}
15971597

1598+
// Global variable for sleep time before yielding (in microseconds)
1599+
static unsigned int g_indexerSleepBeforeYieldMicros = 0;
1600+
1601+
unsigned int GetIndexerSleepBeforeYieldMicros(void) {
1602+
return g_indexerSleepBeforeYieldMicros;
1603+
}
1604+
1605+
/**
1606+
* FT.DEBUG INDEXER_SLEEP_BEFORE_YIELD [<microseconds>]
1607+
* Get or set the sleep time in microseconds before yielding during indexing while loading
1608+
*/
1609+
DEBUG_COMMAND(IndexerSleepBeforeYieldMicros) {
1610+
if (argc > 3) {
1611+
return RedisModule_WrongArity(ctx);
1612+
}
1613+
1614+
// Set new sleep time
1615+
if (argc == 3) {
1616+
long long sleepMicros;
1617+
if (RedisModule_StringToLongLong(argv[2], &sleepMicros) != REDISMODULE_OK || sleepMicros < 0) {
1618+
return RedisModule_ReplyWithError(ctx, "Invalid sleep time. Must be a non-negative integer.");
1619+
}
1620+
1621+
g_indexerSleepBeforeYieldMicros = (unsigned int)sleepMicros;
1622+
return RedisModule_ReplyWithSimpleString(ctx, "OK");
1623+
}
1624+
1625+
return RedisModule_WrongArity(ctx);
1626+
}
1627+
15981628
DebugCommandType commands[] = {{"DUMP_INVIDX", DumpInvertedIndex}, // Print all the inverted index entries.
15991629
{"DUMP_NUMIDX", DumpNumericIndex}, // Print all the headers (optional) + entries of the numeric tree.
16001630
{"DUMP_NUMIDXTREE", DumpNumericIndexTree}, // Print tree general info, all leaves + nodes + stats
@@ -1629,6 +1659,7 @@ DebugCommandType commands[] = {{"DUMP_INVIDX", DumpInvertedIndex}, // Print all
16291659
{"GET_HIDE_USER_DATA_FROM_LOGS", getHideUserDataFromLogs},
16301660
{"YIELDS_ON_LOAD_COUNTER", YieldCounter},
16311661
{"BG_SCAN_CONTROLLER", bgScanController},
1662+
{"INDEXER_SLEEP_BEFORE_YIELD_MICROS", IndexerSleepBeforeYieldMicros},
16321663
/**
16331664
* The following commands are for debugging distributed search/aggregation.
16341665
*/

src/debug_commands.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,9 @@ typedef struct DebugCTX {
3838

3939
// Should be called after each debug command that changes the debugCtx
4040
void validateDebugMode(DebugCTX *debugCtx);
41+
42+
// Yield counter functions
43+
void IncrementYieldCounter(void);
44+
45+
// Indexer sleep before yield functions
46+
unsigned int GetIndexerSleepBeforeYieldMicros(void);

src/indexer.c

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ static void writeCurEntries(DocumentIndexer *indexer, RSAddDocumentCtx *aCtx, Re
106106
if (invidx) {
107107
entry->docId = aCtx->doc->docId;
108108
RS_LOG_ASSERT(entry->docId, "docId should not be 0");
109-
IndexerYieldWhileLoading(ctx->redisCtx);
110109
writeIndexEntry(spec, invidx, encoder, entry);
111110
if (Index_StoreFieldMask(spec)) {
112111
invidx->fieldMask |= entry->fieldMask;
@@ -223,8 +222,6 @@ static void indexBulkFields(RSAddDocumentCtx *aCtx, RedisSearchCtx *sctx) {
223222
if (fs->types == INDEXFLD_T_FULLTEXT || !FieldSpec_IsIndexable(fs) || fdata->isNull) {
224223
continue;
225224
}
226-
227-
IndexerYieldWhileLoading(sctx->redisCtx);
228225
if (IndexerBulkAdd(cur, sctx, doc->fields + ii, fs, fdata, &cur->status) != 0) {
229226
IndexError_AddError(&cur->spec->stats.indexError, cur->status.message, cur->status.detail, doc->docKey);
230227
FieldSpec_AddError(&cur->spec->fields[fs->index], cur->status.message, cur->status.detail, doc->docKey);
@@ -399,14 +396,21 @@ bool g_isLoading = false;
399396
* Yield to Redis after a certain number of operations during indexing.
400397
* This helps keep Redis responsive during long indexing operations.
401398
* @param ctx The Redis context
399+
* @param numOps Tue number of operations to count in the counter before considering RSGlobalConfig.indexerYieldEveryOpsWhileLoading. These are related to the number of fields in the document
400+
* @param flags The flags to pass to RedisModule_Yield
402401
*/
403-
static void IndexerYieldWhileLoading(RedisModuleCtx *ctx) {
402+
void IndexerYieldWhileLoading(RedisModuleCtx *ctx, unsigned int numOps, int flags) {
404403
static size_t opCounter = 0;
405404

406-
// If server is loading, Yield to Redis every RSGlobalConfig.indexerYieldEveryOps operations
407-
if (g_isLoading && ++opCounter >= RSGlobalConfig.indexerYieldEveryOpsWhileLoading) {
408-
opCounter = 0;
405+
// If server is loading, Yield to Redis if the number of operations is greater than the yieldEveryOps
406+
opCounter += numOps;
407+
if (g_isLoading && opCounter >= RSGlobalConfig.indexerYieldEveryOpsWhileLoading) {
408+
opCounter = opCounter % RSGlobalConfig.indexerYieldEveryOpsWhileLoading;
409409
IncrementYieldCounter(); // Track that we called yield
410-
RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, NULL);
410+
unsigned int sleepMicros = GetIndexerSleepBeforeYieldMicros();
411+
if (sleepMicros > 0) {
412+
usleep(sleepMicros);
413+
}
414+
RedisModule_Yield(ctx, flags, NULL);
411415
}
412416
}

src/indexer.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ int IndexerBulkAdd(RSAddDocumentCtx *cur, RedisSearchCtx *sctx,
8888
* Yield to Redis after a certain number of operations during indexing while loading.
8989
* This helps keep Redis responsive during long indexing operations.
9090
* @param ctx The Redis context
91+
* @param numOps Tue number of operations to count in the counter before considering RSGlobalConfig.indexerYieldEveryOpsWhileLoading. These are related to the number of fields in the document
92+
* @param flags The flags to pass to RedisModule_Yield
9193
*/
92-
static void IndexerYieldWhileLoading(RedisModuleCtx *ctx);
94+
void IndexerYieldWhileLoading(RedisModuleCtx *ctx, unsigned int numOps, int flags);
9395

9496
#endif

src/info/indexes_info.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "indexes_info.h"
88
#include "util/dict.h"
99
#include "spec.h"
10+
#include <string.h> // Add this for strerror
1011

1112
// Assuming the GIL is held by the caller
1213
TotalIndexesInfo IndexesInfo_TotalInfo() {
@@ -27,7 +28,11 @@ TotalIndexesInfo IndexesInfo_TotalInfo() {
2728
continue;
2829
}
2930
// Lock for read
30-
pthread_rwlock_rdlock(&sp->rwlock);
31+
int rc = pthread_rwlock_rdlock(&sp->rwlock);
32+
if (rc != 0) {
33+
RedisModule_Log(RSDummyContext, "warning", "Failed to acquire read lock on index: rc=%d (%s). Cannot continue getting Index info", rc, strerror(rc));
34+
continue;
35+
}
3136

3237
// Vector index stats
3338
VectorIndexStats vec_info = IndexSpec_GetVectorIndexStats(sp);

src/spec.c

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,6 +1812,21 @@ void IndexSpec_InitializeSynonym(IndexSpec *sp) {
18121812

18131813
///////////////////////////////////////////////////////////////////////////////////////////////
18141814

1815+
static void IndexSpec_InitLock(IndexSpec *sp) {
1816+
int res = 0;
1817+
pthread_rwlockattr_t attr;
1818+
res = pthread_rwlockattr_init(&attr);
1819+
RS_ASSERT(res == 0);
1820+
#if !defined(__APPLE__) && !defined(__FreeBSD__) && defined(__GLIBC__)
1821+
int pref = PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;
1822+
res = pthread_rwlockattr_setkind_np(&attr, pref);
1823+
RS_ASSERT(res == 0);
1824+
#endif
1825+
1826+
pthread_rwlock_init(&sp->rwlock, &attr);
1827+
}
1828+
1829+
18151830
IndexSpec *NewIndexSpec(const char *name) {
18161831
IndexSpec *sp = rm_calloc(1, sizeof(IndexSpec));
18171832
sp->fields = rm_calloc(sizeof(FieldSpec), SPEC_MAX_FIELDS);
@@ -1839,17 +1854,7 @@ IndexSpec *NewIndexSpec(const char *name) {
18391854
memset(&sp->stats, 0, sizeof(sp->stats));
18401855
sp->stats.indexError = IndexError_Init();
18411856

1842-
int res = 0;
1843-
pthread_rwlockattr_t attr;
1844-
res = pthread_rwlockattr_init(&attr);
1845-
RS_ASSERT(res == 0);
1846-
#if !defined(__APPLE__) && !defined(__FreeBSD__)
1847-
int pref = PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP;
1848-
res = pthread_rwlockattr_setkind_np(&attr, pref);
1849-
RS_ASSERT(res == 0);
1850-
#endif
1851-
1852-
pthread_rwlock_init(&sp->rwlock, &attr);
1857+
IndexSpec_InitLock(sp);
18531858

18541859
return sp;
18551860
}
@@ -2654,6 +2659,7 @@ int IndexSpec_CreateFromRdb(RedisModuleCtx *ctx, RedisModuleIO *rdb, int encver,
26542659
RedisModule_Free(rawName);
26552660

26562661
IndexSpec *sp = rm_calloc(1, sizeof(IndexSpec));
2662+
IndexSpec_InitLock(sp);
26572663
StrongRef spec_ref = StrongRef_New(sp, (RefManager_Free)IndexSpec_Free);
26582664
sp->own_ref = spec_ref;
26592665
// setting isDuplicate to true will make sure index will not be removed from aliases container.
@@ -2786,6 +2792,7 @@ void *IndexSpec_LegacyRdbLoad(RedisModuleIO *rdb, int encver) {
27862792

27872793
RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
27882794
IndexSpec *sp = rm_calloc(1, sizeof(IndexSpec));
2795+
IndexSpec_InitLock(sp);
27892796
StrongRef spec_ref = StrongRef_New(sp, (RefManager_Free)IndexSpec_Free);
27902797
sp->own_ref = spec_ref;
27912798

@@ -2883,6 +2890,7 @@ void *IndexSpec_LegacyRdbLoad(RedisModuleIO *rdb, int encver) {
28832890
Cursors_initSpec(sp);
28842891

28852892
dictAdd(legacySpecDict, sp->name, spec_ref.rm);
2893+
28862894
return spec_ref.rm;
28872895
}
28882896

@@ -3111,6 +3119,8 @@ int IndexSpec_UpdateDoc(IndexSpec *spec, RedisModuleCtx *ctx, RedisModuleString
31113119
return REDISMODULE_ERR;
31123120
}
31133121

3122+
unsigned int numOps = doc.numFields != 0 ? doc.numFields: 1;
3123+
IndexerYieldWhileLoading(ctx, numOps, REDISMODULE_YIELD_FLAG_CLIENTS);
31143124
RedisSearchCtx_LockSpecWrite(&sctx);
31153125
IndexSpec_IncrActiveWrites(spec);
31163126

tests/pytests/test_debug_commands.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def testDebugHelp(self):
5757
'GET_HIDE_USER_DATA_FROM_LOGS',
5858
'YIELDS_ON_LOAD_COUNTER',
5959
'BG_SCAN_CONTROLLER',
60+
'INDEXER_SLEEP_BEFORE_YIELD_MICROS',
6061
'FT.AGGREGATE',
6162
'FT.SEARCH',
6263
]

tests/pytests/test_rdb_load.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import os
2+
import pytest
3+
import multiprocessing
4+
import time
5+
from common import *
6+
from RLTest import Env
7+
8+
9+
def downloadFiles(rdbs):
10+
os.makedirs(REDISEARCH_CACHE_DIR, exist_ok=True) # create cache dir if not exists
11+
for f in rdbs:
12+
path = os.path.join(REDISEARCH_CACHE_DIR, f)
13+
if not os.path.exists(path):
14+
subprocess.run(["wget", "--no-check-certificate", BASE_RDBS_URL + f, "-O", path, "-q"])
15+
if not os.path.exists(path):
16+
return False
17+
return True
18+
19+
20+
@skip(cluster=True)
21+
@pytest.mark.timeout(120)
22+
def test_rdb_load_no_deadlock():
23+
"""
24+
Test that loading from RDB while constantly sending INFO commands doesn't cause deadlock.
25+
This test starts a clean Redis server, then triggers RDB loading from the client side
26+
while some subprocesses keep sending INFO commands.
27+
"""
28+
# Download the RDB file using downloadFile function
29+
rdb_filename = 'redisearch_2.6.9_with_vecsim.rdb'
30+
31+
# Create a clean Redis environment
32+
test_env = Env(moduleArgs='')
33+
34+
# Start the server first
35+
test_env.start()
36+
37+
# Verify server is running
38+
test_env.expect('PING').equal(True)
39+
40+
# Download the RDB file
41+
if not downloadFiles([rdb_filename]):
42+
test_env.assertTrue(False, message=f'Failed to download RDB file: {rdb_filename}')
43+
return
44+
45+
# Configure indexer to yield more frequently during loading to increase chance of deadlock
46+
test_env.cmd(config_cmd(), 'SET', 'INDEXER_YIELD_EVERY_OPS', '1')
47+
test_env.expect(debug_cmd(), 'INDEXER_SLEEP_BEFORE_YIELD_MICROS', '50000').ok()
48+
49+
# Get Redis configuration for RDB file location
50+
dbFileName = test_env.cmd('config', 'get', 'dbfilename')[1]
51+
dbDir = test_env.cmd('config', 'get', 'dir')[1]
52+
rdbFilePath = os.path.join(dbDir, dbFileName)
53+
54+
# Get the downloaded RDB file path
55+
filePath = os.path.join(REDISEARCH_CACHE_DIR, rdb_filename)
56+
57+
# Create symlink to the downloaded RDB file
58+
try:
59+
os.unlink(rdbFilePath)
60+
except OSError:
61+
pass
62+
os.symlink(filePath, rdbFilePath)
63+
64+
# Give the system time to process the symlink
65+
time.sleep(1)
66+
67+
def info_command_process(port):
68+
"""Process that continuously sends INFO commands"""
69+
import redis
70+
71+
# Create a new connection in this process
72+
conn = redis.Redis(host='localhost', port=port, decode_responses=True)
73+
74+
while True:
75+
try:
76+
result = conn.execute_command('INFO', 'everything')
77+
except Exception as e:
78+
continue
79+
80+
# Start the INFO command thread
81+
redis_port = test_env.getConnection().connection_pool.connection_kwargs['port']
82+
info_processes = []
83+
84+
for i in range(20):
85+
process = multiprocessing.Process(
86+
target=info_command_process,
87+
args=(redis_port,),
88+
daemon=True
89+
)
90+
process.start()
91+
info_processes.append(process)
92+
93+
# Get current database size before reload
94+
# Trigger the reload - use NOSAVE to prevent overwriting our RDB file
95+
test_env.cmd('DEBUG', 'RELOAD', 'NOSAVE')
96+
for process in info_processes:
97+
process.terminate()
98+
process.join()
99+
100+
test_env.expect('PING').equal(True)
101+
102+
# Check database size to see if anything was loaded
103+
dbsize = test_env.cmd('DBSIZE')
104+
105+
# Try to get info about any existing indices
106+
indices_info = test_env.cmd('FT._LIST')
107+
assert indices_info, "No indices found after RDB load"
108+
# If there are indices, verify we can get info about the first one
109+
test_env.expect('FT.INFO', indices_info[0]).noError()

0 commit comments

Comments
 (0)