Skip to content

Commit 78329f6

Browse files
authored
Improve timeout check for FT.AGGREGATE in cluster mode - [MOD-12434] (#7359)
* allow empty replies to be pushed to the query channel * fix empty reply check * actually fix empty reply check * address review comment
1 parent 7451330 commit 78329f6

1 file changed

Lines changed: 13 additions & 6 deletions

File tree

coord/src/dist_aggregate.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
130130
MRReply *results = NULL;
131131
if (map && MRReply_Type(map) == MR_REPLY_MAP) {
132132
results = MRReply_MapElement(map, "results");
133-
if (results && MRReply_Type(results) == MR_REPLY_ARRAY && MRReply_Length(results) > 0) {
133+
if (results && MRReply_Type(results) == MR_REPLY_ARRAY) {
134134
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
135135
// User code now owns the reply, so we can't free it here ourselves!
136136
rep = NULL;
@@ -140,7 +140,7 @@ static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
140140
else // RESP2
141141
{
142142
MRReply *results = MRReply_ArrayElement(rep, 0);
143-
if (results && MRReply_Type(results) == MR_REPLY_ARRAY && MRReply_Length(results) > 1) {
143+
if (results && MRReply_Type(results) == MR_REPLY_ARRAY && MRReply_Length(results) >= 1) {
144144
MRIteratorCallback_AddReply(ctx, rep); // to be picked up by getNextReply
145145
// User code now owns the reply, so we can't free it here ourselves!
146146
rep = NULL;
@@ -265,13 +265,20 @@ static int getNextReply(RPNet *nc) {
265265
}
266266

267267
MRReply *rows = MRReply_ArrayElement(root, 0);
268-
if ( rows == NULL
269-
|| (MRReply_Type(rows) != MR_REPLY_ARRAY && MRReply_Type(rows) != MR_REPLY_MAP)
270-
|| MRReply_Length(rows) == 0) {
268+
// Perform sanity check to avoid processing empty replies
269+
bool is_empty;
270+
if (nc->cmd.protocol == 3) { // RESP3
271+
MRReply *results = MRReply_MapElement(rows, "results");
272+
is_empty = MRReply_Length(results) == 0;
273+
} else { // RESP2
274+
is_empty = MRReply_Length(rows) == 1;
275+
}
276+
277+
if (is_empty) {
271278
MRReply_Free(root);
272279
root = NULL;
273280
rows = NULL;
274-
RedisModule_Log(RSDummyContext, "warning", "An empty reply was received from a shard");
281+
RedisModule_Log(RSDummyContext, "verbose", "An empty reply was received from a shard");
275282
}
276283

277284
// invariant: either rows == NULL or least one row exists

0 commit comments

Comments
 (0)