Skip to content

Commit 4a67a87

Browse files
authored
MOD-7329: Determine if fields and documents are expired before returning them as valid results (#4858)
* initial commit
* fix missing case in test_expire.py
* fix coord compilation
* fix doc info test
* try and fix some of the tests
* release iterator
* do not set expiration time in the context of the coordinator since it doesn't have an index spec
* support c++ tests that do not provide spec for iterators
* no point in passing timestamp in coordinator flow
* redis 7.4 version in ci is actually 7.3, for codecov we need our tests to run
* reduce max memory from + 200000 to 150000
* fix test
* cleanup code
* Code Review: Round #1
* Add geoshape tests, only pass index instead of pointer to filter ctx
* Add vector test
* Code Review - Round #2
* try and fix tests
* Code Review - Round #3
* Fix Sanitizer Build
* Code Review - Round #4
* Minor change in debug command SET_MONITOR_EXPIRATION
* Code Review - Round #5
* fix tests
* Code Review - Round #6
* fix crash
* fix test
* improve field mask translation time
* fix compilation
* fix compilation error
* handle field mask of 64 bit
* Code Review - Round #7
* fix opening of key edge cases
* free fieldIdToIndex array
* Code Review - Round #9
* fix rebase conflicts
* support wild card reader filtering
* mac os: fallback to CLOCK_REALTIME if CLOCK_REALTIME_COARSE is not defined
1 parent 6d3b4e3 commit 4a67a87

65 files changed

Lines changed: 1054 additions & 325 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

src/aggregate/aggregate.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ typedef struct AREQ {
151151
/** Flags indicating current execution state */
152152
uint32_t stateflags;
153153

154-
struct timespec timeoutTime;
155-
156154
int protocol; // RESP2/3
157155

158156
/*

src/aggregate/aggregate_exec.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ void startPipeline(AREQ *req, ResultProcessor *rp, SearchResult ***results, Sear
352352
// Aggregate all results before populating the response
353353
*results = AggregateResults(rp, rc);
354354
// Check timeout after aggregation
355-
if (TimedOut(&RP_SCTX(rp)->timeout) == TIMED_OUT) {
355+
if (TimedOut(&RP_SCTX(rp)->time.timeout) == TIMED_OUT) {
356356
*rc = RS_RESULT_TIMEDOUT;
357357
}
358358
} else {
@@ -773,8 +773,7 @@ int prepareExecutionPlan(AREQ *req, QueryError *status) {
773773
// TODO: this should be done in `AREQ_execute`, but some of the iterators need the timeout's
774774
// value and some of the execution begins in `QAST_Iterate`.
775775
// Setting the timeout context should be done in the same thread that executes the query.
776-
updateTimeout(&req->timeoutTime, req->reqConfig.queryTimeoutMS);
777-
sctx->timeout = req->timeoutTime;
776+
SearchCtx_UpdateTime(sctx, req->reqConfig.queryTimeoutMS);
778777

779778
ConcurrentSearchCtx_Init(sctx->redisCtx, &req->conc);
780779
req->rootiter = QAST_Iterate(ast, opts, sctx, &req->conc, req->reqflags, status);
@@ -784,7 +783,7 @@ int prepareExecutionPlan(AREQ *req, QueryError *status) {
784783
QOptimizer_Iterators(req, req->optimizer);
785784
}
786785

787-
TimedOut_WithStatus(&sctx->timeout, status);
786+
TimedOut_WithStatus(&sctx->time.timeout, status);
788787

789788
if (QueryError_HasError(status)) {
790789
return REDISMODULE_ERR;
@@ -1054,8 +1053,7 @@ static void runCursor(RedisModule_Reply *reply, Cursor *cursor, size_t num) {
10541053
}
10551054

10561055
// update timeout for current cursor read
1057-
updateTimeout(&req->timeoutTime, req->reqConfig.queryTimeoutMS);
1058-
SearchCtx_UpdateTimeout(req->sctx, req->timeoutTime);
1056+
SearchCtx_UpdateTime(req->sctx, req->reqConfig.queryTimeoutMS);
10591057

10601058
if (!num) {
10611059
num = req->cursorChunkSize;

src/coord/dist_aggregate.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ static int rpnetNext(ResultProcessor *self, SearchResult *r) {
393393

394394
// get the next reply from the channel
395395
while (!root || !rows || MRReply_Length(rows) == 0) {
396-
if(TimedOut(&self->parent->sctx->timeout)) {
396+
if (TimedOut(&self->parent->sctx->time.timeout)) {
397397
// Set the `timedOut` flag in the MRIteratorCtx, later to be read by the
398398
// callback so that a `CURSOR DEL` command will be dispatched instead of
399399
// a `CURSOR READ` command.
@@ -707,9 +707,6 @@ void RSExecDistAggregate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
707707
}
708708
}
709709

710-
// Set the timeout
711-
updateTimeout(&r->timeoutTime, r->reqConfig.queryTimeoutMS);
712-
713710
rc = AGGPLN_Distribute(&r->ap, &status);
714711
if (rc != REDISMODULE_OK) goto err;
715712

@@ -735,7 +732,7 @@ void RSExecDistAggregate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc
735732
r->sctx = rm_new(RedisSearchCtx);
736733
*r->sctx = SEARCH_CTX_STATIC(ctx, NULL);
737734
r->sctx->apiVersion = dialect;
738-
r->sctx->timeout = r->timeoutTime;
735+
SearchCtx_UpdateTime(r->sctx, r->reqConfig.queryTimeoutMS);
739736
r->qiter.sctx = r->sctx;
740737
// r->sctx->expanded should be received from shards
741738

src/debug_commands.c

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ DEBUG_COMMAND(DumpInvertedIndex) {
178178
RedisModule_ReplyWithError(sctx->redisCtx, "Can not find the inverted index");
179179
goto end;
180180
}
181-
IndexReader *reader = NewTermIndexReader(invidx, NULL, RS_FIELDMASK_ALL, NULL, 1);
181+
IndexReader *reader = NewTermIndexReader(invidx);
182182
ReplyReaderResults(reader, sctx->redisCtx);
183183

184184
end:
@@ -260,7 +260,8 @@ DEBUG_COMMAND(DumpNumericIndex) {
260260
ARRAY_LEN_VAR(numericHeader) += InvertedIndexSummaryHeader(sctx->redisCtx, invidx);
261261
END_POSTPONED_LEN_ARRAY(numericHeader);
262262
}
263-
IndexReader *reader = NewNumericReader(NULL, range->entries, NULL, range->minVal, range->maxVal, true);
263+
FieldFilterContext fieldCtx = {.field.isFieldMask = false, .field.value.index = RS_INVALID_FIELD_INDEX, .predicate = FIELD_EXPIRATION_DEFAULT};
264+
IndexReader *reader = NewNumericReader(NULL, range->entries, NULL, range->minVal, range->maxVal, true, &fieldCtx);
264265
ReplyReaderResults(reader, sctx->redisCtx);
265266
++ARRAY_LEN_VAR(numericInvertedIndex); // end (1)Header 2)entries (header is optional)
266267
}
@@ -330,7 +331,7 @@ InvertedIndexStats InvertedIndex_DebugReply(RedisModuleCtx *ctx, InvertedIndex *
330331
REPLY_WITH_STR("values", ARRAY_LEN_VAR(invertedIndexDump));
331332
START_POSTPONED_LEN_ARRAY(invertedIndexValues);
332333
RSIndexResult *res = NULL;
333-
IndexReader *ir = NewNumericReader(NULL, idx, NULL ,0, 0, false);
334+
IndexReader *ir = NewMinimalNumericReader(idx, false);
334335
while (INDEXREAD_OK == IR_Read(ir, &res)) {
335336
REPLY_WITH_DOUBLE("value", res->num.value, ARRAY_LEN_VAR(invertedIndexValues));
336337
REPLY_WITH_LONG_LONG("docId", res->docId, ARRAY_LEN_VAR(invertedIndexValues));
@@ -475,7 +476,7 @@ DEBUG_COMMAND(DumpTagIndex) {
475476
while (TrieMapIterator_Next(iter, &tag, &len, (void **)&iv)) {
476477
RedisModule_ReplyWithArray(sctx->redisCtx, 2);
477478
RedisModule_ReplyWithStringBuffer(sctx->redisCtx, tag, len);
478-
IndexReader *reader = NewTermIndexReader(iv, NULL, RS_FIELDMASK_ALL, NULL, 1);
479+
IndexReader *reader = NewTermIndexReader(iv);
479480
ReplyReaderResults(reader, sctx->redisCtx);
480481
++resultSize;
481482
}
@@ -819,6 +820,59 @@ DEBUG_COMMAND(ttlExpire) {
819820
return RedisModule_ReplyWithSimpleString(ctx, "OK");
820821
}
821822

823+
typedef struct {
824+
int docs;
825+
int notDocs;
826+
int fields;
827+
int notFields;
828+
} MonitorExpirationOptions;
829+
830+
DEBUG_COMMAND(setMonitorExpiration) {
831+
if (argc < 3) {
832+
return RedisModule_WrongArity(ctx);
833+
}
834+
835+
IndexLoadOptions lopts = {.nameC = RedisModule_StringPtrLen(argv[2], NULL),
836+
.flags = INDEXSPEC_LOAD_NOTIMERUPDATE};
837+
838+
StrongRef ref = IndexSpec_LoadUnsafeEx(ctx, &lopts);
839+
IndexSpec *sp = StrongRef_Get(ref);
840+
if (!sp) {
841+
return RedisModule_ReplyWithError(ctx, "Unknown index name");
842+
}
843+
844+
MonitorExpirationOptions options = {0};
845+
ACArgSpec argspecs[] = {
846+
{.name = "not-documents", .type = AC_ARGTYPE_BOOLFLAG, .target = &options.notDocs},
847+
{.name = "documents", .type = AC_ARGTYPE_BOOLFLAG, .target = &options.docs},
848+
{.name = "fields", .type = AC_ARGTYPE_BOOLFLAG, .target = &options.fields},
849+
{.name = "not-fields", .type = AC_ARGTYPE_BOOLFLAG, .target = &options.notFields},
850+
{NULL}};
851+
RedisModuleKey *keyp = NULL;
852+
ArgsCursor ac = {0};
853+
ACArgSpec *errSpec = NULL;
854+
ArgsCursor_InitRString(&ac, argv + 3, argc - 3);
855+
int rv = AC_ParseArgSpec(&ac, argspecs, &errSpec);
856+
if (rv != AC_OK) {
857+
return RedisModule_ReplyWithError(ctx, "Could not parse argument (argspec fixme)");
858+
}
859+
if (options.docs && options.notDocs) {
860+
return RedisModule_ReplyWithError(ctx, "Can't set both documents and not-documents");
861+
}
862+
if (options.fields && options.notFields) {
863+
return RedisModule_ReplyWithError(ctx, "Can't set both fields and not-fields");
864+
}
865+
866+
if (options.docs || options.notDocs) {
867+
sp->monitorDocumentExpiration = options.docs && !options.notDocs;
868+
}
869+
if (options.fields || options.notFields) {
870+
sp->monitorFieldExpiration = options.fields && !options.notFields;
871+
}
872+
RedisModule_ReplyWithSimpleString(ctx, "OK");
873+
return REDISMODULE_OK;
874+
}
875+
822876
DEBUG_COMMAND(GitSha) {
823877
#ifdef GIT_SHA
824878
RedisModule_ReplyWithStringBuffer(ctx, GIT_SHA, strlen(GIT_SHA));
@@ -938,7 +992,7 @@ DEBUG_COMMAND(InfoTagIndex) {
938992

939993
if (options.dumpIdEntries) {
940994
RedisModule_ReplyWithLiteral(ctx, "entries");
941-
IndexReader *reader = NewTermIndexReader(iv, NULL, RS_FIELDMASK_ALL, NULL, 1);
995+
IndexReader *reader = NewTermIndexReader(iv);
942996
ReplyReaderResults(reader, sctx->redisCtx);
943997
}
944998

@@ -1246,6 +1300,7 @@ DebugCommandType commands[] = {{"DUMP_INVIDX", DumpInvertedIndex}, // Print all
12461300
{"VECSIM_INFO", VecsimInfo},
12471301
{"DELETE_LOCAL_CURSORS", DeleteCursors},
12481302
{"DUMP_HNSW", dumpHNSWData},
1303+
{"SET_MONITOR_EXPIRATION", setMonitorExpiration},
12491304
{"WORKERS", WorkerThreadsSwitch},
12501305
{NULL, NULL}};
12511306

src/doc_table.c

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,33 @@ void DocTable_SetByteOffsets(RSDocumentMetadata *dmd, RSByteOffsets *v) {
209209
dmd->flags |= Document_HasOffsetVector;
210210
}
211211

212+
void DocTable_UpdateExpiration(DocTable *t, RSDocumentMetadata* dmd, t_expirationTimePoint ttl, arrayof(FieldExpiration) sortedFieldWithExpiration) {
213+
if (hasExpirationTimeInformation(dmd->flags)) {
214+
TimeToLiveTable_VerifyInit(&t->ttl);
215+
TimeToLiveTable_Add(t->ttl, dmd->id, ttl, sortedFieldWithExpiration);
216+
}
217+
}
218+
219+
bool DocTable_HasExpiration(DocTable *t, t_docId docId)
220+
{
221+
return t->ttl && TimeToLiveTable_HasExpiration(t->ttl, docId);
222+
}
223+
224+
bool DocTable_IsDocExpired(DocTable* t, const RSDocumentMetadata* dmd, struct timespec* expirationPoint) {
225+
if (!hasExpirationTimeInformation(dmd->flags)) {
226+
return false;
227+
}
228+
RS_LOG_ASSERT(t->ttl, "Document has expiration time information but no TTL table");
229+
return TimeToLiveTable_HasDocExpired(t->ttl, dmd->id, expirationPoint);
230+
}
231+
232+
bool DocTable_VerifyFieldExpirationPredicate(const DocTable *t, t_docId docId, const t_fieldIndex* fieldIndices, size_t fieldCount, enum FieldExpirationPredicate predicate, const struct timespec* expirationPoint) {
233+
if (!t->ttl || !fieldIndices || fieldCount == 0) {
234+
return true;
235+
}
236+
return TimeToLiveTable_VerifyDocAndFields(t->ttl, docId, fieldIndices, fieldCount, predicate, expirationPoint);
237+
}
238+
212239
/* Put a new document into the table, assign it an incremental id and store the metadata in the
213240
* table.
214241
*
@@ -319,6 +346,7 @@ void DocTable_Free(DocTable *t) {
319346
}
320347
}
321348
rm_free(t->buckets);
349+
TimeToLiveTable_Destroy(&t->ttl);
322350
DocIdMap_Free(&t->dim);
323351
}
324352

@@ -346,6 +374,14 @@ RSDocumentMetadata *DocTable_Pop(DocTable *t, const char *s, size_t n) {
346374
if (!md) {
347375
return NULL;
348376
}
377+
378+
if (t->ttl && hasExpirationTimeInformation(md->flags)) {
379+
TimeToLiveTable_Remove(t->ttl, md->id);
380+
if (TimeToLiveTable_IsEmpty(t->ttl)) {
381+
TimeToLiveTable_Destroy(&t->ttl);
382+
}
383+
}
384+
349385
// Assuming we already locked the spec for write, and we don't have multiple writers,
350386
// all the next operations don't need to be atomic
351387
md->flags |= Document_Deleted;

src/doc_table.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
#include "sortable.h"
1515
#include "byte_offsets.h"
1616
#include "rmutil/sds.h"
17-
#include "util/dict.h"
1817
#include "rmutil/rm_assert.h"
18+
#include "ttl_table.h"
1919

2020
#ifdef __cplusplus
2121
extern "C" {
@@ -73,6 +73,7 @@ typedef struct {
7373

7474
DMDChain *buckets;
7575
DocIdMap dim; // Mapping between document name to internal id
76+
TimeToLiveTable* ttl;
7677
} DocTable;
7778

7879
#define DOCTABLE_FOREACH(dt, code) \
@@ -128,6 +129,22 @@ int DocTable_SetSortingVector(DocTable *t, RSDocumentMetadata *dmd, RSSortingVec
128129
*/
129130
void DocTable_SetByteOffsets(RSDocumentMetadata *dmd, RSByteOffsets *offsets);
130131

132+
void DocTable_UpdateExpiration(DocTable *t, RSDocumentMetadata* dmd, t_expirationTimePoint ttl, arrayof(FieldExpiration) allFieldSorted);
133+
134+
typedef struct {
135+
FieldMaskOrIndex field;
136+
// our field expiration predicate
137+
enum FieldExpirationPredicate predicate;
138+
} FieldFilterContext;
139+
140+
bool DocTable_HasExpiration(DocTable *t, t_docId docId);
141+
bool DocTable_IsDocExpired(DocTable* t, const RSDocumentMetadata* dmd, struct timespec* expirationPoint);
142+
143+
// Will return true if the document passed the predicate
144+
// default predicate - one of the fields did not yet expire -> entry is still valid
145+
// missing predicate - one of the fields did expire -> entry is valid in the context of missing
146+
bool DocTable_VerifyFieldExpirationPredicate(const DocTable *t, t_docId docId, const t_fieldIndex* fieldIndices, size_t fieldCount, enum FieldExpirationPredicate predicate, const struct timespec* expirationPoint);
147+
131148
/** Get the docId of a key if it exists in the table, or 0 if it doesn't */
132149
t_docId DocTable_GetId(const DocTable *dt, const char *s, size_t n);
133150

src/document.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ typedef struct Document {
9191
RSLanguage language;
9292
float score;
9393
t_docId docId;
94+
t_expirationTimePoint docExpirationTime;
95+
FieldExpiration* fieldExpirations;
9496
const char *payload;
9597
size_t payloadSize;
9698
uint32_t flags;

0 commit comments

Comments
 (0)