Skip to content

Commit 457536e

Browse files
[2.10] Fix MRIterator ownership mechanism - [MOD-8108] (#5266)
* Fix MRIterator ownership mechanism - [MOD-8108] (#5244) * implement release mechanism for MRIterator * refactor MRChannel * delete CursorList_Expire * fix test for async cursor deletion * minor cleanup * more cleanup * remove a log message that is now more likely to be logged * log about stuck RQ less frequently * even less noisy * rename MRChannel_Close (cherry picked from commit 6f68d69) * Fix a potential use after free - [MOD-8108] (#5268) fix a potential use after free --------- Co-authored-by: GuyAv46 <47632673+GuyAv46@users.noreply.github.com>
1 parent 496a159 commit 457536e

11 files changed

Lines changed: 109 additions & 213 deletions

File tree

coord/src/dist_aggregate.c

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ static void netCursorCallback(MRIteratorCallbackCtx *ctx, MRReply *rep) {
7272
// If the root command of this reply is a DEL command, we don't want to
7373
// propagate it up the chain to the client
7474
if (cmd->rootCommand == C_DEL) {
75-
if (MRReply_Type(rep) == MR_REPLY_ERROR) {
76-
RedisModule_Log(RSDummyContext, "warning", "Error returned for CURSOR.DEL command from shard");
77-
}
7875
// Discard the response, and return REDIS_OK
7976
MRIteratorCallback_Done(ctx, MRReply_Type(rep) == MR_REPLY_ERROR);
8077
MRReply_Free(rep);
@@ -247,7 +244,7 @@ static int getNextReply(RPNet *nc) {
247244
}
248245
}
249246
MRReply *root = MRIterator_Next(nc->it);
250-
if (root == MRITERATOR_DONE) {
247+
if (root == NULL) {
251248
// No more replies
252249
nc->current.root = NULL;
253250
nc->current.rows = NULL;
@@ -465,11 +462,8 @@ static int rpnetNext_Start(ResultProcessor *rp, SearchResult *r) {
465462
static void rpnetFree(ResultProcessor *rp) {
466463
RPNet *nc = (RPNet *)rp;
467464

468-
// the iterator might not be done - some producers might still be sending data, let's wait for
469-
// them...
470465
if (nc->it) {
471-
MRIterator_WaitDone(nc->it, nc->cmd.forCursor);
472-
MRIterator_Free(nc->it);
466+
MRIterator_Release(nc->it);
473467
}
474468

475469
if (nc->shardsProfile) {

coord/src/rmr/chan.c

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,23 @@
44
* the Server Side Public License v1 (SSPLv1).
55
*/
66

7-
#define MR_CHAN_C_
87
#include <pthread.h>
9-
#include <sys/time.h>
108
#include <stdlib.h>
11-
#include <errno.h>
12-
#include <stdio.h>
13-
#include <assert.h>
14-
15-
void *MRCHANNEL_CLOSED = (void *)"MRCHANNEL_CLOSED";
9+
#include <stdbool.h>
1610

1711
typedef struct chanItem {
1812
void *ptr;
1913
struct chanItem *next;
2014
} chanItem;
2115

22-
typedef struct MRChannel {
16+
struct MRChannel {
2317
chanItem *head;
2418
chanItem *tail;
2519
size_t size;
26-
volatile int open;
20+
volatile bool wait;
2721
pthread_mutex_t lock;
2822
pthread_cond_t cond;
29-
// condition used to wait for closing
30-
pthread_cond_t closeCond;
31-
} MRChannel;
23+
};
3224

3325
#include "chan.h"
3426
#include "rmalloc.h"
@@ -39,28 +31,14 @@ MRChannel *MR_NewChannel() {
3931
.head = NULL,
4032
.tail = NULL,
4133
.size = 0,
42-
.open = 1,
34+
.wait = true,
4335
};
4436
pthread_cond_init(&chan->cond, NULL);
45-
pthread_cond_init(&chan->closeCond, NULL);
46-
4737
pthread_mutex_init(&chan->lock, NULL);
4838
return chan;
4939
}
5040

51-
/* Safely wait until the channel is closed */
52-
void MRChannel_WaitClose(MRChannel *chan) {
53-
pthread_mutex_lock(&chan->lock);
54-
while (chan->open) {
55-
pthread_cond_wait(&chan->closeCond, &chan->lock);
56-
}
57-
pthread_mutex_unlock(&chan->lock);
58-
}
59-
6041
void MRChannel_Free(MRChannel *chan) {
61-
62-
// TODO: proper drain and stop routine
63-
6442
pthread_mutex_destroy(&chan->lock);
6543
pthread_cond_destroy(&chan->cond);
6644
rm_free(chan);
@@ -73,18 +51,11 @@ size_t MRChannel_Size(MRChannel *chan) {
7351
return ret;
7452
}
7553

76-
PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr) {
77-
78-
pthread_mutex_lock(&chan->lock);
79-
int rc = 0;
80-
if (!chan->open) {
81-
rc = CHANNEL_CLOSED;
82-
goto end;
83-
}
84-
54+
void MRChannel_Push(MRChannel *chan, void *ptr) {
8555
chanItem *item = rm_malloc(sizeof(*item));
8656
item->next = NULL;
8757
item->ptr = ptr;
58+
pthread_mutex_lock(&chan->lock);
8859
if (chan->tail) {
8960
// make it the next of the current tail
9061
chan->tail->next = item;
@@ -94,10 +65,8 @@ PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr) {
9465
chan->head = chan->tail = item;
9566
}
9667
chan->size++;
97-
end:
98-
if (pthread_cond_broadcast(&chan->cond)) rc |= BROADCAST_FAILURE;
68+
pthread_cond_broadcast(&chan->cond);
9969
pthread_mutex_unlock(&chan->lock);
100-
return rc;
10170
}
10271

10372
void *MRChannel_UnsafeForcePop(MRChannel *chan) {
@@ -115,40 +84,32 @@ void *MRChannel_UnsafeForcePop(MRChannel *chan) {
11584
return ret;
11685
}
11786

118-
// todo wait is not actually used anywhere...
11987
void *MRChannel_Pop(MRChannel *chan) {
120-
void *ret = NULL;
121-
12288
pthread_mutex_lock(&chan->lock);
12389
while (!chan->size) {
124-
if (!chan->open) {
90+
if (!chan->wait) {
12591
pthread_mutex_unlock(&chan->lock);
126-
return MRCHANNEL_CLOSED;
92+
return NULL;
12793
}
128-
129-
int rc = pthread_cond_wait(&chan->cond, &chan->lock);
130-
assert(rc == 0 && "cond_wait failed");
94+
pthread_cond_wait(&chan->cond, &chan->lock);
13195
}
13296

13397
chanItem *item = chan->head;
134-
assert(item);
13598
chan->head = item->next;
13699
// empty queue...
137100
if (!chan->head) chan->tail = NULL;
138101
chan->size--;
139102
pthread_mutex_unlock(&chan->lock);
140103
// discard the item (TODO: recycle items)
141-
ret = item->ptr;
104+
void *ret = item->ptr;
142105
rm_free(item);
143106
return ret;
144107
}
145108

146-
void MRChannel_Close(MRChannel *chan) {
109+
void MRChannel_Unblock(MRChannel *chan) {
147110
pthread_mutex_lock(&chan->lock);
148-
chan->open = 0;
149-
// notify any waiting readers
111+
chan->wait = false;
112+
// unblock any waiting readers
150113
pthread_cond_broadcast(&chan->cond);
151-
pthread_cond_broadcast(&chan->closeCond);
152-
153114
pthread_mutex_unlock(&chan->lock);
154115
}

coord/src/rmr/chan.h

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,24 @@
99

1010
#include <stdlib.h>
1111

12-
#ifndef MR_CHAN_C_
1312
typedef struct MRChannel MRChannel;
14-
#endif
15-
16-
extern void *MRCHANNEL_CLOSED;
17-
18-
typedef enum {
19-
CHANNEL_CLOSED = 0x1,
20-
BROADCAST_FAILURE = 0x2
21-
} PushErrorMask;
13+
MRChannel *MR_NewChannel();
2214

15+
// Push an item to the channel. Succeeds even if the channel is closed.
16+
void MRChannel_Push(MRChannel *chan, void *ptr);
2317

24-
MRChannel *MR_NewChannel();
25-
PushErrorMask MRChannel_Push(MRChannel *chan, void *ptr);
26-
/* Pop an item, wait indefinitely or until the channel is closed for an item.
27-
* Return MRCHANNEL_CLOSED if the channel is closed*/
18+
/* Pop an item, or wait until there is an item to pop or until the channel is closed.
19+
* Return NULL if the channel is closed*/
2820
void *MRChannel_Pop(MRChannel *chan);
2921

3022
// Same as MRChannel_Pop, but does not lock the channel nor wait for results if it's empty.
3123
// This is unsafe, and should only be used when the caller is sure that the channel is not being used by other threads.
3224
void *MRChannel_UnsafeForcePop(MRChannel *chan);
3325

34-
/* Safely wait until the channel is closed */
35-
void MRChannel_WaitClose(MRChannel *chan);
26+
// Make channel unblocking. All subsequent calls to MRChannel_Pop will return NULL if the channel is empty.
27+
void MRChannel_Unblock(MRChannel *chan);
3628

37-
void MRChannel_Close(MRChannel *chan);
3829
size_t MRChannel_Size(MRChannel *chan);
30+
31+
// Free the channel. Assumes the caller has already emptied the channel.
3932
void MRChannel_Free(MRChannel *chan);

0 commit comments

Comments
 (0)