Skip to content

Commit b86fcda

Browse files
authored
[2.6] MOD-8009: Allow Users To Configure Cursor Index Limitation Through Global Config (#5168)
* MOD-8009: Allow Users To Configure Cursor Index Limitation Through Global Config (#5137) * * move cursor limitation per index to be a configuration parameter * solve a bug where the coordinator would stop reading from internal cursors * * remove int return type for iterate callback functions, since their return value is ignored and has no real meaning. * * Code Review - Round #1 * * Code Review - Round #1, missing file change * * fix crash in testAggregate.py * * Code Review - Round #2 * * replace comma with a period * * if we somehow ended up with a closed channel, free the reply * Update src/config.c Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> * Update src/cursor.c Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com> * - fix channel test * - add logging for the broadcasting failure flow * - changed broadcast failure to be verbose instead of warning * Update tests/pytests/test_cursors.py Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com> --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com> Co-authored-by: Omer Shadmi <76992134+oshadmi@users.noreply.github.com> (cherry picked from commit 602fe76) * * use RSGlobalConfig.indexCursorLimit * * apply missing cherry-pick changes * * fix netCursorCallback callback return type * * code review comments + test crash fix * * fix remaining code review comments
1 parent 31f821f commit b86fcda

16 files changed

Lines changed: 113 additions & 46 deletions

File tree

coord/src/dist_aggregate.c

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,7 @@
1818
#include <err.h>
1919

2020
/* Get cursor command using a cursor id and an existing aggregate command */
21-
static int getCursorCommand(MRReply *prev, MRCommand *cmd) {
22-
long long cursorId;
23-
if (!MRReply_ToInteger(MRReply_ArrayElement(prev, 1), &cursorId)) {
24-
// Invalid format?!
25-
return 0;
26-
}
21+
static int getCursorCommand(long long cursorId, MRCommand *cmd) {
2722
if (cursorId == 0) {
2823
// Cursor was set to 0, end of reply chain.
2924
cmd->depleted = true;
@@ -46,7 +41,7 @@ static int getCursorCommand(MRReply *prev, MRCommand *cmd) {
4641
return 1;
4742
}
4843

49-
static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep, MRCommand *cmd) {
44+
static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep, MRCommand *cmd) {
5045
// Should we assert this??
5146
if (!rep || MRReply_Type(rep) != MR_REPLY_ARRAY ||
5247
(MRReply_Length(rep) != 2 && MRReply_Length(rep) != 3)) {
@@ -56,40 +51,37 @@ static int netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep, MRCommand
5651
MRReply_Free(rep);
5752
MRIteratorCallback_Done(ctx, 1);
5853
RedisModule_Log(NULL, "warning", "An empty reply was received from a shard");
59-
return REDIS_ERR;
54+
return;
6055
}
6156

62-
// rewrite and resend the cursor command if needed
63-
int rc = REDIS_OK;
64-
int isDone = !getCursorCommand(rep, cmd);
65-
57+
long long cursorId;
58+
MRReply* cursor = MRReply_ArrayElement(rep, 1);
59+
if (!MRReply_ToInteger(cursor, &cursorId)) {
60+
cursorId = 0;
61+
}
6662
// Push the reply down the chain
6763
MRReply *arr = MRReply_ArrayElement(rep, 0);
6864
if (arr && MRReply_Type(arr) == MR_REPLY_ARRAY && MRReply_Length(arr) > 1) {
6965
MRIteratorCallback_AddReply(ctx, rep);
7066
// User code now owns the reply, so we can't free it here ourselves!
7167
rep = NULL;
72-
} else {
73-
isDone = 1;
7468
}
7569

76-
if (isDone) {
70+
if (!getCursorCommand(cursorId, cmd)) {
7771
MRIteratorCallback_Done(ctx, 0);
7872
} else if (cmd->forCursor) {
7973
MRIteratorCallback_ProcessDone(ctx);
8074
} else {
8175
// resend command
8276
if (REDIS_ERR == MRIteratorCallback_ResendCommand(ctx, cmd)) {
8377
MRIteratorCallback_Done(ctx, 1);
84-
rc = REDIS_ERR;
8578
}
8679
}
8780

8881
if (rep != NULL) {
8982
// If rep has been set to NULL, it means the callback has been invoked
9083
MRReply_Free(rep);
9184
}
92-
return rc;
9385
}
9486

9587
RSValue *MRReply_ToValue(MRReply *r) {

coord/src/module.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1988,7 +1988,7 @@ static void addIndexCursor(const IndexSpec *sp) {
19881988
char *end = strchr(s, '{');
19891989
if (end) {
19901990
*end = '\0';
1991-
CursorList_AddSpec(&RSCursorsCoord, s, RSCURSORS_DEFAULT_CAPACITY);
1991+
CursorList_AddSpec(&RSCursorsCoord, s);
19921992
}
19931993
rm_free(s);
19941994
}

coord/src/rmr/chan.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ size_t MRChannel_MaxSize(MRChannel *chan) {
8282
return ret;
8383
}
8484

85-
int MRChannel_Push(MRChannel *chan, void *ptr) {
85+
PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr) {
8686

8787
pthread_mutex_lock(&chan->lock);
88-
int rc = 1;
88+
int rc = 0;
8989
if (!chan->open || (chan->maxSize > 0 && chan->size == chan->maxSize)) {
90-
rc = 0;
90+
rc = CHANNEL_CLOSED;
9191
goto end;
9292
}
9393

@@ -104,7 +104,7 @@ int MRChannel_Push(MRChannel *chan, void *ptr) {
104104
}
105105
chan->size++;
106106
end:
107-
if (pthread_cond_broadcast(&chan->cond)) rc = 0;
107+
if (pthread_cond_broadcast(&chan->cond)) rc |= BROADCAST_FAILURE;
108108
pthread_mutex_unlock(&chan->lock);
109109
return rc;
110110
}

coord/src/rmr/chan.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,14 @@ typedef struct MRChannel MRChannel;
1515

1616
extern void *MRCHANNEL_CLOSED;
1717

18+
typedef enum {
19+
CHANNEL_CLOSED = 0x1,
20+
BROADCAST_FAILURE = 0x2
21+
} PushErrorMask;
22+
23+
1824
MRChannel *MR_NewChannel(size_t max);
19-
int MRChannel_Push(MRChannel *chan, void *ptr);
25+
PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr);
2026
/* Pop an item, wait indefinitely or until the channel is closed for an item.
2127
* Return MRCHANNEL_CLOSED if the channel is closed*/
2228
void *MRChannel_Pop(MRChannel *chan);

coord/src/rmr/rmr.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,8 +591,16 @@ int MRIteratorCallback_Done(MRIteratorCallbackCtx *ctx, int error) {
591591
return 1;
592592
}
593593

594-
int MRIteratorCallback_AddReply(MRIteratorCallbackCtx *ctx, MRReply *rep) {
595-
return MRChannel_Push(ctx->ic->chan, rep);
594+
void MRIteratorCallback_AddReply(MRIteratorCallbackCtx *ctx, MRReply *rep) {
595+
PushErrorMask mask = MRChannel_Push(ctx->ic->chan, rep);
596+
if (mask & CHANNEL_CLOSED) {
597+
// We are using a verbose log level to prevent flodding the logs with this message in production
598+
RedisModule_Log(RSDummyContext, "verbose", "Tried pushing a reply to a closed channel");
599+
MRReply_Free(rep);
600+
}
601+
if (mask & BROADCAST_FAILURE) {
602+
RedisModule_Log(RSDummyContext, "verbose", "Failed broadcasting the reply in the channel");
603+
}
596604
}
597605

598606
void iterStartCb(void *p) {

coord/src/rmr/rmr.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ extern void *MRITERATOR_DONE;
7373
typedef struct MRIteratorCallbackCtx MRIteratorCallbackCtx;
7474
typedef struct MRIterator MRIterator;
7575

76-
typedef int (*MRIteratorCallback)(MRIteratorCallbackCtx *ctx, MRReply *rep, MRCommand *cmd);
76+
typedef void (*MRIteratorCallback)(MRIteratorCallbackCtx *ctx, MRReply *rep, MRCommand *cmd);
7777

7878
// Trigger all the commands in the iterator to be sent.
7979
// Returns true if there may be more replies to come, false if we are done.
@@ -83,7 +83,7 @@ MRReply *MRIterator_Next(MRIterator *it);
8383

8484
MRIterator *MR_Iterate(MRCommandGenerator cg, MRIteratorCallback cb);
8585

86-
int MRIteratorCallback_AddReply(MRIteratorCallbackCtx *ctx, MRReply *rep);
86+
void MRIteratorCallback_AddReply(MRIteratorCallbackCtx *ctx, MRReply *rep);
8787

8888
int MRIteratorCallback_Done(MRIteratorCallbackCtx *ctx, int error);
8989

coord/src/rmr/test/test_chan.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ void testChan() {
1818
for (int i = 0; i < 100; i++) {
1919
int *ptr = rm_malloc(sizeof(*ptr));
2020
*ptr = i;
21-
int rc = MRChannel_Push(c, ptr);
22-
mu_assert_int_eq(1, rc);
21+
PushErrorMask mask = MRChannel_Push(c, ptr);
22+
mu_assert_int_eq(0, mask);
2323
mu_assert_int_eq(i + 1, MRChannel_Size(c));
2424
}
2525

src/config.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,16 @@ CONFIG_GETTER(getBGIndexSleepGap) {
539539
CONFIG_BOOLEAN_SETTER(set_PrioritizeIntersectUnionChildren, prioritizeIntersectUnionChildren)
540540
CONFIG_BOOLEAN_GETTER(get_PrioritizeIntersectUnionChildren, prioritizeIntersectUnionChildren, 0)
541541

542+
CONFIG_SETTER(setIndexCursorLimit) {
543+
int acrc = AC_GetLongLong(ac, &config->indexCursorLimit, AC_F_GE0);
544+
RETURN_STATUS(acrc);
545+
}
546+
547+
CONFIG_GETTER(getIndexCursorLimit) {
548+
sds ss = sdsempty();
549+
return sdscatprintf(ss, "%lld", config->indexCursorLimit);
550+
}
551+
542552
RSConfig RSGlobalConfig = RS_DEFAULT_CONFIG;
543553

544554
static RSConfigVar *findConfigVar(const RSConfigOptions *config, const char *name) {
@@ -723,6 +733,10 @@ RSConfigOptions RSGlobalConfigOptions = {
723733
"high memory consumption.",
724734
.setValue = setCursorMaxIdle,
725735
.getValue = getCursorMaxIdle},
736+
{.name = "INDEX_CURSOR_LIMIT",
737+
.helpText = "Max number of cursors for a given index that can be opened inside of a shard. Default is 128",
738+
.setValue = setIndexCursorLimit,
739+
.getValue = getIndexCursorLimit},
726740
{.name = "NO_MEM_POOLS",
727741
.helpText = "Set RediSearch to run without memory pools",
728742
.setValue = setNoMemPools,

src/config.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ typedef struct {
127127
// If set, we use an optimization that sorts the children of an intersection iterator in a way
128128
// where union iterators are being factorize by the number of their own children.
129129
int prioritizeIntersectUnionChildren;
130+
// Limit the number of cursors that can be created for a single index
131+
long long indexCursorLimit;
130132
} RSConfig;
131133

132134
typedef enum {
@@ -197,6 +199,7 @@ void DialectsGlobalStats_AddToInfo(RedisModuleInfoCtx *ctx);
197199
#define DEFAULT_MIN_PHONETIC_TERM_LEN 3
198200
#define DEFAULT_FORK_GC_RUN_INTERVAL 30
199201
#define DEFAULT_MAX_RESULTS_TO_UNSORTED_MODE 1000
202+
#define DEFAULT_INDEX_CURSOR_LIMIT 128
200203
#define SEARCH_REQUEST_RESULTS_MAX 1000000
201204
#define NR_MAX_DEPTH_BALANCE 2
202205
#define MIN_DIALECT_VERSION 1 // MIN_DIALECT_VERSION is expected to change over time as dialects become deprecated.
@@ -223,7 +226,8 @@ void DialectsGlobalStats_AddToInfo(RedisModuleInfoCtx *ctx);
223226
.forkGCCleanNumericEmptyNodes = true, .freeResourcesThread = true, .defaultDialectVersion = 1,\
224227
.vssMaxResize = 0, .multiTextOffsetDelta = 100, .used_dialects = 0, \
225228
.numBGIndexingIterationsBeforeSleep = 100, \
226-
.prioritizeIntersectUnionChildren = false \
229+
.prioritizeIntersectUnionChildren = false, \
230+
.indexCursorLimit = DEFAULT_INDEX_CURSOR_LIMIT \
227231
}
228232

229233
#define REDIS_ARRAY_LIMIT 7

src/cursor.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,15 +144,14 @@ int Cursors_CollectIdle(CursorList *cl) {
144144
return rc;
145145
}
146146

147-
void CursorList_AddSpec(CursorList *cl, const char *k, size_t capacity) {
147+
void CursorList_AddSpec(CursorList *cl, const char *k) {
148148
CursorSpecInfo *info = findInfo(cl, k);
149149
if (!info) {
150150
info = rm_malloc(sizeof(*info));
151151
info->keyName = rm_strdup(k);
152152
info->used = 0;
153153
dictAdd(cl->specsDict, (void *)k, info);
154154
}
155-
info->cap = capacity;
156155
}
157156

158157
void CursorList_RemoveSpec(CursorList *cl, const char *k) {
@@ -204,10 +203,10 @@ Cursor *Cursors_Reserve(CursorList *cl, const char *lookupName, unsigned interva
204203
goto done;
205204
}
206205

207-
if (spec->used >= spec->cap) {
206+
if (spec->used >= RSGlobalConfig.indexCursorLimit) {
208207
/** Collect idle cursors now */
209208
Cursors_GCInternal(cl, 0);
210-
if (spec->used >= spec->cap) {
209+
if (spec->used >= RSGlobalConfig.indexCursorLimit) {
211210
QueryError_SetError(status, QUERY_ELIMIT, "Too many cursors allocated for index");
212211
goto done;
213212
}
@@ -314,7 +313,7 @@ void Cursors_RenderStats(CursorList *cl, CursorList *cl_coord, const char *name,
314313
RedisModule_ReplyWithLongLong(ctx, kh_size(cl->lookup));
315314

316315
RedisModule_ReplyWithSimpleString(ctx, "index_capacity");
317-
RedisModule_ReplyWithLongLong(ctx, info->cap + (info_coord ? info_coord->cap : 0));
316+
RedisModule_ReplyWithLongLong(ctx, RSGlobalConfig.indexCursorLimit);
318317

319318
RedisModule_ReplyWithSimpleString(ctx, "index_total");
320319
RedisModule_ReplyWithLongLong(ctx, info->used + (info_coord ? info_coord->used : 0));
@@ -333,7 +332,7 @@ void Cursors_RenderStatsForInfo(CursorList *cl, CursorList *cl_coord, const char
333332
RedisModule_InfoBeginDictField(ctx, "cursor_stats");
334333
RedisModule_InfoAddFieldLongLong(ctx, "global_idle", ARRAY_GETSIZE_AS(&cl->idle, Cursor **) + ARRAY_GETSIZE_AS(&cl_coord->idle, Cursor **));
335334
RedisModule_InfoAddFieldLongLong(ctx, "global_total", kh_size(cl->lookup) + kh_size(cl_coord->lookup));
336-
RedisModule_InfoAddFieldLongLong(ctx, "index_capacity", info->cap + + (info_coord ? info_coord->cap : 0));
335+
RedisModule_InfoAddFieldLongLong(ctx, "index_capacity", RSGlobalConfig.indexCursorLimit);
337336
RedisModule_InfoAddFieldLongLong(ctx, "index_total", info->used + (info_coord ? info_coord->used : 0));
338337
RedisModule_InfoEndDictField(ctx);
339338

0 commit comments

Comments
 (0)