Skip to content

Commit 54f050c

Browse files
authored
Fix internal cursors not being deleted immediately in cluster mode - [MOD-12493] (#7399)
* refresh cache on every modification
* add a failing test
* fix bug
* add assert to cover a future edge case
* remove redundant cleanup
* use helper
1 parent b51abe8 commit 54f050c

4 files changed

Lines changed: 52 additions & 14 deletions

File tree

src/coord/hybrid/dist_utils.c

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,6 @@ bool getCursorCommand(long long cursorId, MRCommand *cmd, MRIteratorCtx *ctx) {
181181
RS_ASSERT(atoll(cmd->strs[3]) == cursorId);
182182

183183
if (timedout) {
184-
// We are going to modify the command, so we need to free the cached sds
185-
if (cmd->cmd) {
186-
sdsfree(cmd->cmd);
187-
cmd->cmd = NULL;
188-
}
189184
// If we timed out and it's a profile command, we want to get the profile data
190185
if (cmd->forProfiling) {
191186
RS_LOG_ASSERT(!cmd->forCursor, "profile is not supported on a cursor command");

src/coord/rmr/command.c

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@
2222
#define shift_right(arr, len, start, by) \
2323
memmove((arr) + (start) + (by), (arr) + (start), ((len) - (start)) * sizeof(*(arr)));
2424

25-
void MRCommand_Free(MRCommand *cmd) {
25+
static inline void dropCachedCmdIfNeeded(MRCommand *cmd) {
2626
if (cmd->cmd) {
2727
sdsfree(cmd->cmd);
28+
cmd->cmd = NULL;
2829
}
30+
}
31+
32+
void MRCommand_Free(MRCommand *cmd) {
33+
dropCachedCmdIfNeeded(cmd);
2934
for (int i = 0; i < cmd->num; i++) {
3035
rm_free(cmd->strs[i]);
3136
}
@@ -39,6 +44,8 @@ static void assignStr(MRCommand *cmd, size_t idx, const char *s, size_t n) {
3944
cmd->lens[idx] = n;
4045
news[n] = '\0';
4146
memcpy(news, s, n);
47+
// Drop the cached sds command representation if set
48+
dropCachedCmdIfNeeded(cmd);
4249
}
4350

4451
static void assignCstr(MRCommand *cmd, size_t idx, const char *s) {
@@ -176,6 +183,8 @@ void MRCommand_ReplaceArgNoDup(MRCommand *cmd, int index, char *newArg, size_t l
176183
rm_free(cmd->strs[index]);
177184
cmd->strs[index] = newArg;
178185
cmd->lens[index] = len;
186+
// Drop the cached sds command representation if set
187+
dropCachedCmdIfNeeded(cmd);
179188
}
180189
void MRCommand_ReplaceArg(MRCommand *cmd, int index, const char *newArg, size_t len) {
181190
char *news = rm_malloc(len + 1);
@@ -206,6 +215,7 @@ void MRCommand_ReplaceArgSubstring(MRCommand *cmd, int index, size_t pos, size_t
206215
memset(oldArg + pos + newLen, ' ', oldSubStringLen - newLen);
207216

208217
// No length change needed - argument stays same size
218+
RS_LOG_ASSERT(!cmd->cmd, "Expect MRCommand_ReplaceArgSubstring to be called before `cmd` is used for the first time");
209219
return;
210220
}
211221

@@ -254,10 +264,4 @@ void MRCommand_SetSlotInfo(MRCommand *cmd, const RedisModuleSlotRangeArray *slot
254264
char *serialized = SlotRangesArray_Serialize(slots);
255265
size_t serializedLen = SlotRangeArray_SizeOf(slots->num_ranges);
256266
MRCommand_ReplaceArgNoDup(cmd, cmd->slotsInfoArgIndex, serialized, serializedLen);
257-
// This function is expected to be called from an io thread, which means that
258-
// the command may have already been used, so we drop the cached sds command representation
259-
if (cmd->cmd) {
260-
sdsfree(cmd->cmd);
261-
cmd->cmd = NULL;
262-
}
263267
}

src/coord/rmr/rmr.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -793,8 +793,7 @@ void MRIterator_Release(MRIterator *it) {
793793
RS_DEBUG_LOG_FMT("changing command from %s to DEL for shard: %d", cmd->strs[1], cmd->targetShard);
794794
RS_LOG_ASSERT_FMT(cmd->rootCommand != C_DEL, "DEL command should be sent only once to a shard. pending = %d", it->ctx.pending);
795795
cmd->rootCommand = C_DEL;
796-
strcpy(cmd->strs[1], "DEL");
797-
cmd->lens[1] = 3;
796+
MRCommand_ReplaceArg(cmd, 1, "DEL", 3);
798797
}
799798
}
800799
// Take a reference to the iterator for the next batch of commands.

tests/pytests/test_issues.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1605,3 +1605,43 @@ def test_mod_8157_RESP3():
16051605
def test_mod_11975(env: Env):
16061606
env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok()
16071607
env.expect('FT.SEARCH', 'idx', '@t:("*")', 'DIALECT', '2').equal([0])
1608+
1609+
@skip(cluster=False)
1610+
def test_mod_12493(env:Env):
1611+
env.expect('FT.CREATE', 'idx', 'SCHEMA', 'n', 'NUMERIC').ok()
1612+
1613+
# Add enough documents so 4 reads are needed from each shard to read all results (chunk size is 1000)
1614+
n_docs = 3200 * env.shardsCount
1615+
with env.getClusterConnectionIfNeeded() as conn:
1616+
for i in range(n_docs):
1617+
conn.execute_command('HSET', f'doc{i}', 'n', i)
1618+
1619+
# Create a cursor
1620+
_, cursor = env.cmd('FT.AGGREGATE', 'idx', '*', 'WITHCURSOR')
1621+
1622+
# Check that there are pending cursors on all shards
1623+
env.assertEqual(to_dict(index_info(env)['cursor_stats'])['index_total'], env.shardsCount)
1624+
1625+
# Read another env.shardsCount - 1 times. Expecting that each read will read 1000 results from each shard,
1626+
# so after env.shardsCount reads (including the initial aggregate), we will trigger another read from each shard
1627+
for _ in range(env.shardsCount - 1):
1628+
_, cursor = env.cmd('FT.CURSOR', 'READ', 'idx', cursor)
1629+
1630+
# Check again that there are pending cursors on all shards
1631+
env.assertEqual(to_dict(index_info(env)['cursor_stats'])['index_total'], env.shardsCount)
1632+
# Check command stats for internal cursors command. We expect a single one (READ) on each shard
1633+
for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
1634+
stats = con.execute_command('INFO', 'COMMANDSTATS')['cmdstat__FT.CURSOR']
1635+
env.assertEqual(stats['calls'], 1, message=f'Expected 1 call on shard {i}, got {stats["calls"]}')
1636+
1637+
# Delete the cursor. This should delete the internal cursors on all shards.
1638+
# If we call READ instead, they won't be deleted or depleted (3rd read, and we have 4 chunks), and the test will fail.
1639+
env.expect('FT.CURSOR', 'DEL', 'idx', cursor).ok()
1640+
1641+
# Expect another call on each shard for the DEL command
1642+
for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
1643+
stats = con.execute_command('INFO', 'COMMANDSTATS')['cmdstat__FT.CURSOR']
1644+
env.assertEqual(stats['calls'], 2, message=f'Expected 2 calls on shard {i}, got {stats["calls"]}')
1645+
1646+
# Check that the internal cursors were deleted on all shards
1647+
env.assertEqual(to_dict(index_info(env)['cursor_stats'])['index_total'], 0)

0 commit comments

Comments
 (0)