Skip to content

Commit a6fdeb1

Browse files
GuyAv46github-actions[bot]
authored andcommitted
Fix Fork GC potential double-free on error path - [MOD-12521] (#7423)
* make FGC_recvBuffer clean * add revents to timeout log * improve polling logs * Nullify tag field name * remove unused variable * add a test * add a stres unit-test * improve test * improve logging * add include (cherry picked from commit 442a75e)
1 parent a191794 commit a6fdeb1

2 files changed

Lines changed: 99 additions & 19 deletions

File tree

src/fork_gc.c

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -95,48 +95,53 @@ static void FGC_sendTerminator(ForkGC *fgc) {
9595

9696
static int __attribute__((warn_unused_result)) FGC_recvFixed(ForkGC *fgc, void *buf, size_t len) {
9797
// poll the pipe, so that we don't block while read, with timeout of 3 minutes
98-
while (poll(fgc->pollfd_read, 1, 180000) == 1) {
98+
int poll_rc;
99+
while ((poll_rc = poll(fgc->pollfd_read, 1, 180000)) == 1) {
99100
ssize_t nrecvd = read(fgc->pipe_read_fd, buf, len);
100101
if (nrecvd > 0) {
101102
buf += nrecvd;
102103
len -= nrecvd;
103104
} else if (nrecvd <= 0 && errno != EINTR) {
104-
RedisModule_Log(fgc->ctx, "warning", "ForkGC - got error while reading from pipe (%s)", strerror(errno));
105-
return REDISMODULE_ERR;
105+
break;
106106
}
107107
if (len == 0) {
108108
return REDISMODULE_OK;
109109
}
110110
}
111-
RedisModule_Log(fgc->ctx, "warning", "ForkGC - got timeout while reading from pipe (%s)", strerror(errno));
111+
short revents = fgc->pollfd_read[0].revents;
112+
const char *what = (poll_rc == 0) ? "timeout" : "error";
113+
RedisModule_Log(fgc->ctx, "warning", "ForkGC - got %s while reading from pipe. errno: %s, revents: 0x%x (POLLIN=%x POLLERR=%x POLLHUP=%x POLLNVAL=%x)",
114+
what, strerror(errno), revents, (revents & POLLIN), (revents & POLLERR), (revents & POLLHUP), (revents & POLLNVAL));
112115
return REDISMODULE_ERR;
113116
}
114117

115-
#define TRY_RECV_FIXED(gc, obj, len) \
116-
if (FGC_recvFixed(gc, obj, len) != REDISMODULE_OK) { \
117-
return REDISMODULE_ERR; \
118-
}
119-
120118
static void *RECV_BUFFER_EMPTY = (void *)0x0deadbeef;
121119

122120
static int __attribute__((warn_unused_result))
123121
FGC_recvBuffer(ForkGC *fgc, void **buf, size_t *len) {
124-
TRY_RECV_FIXED(fgc, len, sizeof *len);
125-
if (*len == SIZE_MAX) {
122+
size_t temp_len;
123+
if (FGC_recvFixed(fgc, &temp_len, sizeof temp_len) != REDISMODULE_OK) {
124+
return REDISMODULE_ERR;
125+
}
126+
if (temp_len == SIZE_MAX) {
127+
*len = temp_len;
126128
*buf = RECV_BUFFER_EMPTY;
127129
return REDISMODULE_OK;
128130
}
129-
if (*len == 0) {
131+
if (temp_len == 0) {
132+
*len = temp_len;
130133
*buf = NULL;
131134
return REDISMODULE_OK;
132135
}
133136

134-
*buf = rm_malloc(*len + 1);
135-
((char *)(*buf))[*len] = 0;
136-
if (FGC_recvFixed(fgc, *buf, *len) != REDISMODULE_OK) {
137-
rm_free(buf);
137+
char *buf_data = rm_malloc(temp_len + 1);
138+
buf_data[temp_len] = 0;
139+
if (FGC_recvFixed(fgc, buf_data, temp_len) != REDISMODULE_OK) {
140+
rm_free(buf_data);
138141
return REDISMODULE_ERR;
139142
}
143+
*len = temp_len;
144+
*buf = buf_data;
140145
return REDISMODULE_OK;
141146
}
142147

@@ -1015,7 +1020,7 @@ static FGCError FGC_parentHandleNumeric(ForkGC *gc) {
10151020

10161021
static FGCError FGC_parentHandleTags(ForkGC *gc) {
10171022
size_t fieldNameLen;
1018-
char *fieldName;
1023+
char *fieldName = NULL;
10191024
uint64_t tagUniqueId;
10201025
InvertedIndex *value = NULL;
10211026
FGCError status = recvNumericTagHeader(gc, &fieldName, &fieldNameLen, &tagUniqueId);
@@ -1304,8 +1309,6 @@ static int periodicCb(void *privdata) {
13041309
usleep(500);
13051310
}
13061311

1307-
pid_t ppid_before_fork = getpid();
1308-
13091312
TimeSampler_Start(&ts);
13101313
int pipefd[2];
13111314
int rc = pipe(pipefd); // create the pipe

tests/cpptests/test_cpp_forkgc.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ extern "C" {
2828
#include <set>
2929
#include <random>
3030
#include <unordered_set>
31+
#include <thread>
32+
3133
/**
3234
* The following tests purpose is to make sure the garbage collection is working properly,
3335
* without causing any data corruption or loss.
@@ -655,6 +657,81 @@ TEST_F(FGCTestTag, testDeleteDuringGCCleanup) {
655657
ASSERT_EQ(RSGlobalStats.totalStats.logically_deleted, 0);
656658
}
657659

660+
/**
661+
* Test that simulates a pipe error during GC to trigger the error path.
662+
* This test verifies that the error handling doesn't cause double-free or other issues.
663+
*/
664+
TEST_F(FGCTestTag, testPipeErrorDuringGC) {
665+
// Add some documents to create work for the GC
666+
ASSERT_TRUE(RS::addDocument(ctx, ism, "doc1", "f1", "hello"));
667+
ASSERT_TRUE(RS::addDocument(ctx, ism, "doc2", "f1", "hello"));
668+
ASSERT_TRUE(RS::addDocument(ctx, ism, "doc3", "f1", "hello"));
669+
670+
FGC_WaitBeforeFork(fgc);
671+
672+
// Delete documents to trigger GC work
673+
ASSERT_TRUE(RS::deleteDocument(ctx, ism, "doc1"));
674+
ASSERT_TRUE(RS::deleteDocument(ctx, ism, "doc2"));
675+
676+
FGC_ForkAndWaitBeforeApply(fgc);
677+
678+
// Close the read end of the pipe from the parent's perspective
679+
// This will cause poll() to immediately return an error (POLLNVAL),
680+
// simulating a pipe failure scenario without waiting 3 minutes
681+
close(fgc->pipe_read_fd);
682+
683+
// This should handle the error gracefully without crashes or double-frees
684+
FGC_Apply(fgc);
685+
686+
// The GC should have failed, so no bytes should be collected
687+
// (or at least the operation should complete without crashing)
688+
ASSERT_EQ(0, fgc->stats.totalCollected);
689+
}
690+
691+
/**
692+
* Test that closes the pipe while GC is actively applying changes.
693+
* This test runs multiple iterations to increase the chance of hitting different
694+
* code paths and timing windows during the apply phase.
695+
*/
696+
TEST_F(FGCTestTag, testPipeErrorDuringApply) {
697+
// Run multiple iterations to increase coverage of different timing scenarios
698+
for (int iteration = 0; iteration < 1000; iteration++) {
699+
// Add documents to create work for the GC
700+
std::string doc1 = "doc1_" + std::to_string(iteration);
701+
std::string doc2 = "doc2_" + std::to_string(iteration);
702+
std::string doc3 = "doc3_" + std::to_string(iteration);
703+
704+
ASSERT_TRUE(RS::addDocument(ctx, ism, doc1.c_str(), "f1", "hello"));
705+
ASSERT_TRUE(RS::addDocument(ctx, ism, doc2.c_str(), "f1", "hello"));
706+
ASSERT_TRUE(RS::addDocument(ctx, ism, doc3.c_str(), "f1", "hello"));
707+
708+
FGC_WaitBeforeFork(fgc);
709+
710+
// Delete documents to trigger GC work
711+
ASSERT_TRUE(RS::deleteDocument(ctx, ism, doc1.c_str()));
712+
ASSERT_TRUE(RS::deleteDocument(ctx, ism, doc2.c_str()));
713+
714+
FGC_ForkAndWaitBeforeApply(fgc);
715+
716+
// Start a thread to close the pipe after a brief delay
717+
// This creates a race condition where the pipe may be closed at various
718+
// points during the apply process
719+
std::thread closer([this, iteration]() {
720+
// Variable delay to hit different code paths
721+
usleep(iteration);
722+
close(fgc->pipe_read_fd);
723+
});
724+
725+
// Apply should handle the pipe closure gracefully without crashing
726+
FGC_Apply(fgc);
727+
728+
closer.join();
729+
730+
// Don't make any assertions about the state - it's timing dependent
731+
// The important thing is that we don't crash or have memory corruption
732+
}
733+
}
734+
658735
TEST_F(FGCTestNumeric, testNumericBlocksSinceFork) {
659736
const auto startValue = TotalIIBlocks;
660737
constexpr size_t docs_per_block = INDEX_BLOCK_SIZE;

0 commit comments

Comments
 (0)