Skip to content

Commit 005186a

Browse files
authored
[MOD-12414] Add Internal cursor reads metric to cluster FT.PROFILE output (#7709)
* Add cursor_reads to the AREQ struct, update it in runCursor, and print it in Profile_Print for cursor requests. * Fix: preserve the timeout flag across cursor reads in cluster FT.PROFILE. Use |= instead of = to ensure has_timedout remains true once set, preventing earlier cursor-read warnings from being lost in the final output. * Fix test * Fix test * Add an assertion message to the test
1 parent a6a9904 commit 005186a

6 files changed

Lines changed: 132 additions & 5 deletions

File tree

src/aggregate/aggregate.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ typedef struct AREQ {
283283
// Indicates whether the query has timed out.
284284
// Useful for query with cursor and RETURN policy
285285
bool has_timedout;
286+
287+
// Number of cursor reads: 1 for the initial FT.AGGREGATE WITHCURSOR,
288+
// plus 1 for each subsequent FT.CURSOR READ call.
289+
size_t cursor_reads;
286290
} AREQ;
287291

288292
/**

src/aggregate/aggregate_exec.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,7 @@ static void sendChunk_Resp2(AREQ *req, RedisModule_Reply *reply, size_t limit,
472472
&& req->reqConfig.timeoutPolicy == TimeoutPolicy_Return));
473473

474474
bool has_timedout = (rc == RS_RESULT_TIMEDOUT) || hasTimeoutError(qctx->err);
475-
req->has_timedout = has_timedout;
475+
req->has_timedout |= has_timedout;
476476
if (has_timedout) {
477477
// Track warnings in global statistics
478478
// Assuming that if we reached here, timeout is not an error.
@@ -489,7 +489,7 @@ static void sendChunk_Resp2(AREQ *req, RedisModule_Reply *reply, size_t limit,
489489
RedisSearchCtx *sctx = AREQ_SearchCtx(req);
490490
ProfilePrinterCtx profileCtx = {
491491
.req = req,
492-
.timedout = has_timedout,
492+
.timedout = req->has_timedout,
493493
.reachedMaxPrefixExpansions = QueryError_HasReachedMaxPrefixExpansionsWarning(qctx->err),
494494
.bgScanOOM = sctx->spec && sctx->spec->scan_failed_OOM,
495495
.queryOOM = QueryError_HasQueryOOMWarning(qctx->err),
@@ -638,15 +638,15 @@ static void sendChunk_Resp3(AREQ *req, RedisModule_Reply *reply, size_t limit,
638638
&& req->reqConfig.timeoutPolicy == TimeoutPolicy_Return));
639639

640640
bool has_timedout = (rc == RS_RESULT_TIMEDOUT) || hasTimeoutError(qctx->err);
641-
req->has_timedout = has_timedout;
641+
req->has_timedout |= has_timedout;
642642

643643
if (IsProfile(req)) {
644644
RedisModule_Reply_MapEnd(reply); // >Results
645645
if (!(AREQ_RequestFlags(req) & QEXEC_F_IS_CURSOR) || cursor_done) {
646646
// Prepare profile printer context
647647
ProfilePrinterCtx profileCtx = {
648648
.req = req,
649-
.timedout = has_timedout,
649+
.timedout = req->has_timedout,
650650
.reachedMaxPrefixExpansions = QueryError_HasReachedMaxPrefixExpansionsWarning(qctx->err),
651651
.bgScanOOM = sctx->spec && sctx->spec->scan_failed_OOM,
652652
.queryOOM = QueryError_HasQueryOOMWarning(qctx->err),
@@ -1261,6 +1261,7 @@ int AREQ_StartCursor(AREQ *r, RedisModule_Reply *reply, StrongRef spec_ref, Quer
12611261
// Assumes that the cursor has a strong ref to the relevant spec and that it is already locked.
12621262
static void runCursor(RedisModule_Reply *reply, Cursor *cursor, size_t num) {
12631263
AREQ *req = cursor->execState;
1264+
req->cursor_reads++;
12641265

12651266
// update timeout for current cursor read
12661267
SearchCtx_UpdateTime(AREQ_SearchCtx(req), req->reqConfig.queryTimeoutMS);

src/profile.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,13 @@ void Profile_Print(RedisModule_Reply *reply, void *ctx) {
194194
RedisModule_ReplyKV_SimpleString(reply, "Warning", "None");
195195
}
196196

197+
// Print cursor reads count if this is a cursor request.
198+
if (IsCursor(req)) {
199+
// Only internal requests can use profile with cursor.
200+
RS_ASSERT(IsInternal(req));
201+
RedisModule_ReplyKV_LongLong(reply, "Internal cursor reads", req->cursor_reads);
202+
}
203+
197204
// Print profile of iterators
198205
QueryIterator *root = QITR_GetRootFilter(qctx);
199206
// Coordinator does not have iterators

tests/pytests/test_aggregate_count.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def _get_cluster_RP_profile(env, res) -> list:
5454

5555
else:
5656
for i in range(len(res[1][1])):
57-
shard = res[1][1][i][13]
57+
shard = res[1][1][i][15]
5858
shard_RP_and_count.append([(item[1], item[5]) for item in shard])
5959

6060
# sort shard by the number of results processed by the first RP

tests/pytests/test_profile.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22

3+
import math
34
import unittest
45
from includes import *
56
from common import *
@@ -589,6 +590,119 @@ def testTimedOutWarningCoordResp3():
589590
def testTimedOutWarningCoordResp2():
590591
TimedOutWarningtestCoord(Env(protocol=2))
591592

593+
def get_shards_profile(env, res):
594+
"""Extract shard profiles from FT.PROFILE AGGREGATE response."""
595+
if env.protocol == 3:
596+
return res['Profile']['Shards']
597+
else:
598+
return [to_dict(p) for p in res[-1][1]]
599+
600+
def InternalCursorReadsInProfile(protocol):
601+
"""Tests that 'Internal cursor reads' appears in shard profiles for AGGREGATE."""
602+
# Limit number of shards to avoid creating too many docs
603+
env = Env(shardsCount=2, protocol=protocol)
604+
conn = getConnectionByEnv(env)
605+
env.cmd(config_cmd(), 'SET', '_PRINT_PROFILE_CLOCK', 'false')
606+
607+
env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok()
608+
609+
# Insert docs - with default cursorReadSize=1000, each shard needs more than 1000 to require 2 reads
610+
num_docs = int(1000 * 1.1 * env.shardsCount)
611+
for i in range(num_docs):
612+
conn.execute_command('HSET', f'doc{i}', 't', f'hello{i}')
613+
614+
# Run FT.PROFILE AGGREGATE - coordinator uses internal cursors to shards
615+
res = env.cmd('FT.PROFILE', 'idx', 'AGGREGATE', 'QUERY', '*')
616+
617+
shards_profile = get_shards_profile(env, res)
618+
env.assertEqual(len(shards_profile), env.shardsCount, message=f"unexpected number of shards. full reply output: {res}")
619+
620+
# Each shard should have exactly 2 cursor reads (1000+ docs per shard, default cursorReadSize=1000)
621+
for shard_profile in shards_profile:
622+
env.assertContains('Internal cursor reads', shard_profile)
623+
env.assertEqual(shard_profile['Internal cursor reads'], 2)
624+
625+
@skip(cluster=False)
626+
def testInternalCursorReadsInProfileResp3():
627+
InternalCursorReadsInProfile(protocol=3)
628+
629+
@skip(cluster=False)
630+
def testInternalCursorReadsInProfileResp2():
631+
InternalCursorReadsInProfile(protocol=2)
632+
633+
@skip(cluster=False)
634+
def testInternalCursorReadsWithTimeoutResp3():
635+
"""Tests 'Internal cursor reads' with timeout - RESP3 coordinator detects timeout and stops early."""
636+
env = Env(protocol=3)
637+
conn = getConnectionByEnv(env)
638+
run_command_on_all_shards(env, config_cmd(), 'SET', '_PRINT_PROFILE_CLOCK', 'false')
639+
640+
env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok()
641+
642+
num_docs = 100
643+
for i in range(num_docs):
644+
conn.execute_command('HSET', f'doc{i}', 't', f'hello{i}')
645+
646+
# Run FT.PROFILE AGGREGATE with simulated timeout on shards only
647+
query = ['FT.PROFILE', 'idx', 'AGGREGATE', 'QUERY', '*']
648+
timeout_after_n = 5
649+
res = runDebugQueryCommandTimeoutAfterN(env, query, timeout_after_n, internal_only=True)
650+
651+
# RESP3: coordinator detects shard timeout and stops early after reading first shard's reply
652+
# Results count equals first shard's reply length (timeout_after_n)
653+
env.assertEqual(len(res['Results']['results']), timeout_after_n)
654+
655+
shards_profile = get_shards_profile(env, res)
656+
for shard_profile in shards_profile:
657+
env.assertContains('Internal cursor reads', shard_profile, message=f"full reply output: {res}")
658+
# Coordinator stops after first timeout, so only 1 cursor read per shard
659+
env.assertEqual(shard_profile['Internal cursor reads'], 1, message=f"full reply output: {res}")
660+
env.assertEqual(shard_profile['Warning'], 'Timeout limit was reached', message=f"full reply output: {res}")
661+
662+
@skip(cluster=False)
663+
def testInternalCursorReadsWithTimeoutResp2():
664+
"""Tests 'Internal cursor reads' with timeout - RESP2 coordinator doesn't detect timeout, reads until EOF."""
665+
env = Env(shardsCount=2, protocol=2)
666+
conn = getConnectionByEnv(env)
667+
run_command_on_all_shards(env, config_cmd(), 'SET', '_PRINT_PROFILE_CLOCK', 'false')
668+
669+
env.expect('FT.CREATE', 'idx', 'SCHEMA', 't', 'TEXT').ok()
670+
671+
num_docs = 100
672+
for i in range(num_docs):
673+
conn.execute_command('HSET', f'doc{i}', 't', f'hello{i}')
674+
675+
# Run FT.PROFILE AGGREGATE with simulated timeout on shards only
676+
query = ['FT.PROFILE', 'idx', 'AGGREGATE', 'QUERY', '*']
677+
timeout_after_n = 5
678+
res = runDebugQueryCommandTimeoutAfterN(env, query, timeout_after_n, internal_only=True)
679+
680+
# RESP2: coordinator doesn't check shard timeout, reads until EOF
681+
# All docs are returned
682+
env.assertEqual(len(res[0]) - 1, num_docs)
683+
684+
shards_profile = get_shards_profile(env, res)
685+
env.assertEqual(len(shards_profile), env.shardsCount, message=f"unexpected number of shards. full reply output: {res}")
686+
687+
# Verify total cursor reads matches expected (order of shards may differ)
688+
total_expected_reads = 0
689+
for shard_conn in env.getOSSMasterNodesConnectionList():
690+
docs_on_shard = shard_conn.execute_command('DBSIZE')
691+
total_expected_reads += math.ceil(docs_on_shard / timeout_after_n)
692+
693+
# The order of shards in the profile response may differ, so we can't check per-shard
694+
total_actual_reads = sum(sp['Internal cursor reads'] for sp in shards_profile)
695+
env.assertEqual(total_actual_reads, total_expected_reads, message=f"full reply output: {res}")
696+
697+
# Verify each shard has warning
698+
for shard_profile in shards_profile:
699+
env.assertContains('Internal cursor reads', shard_profile, message=f"full reply output: {res}")
700+
env.assertEqual(shard_profile['Warning'], 'Timeout limit was reached', message=f"full reply output: {res}")
701+
702+
# Coordinator should NOT have timeout warning (it doesn't detect it in RESP2)
703+
coord_profile = to_dict(res[-1][-1])
704+
env.assertEqual(coord_profile['Warning'], 'None', message=f"full reply output: {res}")
705+
592706
# This test is currently skipped due to flaky behavior of some of the machines'
593707
# timers. MOD-6436
594708
@skip()

tests/pytests/test_resp3.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ def test_coord_profile():
252252
'Pipeline creation time': ANY,
253253
'Total GIL time': ANY,
254254
'Warning': 'None',
255+
'Internal cursor reads': ANY,
255256
'Iterators profile': {'Type': 'WILDCARD', 'Time': ANY, 'Number of reading operations': ANY},
256257
'Result processors profile': [{'Type': 'Index', 'Time': ANY, 'Results processed': ANY},]
257258
}

0 commit comments

Comments (0)