Skip to content

Commit 071f3f1

Browse files
GuyAv46github-actions[bot]
authored andcommitted
Improve timeout check for FT.AGGREGATE in cluster mode - [MOD-12434] (#7359)
* allow empty replies to be pushed to the query channel * fix empty reply check * actually fix empty reply check * address review comment (cherry picked from commit 78329f6)
1 parent b503484 commit 071f3f1

1 file changed

Lines changed: 13 additions & 6 deletions

File tree

coord/src/dist_aggregate.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
127127
MRReply *results = NULL;
128128
if (map && MRReply_Type(map) == MR_REPLY_MAP) {
129129
results = MRReply_MapElement(map, "results");
130-
if (results && MRReply_Type(results) == MR_REPLY_ARRAY && MRReply_Length(results) > 0) {
130+
if (results && MRReply_Type(results) == MR_REPLY_ARRAY) {
131131
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
132132
// User code now owns the reply, so we can't free it here ourselves!
133133
rep = NULL;
@@ -137,7 +137,7 @@ static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
137137
else // RESP2
138138
{
139139
MRReply *results = MRReply_ArrayElement(rep, 0);
140-
if (results && MRReply_Type(results) == MR_REPLY_ARRAY && MRReply_Length(results) > 1) {
140+
if (results && MRReply_Type(results) == MR_REPLY_ARRAY && MRReply_Length(results) >= 1) {
141141
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
142142
// User code now owns the reply, so we can't free it here ourselves!
143143
rep = NULL;
@@ -264,13 +264,20 @@ static int getNextReply(RPNet *nc) {
264264
}
265265

266266
MRReply *rows = MRReply_ArrayElement(root, 0);
267-
if ( rows == NULL
268-
|| (MRReply_Type(rows) != MR_REPLY_ARRAY && MRReply_Type(rows) != MR_REPLY_MAP)
269-
|| MRReply_Length(rows) == 0) {
267+
// Perform sanity check to avoid processing empty replies
268+
bool is_empty;
269+
if (nc->cmd.protocol == 3) { // RESP3
270+
MRReply *results = MRReply_MapElement(rows, "results");
271+
is_empty = MRReply_Length(results) == 0;
272+
} else { // RESP2
273+
is_empty = MRReply_Length(rows) == 1;
274+
}
275+
276+
if (is_empty) {
270277
MRReply_Free(root);
271278
root = NULL;
272279
rows = NULL;
273-
RedisModule_Log(RSDummyContext, "warning", "An empty reply was received from a shard");
280+
RedisModule_Log(RSDummyContext, "verbose", "An empty reply was received from a shard");
274281
}
275282

276283
// invariant: either rows == NULL or least one row exists

0 commit comments

Comments
 (0)