Skip to content

Commit 9ccdf3e

Browse files
authored
[MOD-12418] Track OOM errors and warnings in info (#7452)
* Track timeout in sendchunk resp2 * Track timeout warning in sendSearchResults * Track timeout error in searchResultReducer (cherry picked from commit fafe1dc) * Track timeout in sendChunk_hybrid (cherry picked from commit 8529cb9) * test timeout metrics (cherry picked from commit c56a0d9) * fix isCoord check * Add query warning code and add function and fields needed to track (cherry picked from commit a414641) * Track timeout in sendchunk resp3 (cherry picked from commit 6853bb9) * re-add skip * Update syntax and args error to new SA as cluster * format and enrico comment * Track OOM (cherry picked from commit de1a285aac27c73d4feca50abe3c2328f6959ce2) * fix warnings double counting * fix missing skip and logic * Change test to N=0 with Internal only (not working so revert afterwards) * Revert "Change test to N=0 with Internal only (not working so revert afterwards)" This reverts commit 829ac53. * meirav comments * Stabilize tests * Add resp3 test * _disable_ hybrid sa timeout * Make test robust * fixup! Make test robust * remove limits * comments * Refactor warning tracking loop for clarity * Add test for warnings metric count with timeout * fix flaky
1 parent 7bc0ece commit 9ccdf3e

9 files changed

Lines changed: 316 additions & 7 deletions

File tree

src/aggregate/aggregate_exec.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,9 @@ static void sendChunk_Resp2(AREQ *req, RedisModule_Reply *reply, size_t limit,
474474
// Assuming that if we reached here, timeout is not an error.
475475
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_TIMED_OUT, 1, !IsInternal(req));
476476
}
477+
if (QueryError_HasQueryOOMWarning(qctx->err)) {
478+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD, 1, !IsInternal(req));
479+
}
477480

