Skip to content

Commit 602fe76

Browse files
kei-nanGuyAv46oshadmi
authored
MOD-8009: Allow Users To Configure Cursor Index Limitation Through Global Config (#5137)
* * move cursor limitation per index to be a configuration parameter * solve a bug where the coordinator would stop reading from internal cursors * * remove int return type for iterate callback functions, since their return value is ignored and has no real meaning. * * Code Review - Round #1 * * Code Review - Round #1, missing file change * * fix crash in testAggregate.py * * Code Review - Round #2 * * replace comma with a period * * if we somehow ended up with a closed channel, free the reply * Update src/config.c Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * Update src/cursor.c Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com> * - fix channel test * - add logging for the broadcasting failure flow * - changed broadcast failure to be verbose instead of warning * Update tests/pytests/test_cursors.py Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com> --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com>
1 parent ae92aa4 commit 602fe76

17 files changed

Lines changed: 126 additions & 58 deletions

File tree

src/config.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,16 @@ CONFIG_GETTER(getBGIndexSleepGap) {
636636
CONFIG_BOOLEAN_SETTER(set_PrioritizeIntersectUnionChildren, prioritizeIntersectUnionChildren)
637637
CONFIG_BOOLEAN_GETTER(get_PrioritizeIntersectUnionChildren, prioritizeIntersectUnionChildren, 0)
638638

639+
CONFIG_SETTER(setIndexCursorLimit) {
640+
int acrc = AC_GetLongLong(ac, &config->indexCursorLimit, AC_F_GE0);
641+
RETURN_STATUS(acrc);
642+
}
643+
644+
CONFIG_GETTER(getIndexCursorLimit) {
645+
sds ss = sdsempty();
646+
return sdscatprintf(ss, "%lld", config->indexCursorLimit);
647+
}
648+
639649
RSConfig RSGlobalConfig = RS_DEFAULT_CONFIG;
640650

641651
static RSConfigVar *findConfigVar(const RSConfigOptions *config, const char *name) {
@@ -840,6 +850,10 @@ RSConfigOptions RSGlobalConfigOptions = {
840850
"high memory consumption.",
841851
.setValue = setCursorMaxIdle,
842852
.getValue = getCursorMaxIdle},
853+
{.name = "INDEX_CURSOR_LIMIT",
854+
.helpText = "Max number of cursors for a given index that can be opened inside of a shard. Default is 128",
855+
.setValue = setIndexCursorLimit,
856+
.getValue = getIndexCursorLimit},
843857
{.name = "NO_MEM_POOLS",
844858
.helpText = "Set RediSearch to run without memory pools",
845859
.setValue = setNoMemPools,

src/config.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ typedef struct {
139139
// If set, we use an optimization that sorts the children of an intersection iterator in a way
140140
// where union iterators are being factorize by the number of their own children.
141141
int prioritizeIntersectUnionChildren;
142+
// Limit the number of cursors that can be created for a single index
143+
long long indexCursorLimit;
142144
} RSConfig;
143145

144146
typedef enum {
@@ -217,6 +219,7 @@ void UpgradeDeprecatedMTConfigs();
217219
#define GC_SCANSIZE 100
218220
#define DEFAULT_MIN_PHONETIC_TERM_LEN 3
219221
#define DEFAULT_FORK_GC_RUN_INTERVAL 30
222+
#define DEFAULT_INDEX_CURSOR_LIMIT 128
220223
#define SEARCH_REQUEST_RESULTS_MAX 1000000
221224
#define NR_MAX_DEPTH_BALANCE 2
222225
#define VECSIM_DEFAULT_BLOCK_SIZE 1024
@@ -262,7 +265,8 @@ void UpgradeDeprecatedMTConfigs();
262265
.vssMaxResize = 0, \
263266
.multiTextOffsetDelta = 100, \
264267
.numBGIndexingIterationsBeforeSleep = 100, \
265-
.prioritizeIntersectUnionChildren = false \
268+
.prioritizeIntersectUnionChildren = false, \
269+
.indexCursorLimit = DEFAULT_INDEX_CURSOR_LIMIT \
266270
}
267271

268272
#define REDIS_ARRAY_LIMIT 7

src/coord/dist_aggregate.c

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@
1616
#include "resp3.h"
1717
#include "coord/config.h"
1818
#include "dist_profile.h"
19+
#include "util/misc.h"
1920

2021
#include <err.h>
2122

2223
// Get cursor command using a cursor id and an existing aggregate command
2324
// Returns true if the cursor is not done (i.e., not depleted)
24-
static bool getCursorCommand(MRReply *res, MRCommand *cmd, MRIteratorCtx *ctx) {
25-
long long cursorId;
26-
if (!MRReply_ToInteger(MRReply_ArrayElement(res, 1), &cursorId)) {
27-
// Invalid format?!
28-
return false;
29-
}
30-
25+
static bool getCursorCommand(long long cursorId, MRCommand *cmd, MRIteratorCtx *ctx) {
3126
if (cursorId == 0) {
3227
// Cursor was set to 0, end of reply chain. cmd->depleted will be set in `MRIteratorCallback_Done`.
3328
return false;
@@ -73,7 +68,7 @@ static bool getCursorCommand(MRReply *res, MRCommand *cmd, MRIteratorCtx *ctx) {
7368
}
7469

7570

76-
static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
71+
static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
7772
MRCommand *cmd = MRIteratorCallback_GetCommand(ctx);
7873

7974
// If the root command of this reply is a DEL command, we don't want to
@@ -85,23 +80,25 @@ static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
8580
// Discard the response, and return REDIS_OK
8681
MRIteratorCallback_Done(ctx, MRReply_Type(rep) == MR_REPLY_ERROR);
8782
MRReply_Free(rep);
88-
return REDIS_OK;
83+
return;
8984
}
9085

9186
// Check if an error returned from the shard
9287
if (MRReply_Type(rep) == MR_REPLY_ERROR) {
93-
RedisModule_Log(RSDummyContext, "notice", "Coordinator got an error from a shard");
94-
RedisModule_Log(RSDummyContext, "verbose", "Shard error: %s", MRReply_String(rep, NULL));
88+
const char* error = MRReply_String(rep, NULL);
89+
RedisModule_Log(RSDummyContext, "notice", "Coordinator got an error '%.*s' from a shard", GetRedisErrorCodeLength(error), error);
90+
RedisModule_Log(RSDummyContext, "verbose", "Shard error: %s", error);
9591
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
9692
MRIteratorCallback_Done(ctx, 1);
97-
return REDIS_ERR;
93+
return;
9894
}
9995

96+
const bool isResp3 = cmd->protocol == 3;
10097
bool bail_out = MRReply_Type(rep) != MR_REPLY_ARRAY;
10198

10299
if (!bail_out) {
103100
size_t len = MRReply_Length(rep);
104-
if (cmd->protocol == 3) {
101+
if (isResp3) {
105102
bail_out = len != 2; // (map, cursor)
106103
if (bail_out) {
107104
RedisModule_Log(RSDummyContext, "warning", "Expected reply of length 2, got %ld", len);
@@ -118,15 +115,17 @@ static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
118115
RedisModule_Log(RSDummyContext, "warning", "An unexpected reply was received from a shard");
119116
MRReply_Free(rep);
120117
MRIteratorCallback_Done(ctx, 1);
121-
return REDIS_ERR;
118+
return;
122119
}
123120

124-
// rewrite and resend the cursor command if needed
125-
int rc = REDIS_OK;
126-
bool done = !getCursorCommand(rep, cmd, MRIteratorCallback_GetCtx(ctx));
121+
long long cursorId;
122+
MRReply* cursor = MRReply_ArrayElement(rep, 1);
123+
if (!MRReply_ToInteger(cursor, &cursorId)) {
124+
cursorId = 0;
125+
}
127126

128127
// Push the reply down the chain
129-
if (cmd->protocol == 3) // RESP3
128+
if (isResp3) // RESP3
130129
{
131130
MRReply *map = MRReply_ArrayElement(rep, 0);
132131
MRReply *results = NULL;
@@ -137,11 +136,7 @@ static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
137136
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
138137
// User code now owns the reply, so we can't free it here ourselves!
139138
rep = NULL;
140-
} else {
141-
done = true;
142139
}
143-
} else {
144-
done = true;
145140
}
146141
}
147142
else // RESP2
@@ -151,28 +146,23 @@ static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
151146
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
152147
// User code now owns the reply, so we can't free it here ourselves!
153148
rep = NULL;
154-
} else {
155-
done = true;
156149
}
157150
}
158151

159-
if (done) {
152+
// rewrite and resend the cursor command if needed
153+
// should only be determined based on the cursor and not on the set of results we get
154+
if (!getCursorCommand(cursorId, cmd, MRIteratorCallback_GetCtx(ctx))) {
160155
MRIteratorCallback_Done(ctx, 0);
161156
} else if (cmd->forCursor) {
162157
MRIteratorCallback_ProcessDone(ctx);
163-
} else {
164-
// resend command
165-
if (MRIteratorCallback_ResendCommand(ctx) == REDIS_ERR) {
166-
MRIteratorCallback_Done(ctx, 1);
167-
rc = REDIS_ERR;
168-
}
158+
} else if (MRIteratorCallback_ResendCommand(ctx) == REDIS_ERR) {
159+
MRIteratorCallback_Done(ctx, 1);
169160
}
170161

171162
if (rep != NULL) {
172163
// If rep has been set to NULL, it means the callback has been invoked
173164
MRReply_Free(rep);
174165
}
175-
return rc;
176166
}
177167

178168
RSValue *MRReply_ToValue(MRReply *r) {

src/coord/rmr/chan.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ size_t MRChannel_Size(MRChannel *chan) {
7373
return ret;
7474
}
7575

76-
int MRChannel_Push(MRChannel *chan, void *ptr) {
76+
PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr) {
7777

7878
pthread_mutex_lock(&chan->lock);
79-
int rc = 1;
79+
int rc = 0;
8080
if (!chan->open) {
81-
rc = 0;
81+
rc = CHANNEL_CLOSED;
8282
goto end;
8383
}
8484

@@ -95,7 +95,7 @@ int MRChannel_Push(MRChannel *chan, void *ptr) {
9595
}
9696
chan->size++;
9797
end:
98-
if (pthread_cond_broadcast(&chan->cond)) rc = 0;
98+
if (pthread_cond_broadcast(&chan->cond)) rc |= BROADCAST_FAILURE;
9999
pthread_mutex_unlock(&chan->lock);
100100
return rc;
101101
}

src/coord/rmr/chan.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@ typedef struct MRChannel MRChannel;
1515

1616
extern void *MRCHANNEL_CLOSED;
1717

18+
typedef enum {
19+
CHANNEL_CLOSED = 0x1,
20+
BROADCAST_FAILURE = 0x2
21+
} PushErrorMask;
22+
23+
1824
MRChannel *MR_NewChannel();
19-
int MRChannel_Push(MRChannel *chan, void *ptr);
25+
PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr);
2026
/* Pop an item, wait indefinitely or until the channel is closed for an item.
2127
* Return MRCHANNEL_CLOSED if the channel is closed*/
2228
void *MRChannel_Pop(MRChannel *chan);

src/coord/rmr/rmr.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,15 @@ MRIteratorCtx *MRIteratorCallback_GetCtx(MRIteratorCallbackCtx *ctx) {
529529
}
530530

531531
void MRIteratorCallback_AddReply(MRIteratorCallbackCtx *ctx, MRReply *rep) {
532-
MRChannel_Push(ctx->ic->chan, rep);
532+
PushErrorMask mask = MRChannel_Push(ctx->ic->chan, rep);
533+
if (mask & CHANNEL_CLOSED) {
534+
// We are using a verbose log level to prevent flodding the logs with this message in production
535+
RedisModule_Log(RSDummyContext, "verbose", "Tried pushing a reply to a closed channel");
536+
MRReply_Free(rep);
537+
}
538+
if (mask & BROADCAST_FAILURE) {
539+
RedisModule_Log(RSDummyContext, "verbose", "Failed broadcasting the reply in the channel");
540+
}
533541
}
534542

535543
void iterStartCb(void *p) {

src/coord/rmr/rmr.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ typedef struct MRIteratorCallbackCtx MRIteratorCallbackCtx;
7272
typedef struct MRIteratorCtx MRIteratorCtx;
7373
typedef struct MRIterator MRIterator;
7474

75-
typedef int (*MRIteratorCallback)(MRIteratorCallbackCtx *ctx, MRReply *rep);
75+
typedef void (*MRIteratorCallback)(MRIteratorCallbackCtx *ctx, MRReply *rep);
7676

7777
// Trigger all the commands in the iterator to be sent.
7878
// Returns true if there may be more replies to come, false if we are done.

src/cursor.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,16 +206,14 @@ Cursor *Cursors_Reserve(CursorList *cl, StrongRef global_spec_ref, unsigned inte
206206
// If the cursor should be associated with a spec,
207207
// we assume that global_spec_ref points to a valid spec, else the function returns NULL.
208208
IndexSpec *spec = StrongRef_Get(global_spec_ref);
209-
210209
// If we are in a coordinator ctx, the spec is NULL
211-
if (spec && (spec->activeCursors >= spec->cursorsCap)) {
210+
if (spec && spec->activeCursors >= RSGlobalConfig.indexCursorLimit) {
212211
/** Collect idle cursors now */
213212
Cursors_GCInternal(cl, 0);
214-
if (spec->activeCursors >= spec->cursorsCap) {
215-
QueryError_SetError(status, QUERY_ELIMIT, "Too many cursors allocated for index");
213+
if (spec->activeCursors >= RSGlobalConfig.indexCursorLimit) {
214+
QueryError_SetErrorFmt(status, QUERY_ELIMIT, "INDEX_CURSOR_LIMIT of %lld has been reached for an index", RSGlobalConfig.indexCursorLimit);
216215
goto done;
217216
}
218-
219217
}
220218

221219
cur = rm_calloc(1, sizeof(*cur));
@@ -313,7 +311,7 @@ void Cursors_RenderStats(CursorList *cl, CursorList *cl_coord, IndexSpec *spec,
313311
RedisModule_ReplyKV_LongLong(reply, "global_idle", ARRAY_GETSIZE_AS(&cl->idle, Cursor **) +
314312
ARRAY_GETSIZE_AS(&cl_coord->idle, Cursor **));
315313
RedisModule_ReplyKV_LongLong(reply, "global_total", kh_size(cl->lookup) + kh_size(cl_coord->lookup));
316-
RedisModule_ReplyKV_LongLong(reply, "index_capacity", spec->cursorsCap);
314+
RedisModule_ReplyKV_LongLong(reply, "index_capacity", RSGlobalConfig.indexCursorLimit);
317315
RedisModule_ReplyKV_LongLong(reply, "index_total", spec->activeCursors);
318316

319317
RedisModule_Reply_MapEnd(reply);
@@ -330,7 +328,7 @@ void Cursors_RenderStatsForInfo(CursorList *cl, CursorList *cl_coord, IndexSpec
330328
RedisModule_InfoAddFieldLongLong(ctx, "global_idle", ARRAY_GETSIZE_AS(&cl->idle, Cursor **) +
331329
ARRAY_GETSIZE_AS(&cl_coord->idle, Cursor **));
332330
RedisModule_InfoAddFieldLongLong(ctx, "global_total", kh_size(cl->lookup) + kh_size(cl_coord->lookup));
333-
RedisModule_InfoAddFieldLongLong(ctx, "index_capacity", spec->cursorsCap);
331+
RedisModule_InfoAddFieldLongLong(ctx, "index_capacity", RSGlobalConfig.indexCursorLimit);
334332
RedisModule_InfoAddFieldLongLong(ctx, "index_total", spec->activeCursors);
335333
RedisModule_InfoEndDictField(ctx);
336334

src/cursor.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ void CursorList_Empty(CursorList *cl);
136136
*/
137137
void CursorList_Expire(CursorList *cl);
138138

139-
#define RSCURSORS_DEFAULT_CAPACITY 128
140139
#define RSCURSORS_SWEEP_INTERVAL 500 /* GC Every 500 requests */
141140
#define RSCURSORS_SWEEP_THROTTLE (1 * (1000000000)) /* Throttle, in NS */
142141

src/spec.c

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,8 @@ static void setMemoryInfo(RedisModuleCtx *ctx) {
8686
* Initialize the spec's fields that are related to the cursors.
8787
*/
8888

89-
static void Cursors_initSpec(IndexSpec *spec, size_t capacity) {
89+
static void Cursors_initSpec(IndexSpec *spec) {
9090
spec->activeCursors = 0;
91-
spec->cursorsCap = capacity;
9291
}
9392

9493
/*
@@ -405,7 +404,7 @@ IndexSpec *IndexSpec_CreateNew(RedisModuleCtx *ctx, RedisModuleString **argv, in
405404
// Start the garbage collector
406405
IndexSpec_StartGC(ctx, spec_ref, sp);
407406

408-
Cursors_initSpec(sp, RSCURSORS_DEFAULT_CAPACITY);
407+
Cursors_initSpec(sp);
409408

410409
// Create the indexer
411410
sp->indexer = NewIndexer(sp);
@@ -2568,7 +2567,7 @@ int IndexSpec_CreateFromRdb(RedisModuleCtx *ctx, RedisModuleIO *rdb, int encver,
25682567
sp->uniqueId = spec_unique_ids++;
25692568

25702569
IndexSpec_StartGC(ctx, spec_ref, sp);
2571-
Cursors_initSpec(sp, RSCURSORS_DEFAULT_CAPACITY);
2570+
Cursors_initSpec(sp);
25722571

25732572
if (sp->flags & Index_HasSmap) {
25742573
sp->smap = SynonymMap_RdbLoad(rdb, encver);
@@ -2724,7 +2723,7 @@ void *IndexSpec_LegacyRdbLoad(RedisModuleIO *rdb, int encver) {
27242723

27252724
// start the gc and add the spec to the cursor list
27262725
IndexSpec_StartGC(RSDummyContext, spec_ref, sp);
2727-
Cursors_initSpec(sp, RSCURSORS_DEFAULT_CAPACITY);
2726+
Cursors_initSpec(sp);
27282727

27292728
dictAdd(legacySpecDict, sp->name, spec_ref.rm);
27302729
return spec_ref.rm;

0 commit comments

Comments
 (0)