Skip to content

Commit eb8993c

Browse files
authored
Backport info metrics including index error mechanism to 2.6 [MOD-8450] (#5413)
* CP index error mechanism (without reply) - WIP * remove redundant reply_macros includes * Fix memory bug with RS string * Clean field spec error upon removing index * Revert -fvisibility=hidden in coord build * Fix index error clear before init upon rdb load failure * CP all info for metrics WIP (build + unit pass, flow tests not yet) * remove trimAllocation API from mock, add comment about duplicate open vector index function. * add all info fields back update error stats upon deleting spec set max dialect 3 adjust info tests (revert geoshape etc) * don't access NULL search ctx in coord * fix active queries test to suit 2.6 where coordinator ignores index error (return 0 instead of error) * Addressing Guy's CR
1 parent 69c48de commit eb8993c

61 files changed

Lines changed: 3149 additions & 2055 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ file(GLOB SOURCES
7575
"src/query_parser/v2/*.c"
7676
"src/util/*.c"
7777
"src/trie/*.c"
78+
"src/info/*.c"
7879

7980
"deps/cndict/cndict_data.c"
8081
"deps/libnu/*.c"

coord/src/dist_aggregate.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#include "profile.h"
1515
#include "util/timeout.h"
1616
#include "coord/src/config.h"
17+
#include "util/misc.h"
18+
#include "util/units.h"
1719

1820
#include <err.h>
1921

@@ -91,7 +93,7 @@ RSValue *MRReply_ToValue(MRReply *r) {
9193
case MR_REPLY_STATUS:
9294
case MR_REPLY_STRING: {
9395
size_t l;
94-
char *s = MRReply_String(r, &l);
96+
const char *s = MRReply_String(r, &l);
9597
v = RS_NewCopiedString(s, l);
9698
// v = RS_StringValT(s, l, RSString_Volatile);
9799
break;
@@ -400,7 +402,6 @@ static int parseProfile(RedisModuleString **argv, int argc, AREQ *r) {
400402
int profileArgs = 0;
401403
if (RMUtil_ArgIndex("FT.PROFILE", argv, 1) != -1) {
402404
profileArgs += 2; // SEARCH/AGGREGATE + QUERY
403-
r->initClock = clock();
404405
r->reqflags |= QEXEC_F_PROFILE;
405406
if (RMUtil_ArgIndex("LIMITED", argv + 3, 1) != -1) {
406407
profileArgs++;
@@ -420,7 +421,8 @@ void RSExecDistAggregate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
420421
AREQ *r = AREQ_New();
421422
QueryError status = {0};
422423
r->qiter.err = &status;
423-
r->reqflags |= QEXEC_F_IS_EXTENDED;
424+
r->reqflags |= QEXEC_F_IS_AGGREGATE;
425+
r->initClock = clock();
424426

425427
int profileArgs = parseProfile(argv, argc, r);
426428
if (profileArgs == -1) goto err;

coord/src/info_command.c

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,14 @@
55
*/
66

77
#include "info_command.h"
8+
#include "info/field_spec_info.h"
89

910
// Type of field returned in INFO
1011
typedef enum {
1112
InfoField_WholeSum,
1213
InfoField_DoubleSum,
1314
InfoField_DoubleAverage,
14-
InfoField_Max
15+
InfoField_Max,
1516
} InfoFieldType;
1617

1718
// Field specification
@@ -79,9 +80,13 @@ typedef struct {
7980
size_t total_l;
8081
double total_d;
8182
struct {
82-
double avg;
83+
double sum;
8384
double count;
8485
} avg;
86+
struct {
87+
char *str;
88+
size_t len;
89+
} str;
8590
} u;
8691
} InfoValue;
8792

@@ -117,24 +122,33 @@ static size_t replyKvArray(InfoFields *fields, RedisModuleCtx *ctx, InfoValue *v
117122

118123
// Writes field data to the target
119124
static void convertField(InfoValue *dst, MRReply *src, InfoFieldType type) {
120-
if (type == InfoField_WholeSum) {
121-
long long tmp;
122-
MRReply_ToInteger(src, &tmp);
123-
dst->u.total_l += tmp;
124-
} else if (type == InfoField_DoubleSum) {
125-
double d;
126-
MRReply_ToDouble(src, &d);
127-
dst->u.total_d += d;
128-
} else if (type == InfoField_DoubleAverage) {
129-
dst->u.avg.count++;
130-
double d;
131-
MRReply_ToDouble(src, &d);
132-
dst->u.avg.avg += d;
133-
} else if (type == InfoField_Max) {
134-
long long newVal;
135-
MRReply_ToInteger(src, &newVal);
136-
if (dst->u.total_l < newVal) {
137-
dst->u.total_l = newVal;
125+
switch (type) {
126+
case InfoField_WholeSum: {
127+
long long tmp;
128+
MRReply_ToInteger(src, &tmp);
129+
dst->u.total_l += tmp;
130+
break;
131+
}
132+
case InfoField_DoubleSum: {
133+
double d;
134+
MRReply_ToDouble(src, &d);
135+
dst->u.total_d += d;
136+
break;
137+
}
138+
case InfoField_DoubleAverage: {
139+
dst->u.avg.count++;
140+
double d;
141+
MRReply_ToDouble(src, &d);
142+
dst->u.avg.sum += d;
143+
break;
144+
}
145+
case InfoField_Max: {
146+
long long newVal;
147+
MRReply_ToInteger(src, &newVal);
148+
if (dst->u.total_l < newVal) {
149+
dst->u.total_l = newVal;
150+
}
151+
break;
138152
}
139153
}
140154
dst->isSet = 1;
@@ -169,7 +183,7 @@ static void recomputeAverageCycleTimeMs(InfoValue* gcValues, InfoFieldSpec* gcSp
169183
struct InfoFieldTypeAndValue total_ms = findInfoTypeAndValue(gcValues, gcSpecs, numFields, "total_ms_run");
170184
if (total_cycles.value && total_ms.value && avg_cycle_time_ms.type == InfoField_DoubleAverage) {
171185
avg_cycle_time_ms.value->u.avg.count = total_cycles.value->u.total_l;
172-
avg_cycle_time_ms.value->u.avg.avg = total_ms.value->u.total_l;
186+
avg_cycle_time_ms.value->u.avg.sum = total_ms.value->u.total_l;
173187
avg_cycle_time_ms.value->isSet = 1;
174188
}
175189
}
@@ -256,7 +270,7 @@ static size_t replyKvArray(InfoFields *fields, RedisModuleCtx *ctx, InfoValue *v
256270
RedisModule_ReplyWithDouble(ctx, source->u.total_d);
257271
} else if (type == InfoField_DoubleAverage) {
258272
if (source->u.avg.count) {
259-
RedisModule_ReplyWithDouble(ctx, source->u.avg.avg / source->u.avg.count);
273+
RedisModule_ReplyWithDouble(ctx, source->u.avg.sum / source->u.avg.count);
260274
} else {
261275
RedisModule_ReplyWithDouble(ctx, 0);
262276
}

coord/src/module.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
#include "value.h"
3131
#include "cluster_spell_check.h"
3232
#include "profile.h"
33-
33+
#include "info/global_stats.h"
34+
#include "util/units.h"
3435
#include "libuv/include/uv.h"
3536

3637
#include <stdlib.h>
@@ -84,9 +85,9 @@ int uniqueStringsReducer(struct MRCtx *mc, int count, MRReply **replies) {
8485
nArrs++;
8586
for (size_t j = 0; j < MRReply_Length(replies[i]); j++) {
8687
size_t sl = 0;
87-
char *s = MRReply_String(MRReply_ArrayElement(replies[i], j), &sl);
88+
const char *s = MRReply_String(MRReply_ArrayElement(replies[i], j), &sl);
8889
if (s && sl) {
89-
TrieMap_Add(dict, s, sl, NULL, NULL);
90+
TrieMap_Add(dict, (char*)s, sl, NULL, NULL);
9091
}
9192
}
9293
} else if (MRReply_Type(replies[i]) == MR_REPLY_ERROR && err == NULL) {
@@ -370,6 +371,7 @@ typedef struct {
370371
char *queryString;
371372
long long offset;
372373
long long limit;
374+
long long initClock;
373375
long long requestedResultsCount;
374376
int withScores;
375377
int withExplainScores;
@@ -566,6 +568,8 @@ searchRequestCtx *rscParseRequest(RedisModuleString **argv, int argc, QueryError
566568

567569
searchRequestCtx *req = searchRequestCtx_New();
568570

571+
req->initClock = clock();
572+
569573
if (rscParseProfile(req, argv) != REDISMODULE_OK) {
570574
searchRequestCtx_Free(req);
571575
return NULL;
@@ -736,7 +740,7 @@ searchResult *newResult(searchResult *cached, MRReply *arr, int j, searchReplyOf
736740
res->id = NULL;
737741
return res;
738742
}
739-
res->id = MRReply_String(MRReply_ArrayElement(arr, j), &res->idLen);
743+
res->id = (char*)MRReply_String(MRReply_ArrayElement(arr, j), &res->idLen);
740744
if (!res->id) {
741745
return res;
742746
}
@@ -1254,6 +1258,8 @@ static int searchResultReducer(struct MRCtx *mc, int count, MRReply **replies) {
12541258
profileSearchReply(ctx, &rCtx, count, replies, req->profileClock, postProccesTime);
12551259
}
12561260

1261+
TotalGlobalStats_CountQuery(QEXEC_F_IS_SEARCH, clock() - req->initClock);
1262+
12571263
cleanup:
12581264
if (rCtx.pq) {
12591265
heap_destroy(rCtx.pq);

coord/src/rmr/redis_cluster.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include "redismodule.h"
1212
#include "rmutil/periodic.h"
1313
#include "version.h"
14+
#include "module.h"
15+
#include "util/strconv.h"
1416

1517
#define REDIS_CLUSTER_REFRESH_TIMEOUT 1000
1618

src/aggregate/aggregate.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ extern "C" {
2424
typedef struct Grouper Grouper;
2525

2626
typedef enum {
27-
QEXEC_F_IS_EXTENDED = 0x01, // Contains aggregations or projections
27+
QEXEC_F_IS_AGGREGATE = 0x01, // Is an aggregate command
2828
QEXEC_F_SEND_SCORES = 0x02, // Output: Send scores with each result
2929
QEXEC_F_SEND_SORTKEYS = 0x04, // Sent the key used for sorting, for each result
3030
QEXEC_F_SEND_NOFIELDS = 0x08, // Don't send the contents of the fields
@@ -35,7 +35,7 @@ typedef enum {
3535
/** Don't use concurrent execution */
3636
QEXEC_F_SAFEMODE = 0x100,
3737

38-
/* The inverse of IS_EXTENDED. The two cannot coexist together */
38+
/* The inverse of IS_AGGREGATE. The two cannot coexist together */
3939
QEXEC_F_IS_SEARCH = 0x200,
4040

4141
/* Highlight/summarize options are active */
@@ -60,11 +60,14 @@ typedef enum {
6060
/* FT.AGGREGATE load all fields */
6161
QEXEC_AGG_LOAD_ALL = 0x20000,
6262

63+
// The query is internal (responding to a command from the coordinator)
64+
QEXEC_F_INTERNAL = 0x400000,
6365
} QEFlags;
6466

6567
#define IsCount(r) ((r)->reqflags & QEXEC_F_NOROWS)
6668
#define IsSearch(r) ((r)->reqflags & QEXEC_F_IS_SEARCH)
6769
#define IsProfile(r) ((r)->reqflags & QEXEC_F_PROFILE)
70+
#define IsInternal(r) ((r)->reqflags & QEXEC_F_INTERNAL)
6871

6972
typedef enum {
7073
/* Received EOF from iterator */
@@ -266,7 +269,7 @@ int RSCursorCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
266269

267270
/**
268271
* @brief Parse a dialect version from var args
269-
*
272+
*
270273
* @param dialect pointer to unsigned int to store the parsed value
271274
* @param ac ArgsCruser set to point on the dialect version position in the var args list
272275
* @param status QueryError struct to contain error messages

src/aggregate/aggregate_exec.c

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "score_explain.h"
1515
#include "commands.h"
1616
#include "profile.h"
17+
#include "info/global_stats.h"
1718

1819
typedef enum { COMMAND_AGGREGATE, COMMAND_SEARCH, COMMAND_EXPLAIN } CommandType;
1920
static void runCursor(RedisModuleCtx *outputCtx, Cursor *cursor, size_t num);
@@ -233,6 +234,10 @@ static size_t getResultsFactor(AREQ *req) {
233234
return count;
234235
}
235236

237+
static bool hasTimeoutError(QueryError *err) {
238+
return QueryError_GetCode(err) == QUERY_TIMEDOUT;
239+
}
240+
236241
/**
237242
* Sends a chunk of <n> rows, optionally also sending the preamble
238243
*/
@@ -247,7 +252,9 @@ void sendChunk(AREQ *req, RedisModuleCtx *outctx, size_t limit) {
247252
!(req->reqflags & QEXEC_F_IS_SEARCH)) {
248253
limit = RSGlobalConfig.maxAggregateResults;
249254
}
250-
255+
if (req->sctx) {
256+
IndexSpec_IncrActiveQueries(req->sctx->spec);
257+
}
251258
cachedVars cv = {0};
252259
cv.lastLk = AGPLN_GetLookup(&req->ap, NULL, AGPLN_GETLOOKUP_LAST);
253260
cv.lastAstp = AGPLN_GetArrangeStep(&req->ap);
@@ -313,7 +320,12 @@ void sendChunk(AREQ *req, RedisModuleCtx *outctx, size_t limit) {
313320
if (rc != RS_RESULT_OK) {
314321
req->stateflags |= QEXEC_S_ITERDONE;
315322
}
316-
323+
if (req->sctx) {
324+
IndexSpec_DecrActiveQueries(req->sctx->spec);
325+
}
326+
if (QueryError_GetCode(req->qiter.err) == QUERY_OK || hasTimeoutError(req->qiter.err)) {
327+
TotalGlobalStats_CountQuery(req->reqflags, clock() - req->initClock);
328+
}
317329
// Reset the total results length:
318330
req->qiter.totalResults = 0;
319331
if (resultsLen == REDISMODULE_POSTPONED_ARRAY_LEN) {
@@ -347,7 +359,7 @@ static int buildRequest(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
347359
(*r)->reqflags |= QEXEC_F_IS_SEARCH;
348360
}
349361
else if (type == COMMAND_AGGREGATE) {
350-
(*r)->reqflags |= QEXEC_F_IS_EXTENDED;
362+
(*r)->reqflags |= QEXEC_F_IS_AGGREGATE;
351363
}
352364

353365
if (AREQ_Compile(*r, argv + 2, argc - 2, status) != REDISMODULE_OK) {
@@ -433,6 +445,15 @@ static int execCommandCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int
433445
const char *indexname = RedisModule_StringPtrLen(argv[1], NULL);
434446
AREQ *r = AREQ_New();
435447
QueryError status = {0};
448+
449+
#ifdef RS_COORDINATOR
450+
// If we got here, we know `argv[0]` is a valid registered command name.
451+
// If it starts with an underscore, it is an internal command.
452+
if (RedisModule_StringPtrLen(argv[0], NULL)[0] == '_') {
453+
r->reqflags |= QEXEC_F_INTERNAL;
454+
}
455+
#endif
456+
436457
if (parseProfile(r, withProfile, argv, argc, &status) != REDISMODULE_OK) {
437458
goto error;
438459
}
@@ -442,7 +463,7 @@ static int execCommandCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int
442463
}
443464

444465
SET_DIALECT(r->sctx->spec->used_dialects, r->dialectVersion);
445-
SET_DIALECT(RSGlobalConfig.used_dialects, r->dialectVersion);
466+
SET_DIALECT(RSGlobalStats.totalStats.used_dialects, r->dialectVersion);
446467

447468
if (r->reqflags & QEXEC_F_IS_CURSOR) {
448469
int rc = AREQ_StartCursor(r, ctx, r->sctx->spec->name, &status, false);
@@ -549,11 +570,6 @@ int AREQ_StartCursor(AREQ *r, RedisModuleCtx *outctx, const char *lookupName, Qu
549570
static void runCursor(RedisModuleCtx *outputCtx, Cursor *cursor, size_t num) {
550571
AREQ *req = cursor->execState;
551572

552-
// reset profile clock for cursor reads except for 1st
553-
if (IsProfile(req) && req->totalTime != 0) {
554-
hires_clock_get(&req->initClock);
555-
}
556-
557573
// update timeout for current cursor read
558574
if (req->qiter.rootProc->type != RP_NETWORK) {
559575
updateTimeout(&req->timeoutTime, req->reqTimeout);
@@ -620,6 +636,7 @@ static void cursorRead(RedisModuleCtx *ctx, uint64_t cid, size_t count) {
620636
AREQ *req = cursor->execState;
621637
req->qiter.err = &status;
622638
ConcurrentSearchCtx_ReopenKeys(&req->conc);
639+
req->reqflags &= ~QEXEC_F_IS_AGGREGATE; // Second read was not triggered by FT.AGGREGATE
623640
runCursor(ctx, cursor, count);
624641
}
625642

0 commit comments

Comments
 (0)