478481
// Prepare profile printer context
479482
RedisSearchCtx *sctx = AREQ_SearchCtx(req);
@@ -606,6 +609,7 @@ static void sendChunk_Resp3(AREQ *req, RedisModule_Reply *reply, size_t limit,
606609
RedisModule_Reply_SimpleString(reply, QUERY_WINDEXING_FAILURE);
607610
}
608611
if (QueryError_HasQueryOOMWarning(qctx->err)) {
612+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD, 1, !IsInternal(req));
609613
// We use the cluster warning since shard level warning sent via empty reply bailout
610614
RedisModule_Reply_SimpleString(reply, QUERY_WOOM_COORD);
611615
}
@@ -748,6 +752,7 @@ static ProfilePrinterCtx createProfilePrinterCtx(AREQ *req) {
748752
// warning
749753
RedisModule_ReplyKV_Array(reply, "warning"); // >warnings
750754
if (QueryError_HasQueryOOMWarning(AREQ_QueryProcessingCtx(req)->err)) {
755+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_SHARD, 1, !IsInternal(req));
751756
// Shards should use SHARD warning
752757
// SA and Coordinator should use COORD warning
753758
const char *warning = !IsInternal(req) ? QUERY_WOOM_COORD : QUERY_WOOM_SHARD;
@@ -788,6 +793,10 @@ static ProfilePrinterCtx createProfilePrinterCtx(AREQ *req) {
788793

789794
ProfilePrinterCtx profileCtx = createProfilePrinterCtx(req);
790795

796+
if (QueryError_HasQueryOOMWarning(AREQ_QueryProcessingCtx(req)->err)) {
797+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_SHARD, 1, !IsInternal(req));
798+
}
799+
791800
if (AREQ_RequestFlags(req) & QEXEC_F_IS_CURSOR) {
792801
// Cursor done
793802
RedisModule_Reply_LongLong(reply, 0);
@@ -1107,6 +1116,7 @@ static int execCommandCommon(RedisModuleCtx *ctx, RedisModuleString **argv, int
11071116
// Memory guardrail
11081117
if (QueryMemoryGuard(ctx)) {
11091118
if (RSGlobalConfig.requestConfigParams.oomPolicy == OomPolicy_Fail) {
1119+
QueryErrorsGlobalStats_UpdateError(QUERY_ERROR_CODE_OUT_OF_MEMORY, 1, GetNumShards_UnSafe() == 1);
11101120
return QueryMemoryGuardFailure_WithReply(ctx);
11111121
}
11121122
// Assuming OOM policy is return since we didn't ignore the memory guardrail

src/aggregate/reply_empty.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "../hybrid/hybrid_exec.h"
1717
#include "../rmutil/util.h"
1818
#include "reply_empty.h"
19+
#include "info/global_stats.h"
1920

2021
// Helper function that performs minimal parsing of query arguments to support sendChunk output
2122
static int shallow_parse_query_args(RedisModuleString **argv, int argc, AREQ *req) {
@@ -125,6 +126,7 @@ int common_hybrid_query_reply_empty(RedisModuleCtx *ctx, QueryErrorCode errCode,
125126
RedisModule_ReplyKV_LongLong(coordInfoReply, "VSIM", 0);
126127
RedisModule_ReplyKV_Array(coordInfoReply,"warnings"); // warnings []
127128
if (QueryError_HasQueryOOMWarning(&status)) {
129+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_SHARD, 1, SHARD_ERR_WARN);
128130
RedisModule_Reply_SimpleString(coordInfoReply, QueryError_Strerror(QUERY_ERROR_CODE_OUT_OF_MEMORY));
129131
}
130132
RedisModule_Reply_ArrayEnd(coordInfoReply); // ~warnings

src/coord/hybrid/hybrid_cursor_mappings.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "../../../deps/rmutil/rm_assert.h"
1414
#include "query_error.h"
1515
#include <string.h>
16+
#include "info/global_stats.h"
1617

1718
#define INTERNAL_HYBRID_RESP3_LENGTH 6
1819
#define INTERNAL_HYBRID_RESP2_LENGTH 6
@@ -220,14 +221,13 @@ bool ProcessHybridCursorMappings(const MRCommand *cmd, int numShards, StrongRef
220221
if (QueryError_GetCode(&ctx->errors[i]) == QUERY_ERROR_CODE_OUT_OF_MEMORY && oomPolicy == OomPolicy_Return ) {
221222
QueryError_SetQueryOOMWarning(status);
222223
} else {
223-
QueryError_SetWithoutUserDataFmt(status, QUERY_ERROR_CODE_GENERIC, "Failed to process shard responses, first error: %s, total error count: %zu",
224+
QueryError_SetWithoutUserDataFmt(status, QueryError_GetCode(&ctx->errors[i]), "Failed to process shard responses, first error: %s, total error count: %zu",
224225
QueryError_GetUserError(&ctx->errors[i]), array_len(ctx->errors));
225226
success = false;
226227
break;
227228
}
228229
}
229230
}
230-
231231
// Cleanup
232232
MRIterator_Release(it);
233233
cleanupCtx(ctx);

src/hybrid/hybrid_exec.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ void sendChunk_hybrid(HybridRequest *hreq, RedisModule_Reply *reply, size_t limi
287287
RedisModule_Reply_SimpleString(reply, QUERY_WINDEXING_FAILURE);
288288
}
289289
if (QueryError_HasQueryOOMWarning(qctx->err)) {
290+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD, 1, COORD_ERR_WARN);
290291
// Cluster mode only: handled directly here instead of through handleAndReplyWarning()
291292
// because this warning is not related to subqueries or post-processing terminology
292293
RedisModule_Reply_SimpleString(reply, QUERY_WOOM_COORD);
@@ -325,6 +326,7 @@ void sendChunk_ReplyOnly_HybridEmptyResults(RedisModule_Reply *reply, QueryError
325326
// warning
326327
RedisModule_Reply_SimpleString(reply, "warnings");
327328
if (QueryError_HasQueryOOMWarning(err)) {
329+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD, 1, COORD_ERR_WARN);
328330
RedisModule_Reply_Array(reply);
329331
// This function is called by Coordinator or SA
330332
RedisModule_Reply_SimpleString(reply, QUERY_WOOM_COORD);
@@ -590,6 +592,7 @@ int hybridCommandHandler(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
590592
// Memory guardrail
591593
if (QueryMemoryGuard(ctx)) {
592594
if (RSGlobalConfig.requestConfigParams.oomPolicy == OomPolicy_Fail) {
595+
QueryErrorsGlobalStats_UpdateError(QUERY_ERROR_CODE_OUT_OF_MEMORY, 1, !internal);
593596
return QueryMemoryGuardFailure_WithReply(ctx);
594597
}
595598
// Assuming OOM policy is return since we didn't ignore the memory guardrail

src/info/global_stats.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,13 @@ QueriesGlobalStats TotalGlobalStats_GetQueryStats() {
9898
stats.coord_errors.syntax = READ(RSGlobalStats.totalStats.queries.coord_errors.syntax);
9999
stats.coord_errors.arguments = READ(RSGlobalStats.totalStats.queries.coord_errors.arguments);
100100
stats.coord_errors.timeout = READ(RSGlobalStats.totalStats.queries.coord_errors.timeout);
101+
stats.shard_errors.oom = READ(RSGlobalStats.totalStats.queries.shard_errors.oom);
102+
stats.coord_errors.oom = READ(RSGlobalStats.totalStats.queries.coord_errors.oom);
101103
// Warnings
102104
stats.shard_warnings.timeout = READ(RSGlobalStats.totalStats.queries.shard_warnings.timeout);
103105
stats.coord_warnings.timeout = READ(RSGlobalStats.totalStats.queries.coord_warnings.timeout);
106+
stats.shard_warnings.oom = READ(RSGlobalStats.totalStats.queries.shard_warnings.oom);
107+
stats.coord_warnings.oom = READ(RSGlobalStats.totalStats.queries.coord_warnings.oom);
104108
return stats;
105109
}
106110

@@ -130,6 +134,9 @@ void QueryErrorsGlobalStats_UpdateError(QueryErrorCode code, int toAdd, bool coo
130134
case QUERY_ERROR_CODE_TIMED_OUT:
131135
INCR_BY(queries_errors->timeout, toAdd);
132136
break;
137+
case QUERY_ERROR_CODE_OUT_OF_MEMORY:
138+
INCR_BY(queries_errors->oom, toAdd);
139+
break;
133140
}
134141
}
135142

@@ -145,6 +152,12 @@ void QueryWarningsGlobalStats_UpdateWarning(QueryWarningCode code, int toAdd, bo
145152
case QUERY_WARNING_CODE_TIMED_OUT:
146153
INCR_BY(queries_warnings->timeout, toAdd);
147154
break;
155+
case QUERY_WARNING_CODE_OUT_OF_MEMORY_SHARD:
156+
INCR_BY(queries_warnings->oom, toAdd);
157+
break;
158+
case QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD:
159+
INCR_BY(queries_warnings->oom, toAdd);
160+
break;
148161
}
149162
}
150163

src/info/global_stats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,12 @@ typedef struct {
5050
size_t syntax; // Number of syntax errors
5151
size_t arguments; // Number of parse arguments errors
5252
size_t timeout; // Number of timeout errors
53+
size_t oom; // Number of OOM errors
5354
} QueryErrorsGlobalStats;
5455

5556
typedef struct {
5657
size_t timeout;
58+
size_t oom;
5759
} QueryWarningGlobalStats;
5860

5961
typedef struct {

src/info/info_redis/info_redis.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,17 @@ void AddToInfo_ErrorsAndWarnings(RedisModuleInfoCtx *ctx, TotalIndexesInfo *tota
259259
RedisModule_InfoAddFieldULongLong(ctx, "shard_total_query_errors_arguments", stats.shard_errors.arguments);
260260
RedisModule_InfoAddFieldULongLong(ctx, "shard_total_query_errors_timeout", stats.shard_errors.timeout);
261261
RedisModule_InfoAddFieldULongLong(ctx, "shard_total_query_warnings_timeout", stats.shard_warnings.timeout);
262+
RedisModule_InfoAddFieldULongLong(ctx, "shard_total_query_errors_oom", stats.shard_errors.oom);
263+
RedisModule_InfoAddFieldULongLong(ctx, "shard_total_query_warnings_oom", stats.shard_warnings.oom);
264+
262265
// Coordinator errors and warnings
263266
RedisModule_InfoAddSection(ctx, "coordinator_warnings_and_errors");
264267
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_errors_syntax", stats.coord_errors.syntax);
265268
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_errors_arguments", stats.coord_errors.arguments);
266269
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_errors_timeout", stats.coord_errors.timeout);
267270
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_warnings_timeout", stats.coord_warnings.timeout);
271+
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_errors_oom", stats.coord_errors.oom);
272+
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_warnings_oom", stats.coord_warnings.oom);
268273
}
269274

270275
void AddToInfo_MultiThreading(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_info) {

src/module.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2712,6 +2712,7 @@ static void sendSearchResults(RedisModule_Reply *reply, searchReducerCtx *rCtx)
27122712
}
27132713
RedisModule_Reply_ArrayEnd(reply);
27142714
} else if (req->queryOOM) {
2715+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD, 1, COORD_ERR_WARN);
27152716
// We use the cluster warning since shard level warning sent via empty reply bailout
27162717
RedisModule_Reply_SimpleString(reply, QUERY_WOOM_COORD);
27172718
} else {
@@ -2805,6 +2806,10 @@ static void sendSearchResults(RedisModule_Reply *reply, searchReducerCtx *rCtx)
28052806
}
28062807
}
28072808
RedisModule_Reply_MapEnd(reply);
2809+
2810+
if (req->queryOOM) {
2811+
QueryWarningsGlobalStats_UpdateWarning(QUERY_WARNING_CODE_OUT_OF_MEMORY_COORD, 1, COORD_ERR_WARN);
2812+
}
28082813
//-------------------------------------------------------------------------------------------
28092814

28102815
// Free the sorted results
@@ -3263,6 +3268,7 @@ int DistAggregateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
32633268
if (QueryMemoryGuard(ctx)) {
32643269
// If we are in a single shard cluster, we should fail the query if we are out of memory
32653270
if (RSGlobalConfig.requestConfigParams.oomPolicy == OomPolicy_Fail) {
3271+
QueryErrorsGlobalStats_UpdateError(QUERY_ERROR_CODE_OUT_OF_MEMORY, 1, COORD_ERR_WARN);
32663272
return QueryMemoryGuardFailure_WithReply(ctx);
32673273
}
32683274
// Assuming OOM policy is return since we didn't ignore the memory guardrail
@@ -3329,6 +3335,7 @@ int DistHybridCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
33293335
if (QueryMemoryGuard(ctx)) {
33303336
// If we are in a single shard cluster, we should fail the query if we are out of memory
33313337
if (RSGlobalConfig.requestConfigParams.oomPolicy == OomPolicy_Fail) {
3338+
QueryErrorsGlobalStats_UpdateError(QUERY_ERROR_CODE_OUT_OF_MEMORY, 1, COORD_ERR_WARN);
33323339
return QueryMemoryGuardFailure_WithReply(ctx);
33333340
}
33343341
// Assuming OOM policy is return since we didn't ignore the memory guardrail
@@ -3671,6 +3678,7 @@ int DistSearchCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
36713678
// Memory guardrail
36723679
if (QueryMemoryGuard(ctx)) {
36733680
if (RSGlobalConfig.requestConfigParams.oomPolicy == OomPolicy_Fail) {
3681+
QueryErrorsGlobalStats_UpdateError(QUERY_ERROR_CODE_OUT_OF_MEMORY, 1, COORD_ERR_WARN);
36743682
return QueryMemoryGuardFailure_WithReply(ctx);
36753683
}
36763684
// Assuming policy is return, since we didn't ignore the memory guardrail

0 commit comments

Comments
 (0)