Skip to content

Commit 43d83c0

Browse files
committed
Merge remote-tracking branch 'origin/master' into feature-coordinator-hybrid-jk_update
2 parents 9c9f637 + 40044ce commit 43d83c0

7 files changed

Lines changed: 72 additions & 32 deletions

File tree

src/hybrid/hybrid_request.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ int HybridRequest_BuildMergePipeline(HybridRequest *req, HybridPipelineParams *p
107107
HybridLookupContext *lookupCtx = InitializeHybridLookupContext(req->requests, lookup);
108108

109109
const char *scoreAlias = params->aggregationParams.common.scoreAlias;
110+
const RLookupKey *docKey = RLookup_GetKey_Read(lookup, UNDERSCORE_KEY, RLOOKUP_F_HIDDEN);
110111
const RLookupKey *scoreKey = NULL;
111112
if (scoreAlias) {
112113
scoreKey = RLookup_GetKey_Write(lookup, scoreAlias, RLOOKUP_F_NOFLAGS);
@@ -118,7 +119,7 @@ int HybridRequest_BuildMergePipeline(HybridRequest *req, HybridPipelineParams *p
118119
} else {
119120
scoreKey = RLookup_GetKey_Read(lookup, UNDERSCORE_SCORE, RLOOKUP_F_NOFLAGS);
120121
}
121-
ResultProcessor *merger = RPHybridMerger_New(params->scoringCtx, depleters, req->nrequests, scoreKey, req->subqueriesReturnCodes, lookupCtx);
122+
ResultProcessor *merger = RPHybridMerger_New(params->scoringCtx, depleters, req->nrequests, docKey, scoreKey, req->subqueriesReturnCodes, lookupCtx);
122123
params->scoringCtx = NULL; // ownership transferred to merger
123124
QITR_PushRP(&req->tailPipeline->qctx, merger);
124125
// Build the aggregation part of the tail pipeline for final result processing

src/hybrid/parse_hybrid.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,10 @@ int parseHybridCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
655655
// We need a load step, implicit or an explicit one
656656
PLN_LoadStep *loadStep = (PLN_LoadStep *)AGPLN_FindStep(parsedCmdCtx->tailPlan, NULL, NULL, PLN_T_LOAD);
657657
if (!loadStep) {
658+
// TBH, I don't think we need this implicit step; it was added to influence the resulting response format
659+
// We wanted the key and score to be returned to the user by default
660+
// This should probably be done in the hybrid send chunk where we decide on the response format.
661+
// For now keeping it as is - due to time constraints
658662
loadStep = createImplicitLoadStep();
659663
} else {
660664
AGPLN_PopStep(&loadStep->base);

src/result_processor.c

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,13 +1676,15 @@ dictType dictTypeHybridSearchResult = {
16761676
typedef struct {
16771677
ResultProcessor base;
16781678
HybridScoringContext *hybridScoringCtx; // Store by pointer - RPHybridMerger is responsible for freeing it
1679-
ResultProcessor **upstreams; // Dynamic array of upstream processors
1680-
size_t numUpstreams; // Number of upstream processors
1681-
dict *hybridResults; // keyPtr -> HybridSearchResult mapping
1682-
dictIterator *iterator; // Iterator for yielding results
1683-
const RLookupKey *scoreKey; // Key for writing score as field when QEXEC_F_SEND_SCORES_AS_FIELD is set
1684-
RPStatus* upstreamReturnCodes; // Final return codes from each upstream
1679+
ResultProcessor **upstreams; // Dynamic array of upstream processors
1680+
size_t numUpstreams; // Number of upstream processors
1681+
dict *hybridResults; // keyPtr -> HybridSearchResult mapping
1682+
dictIterator *iterator; // Iterator for yielding results
1683+
const RLookupKey *scoreKey; // Key for writing score as field when YIELD_SCORE_AS is specified
1684+
const RLookupKey *docKey; // Key for reading document key when dmd is not available
1685+
RPStatus* upstreamReturnCodes; // Final return codes from each upstream
16851686
HybridLookupContext *lookupCtx; // Lookup context for field merging
1687+
16861688
} RPHybridMerger;
16871689

16881690
/* Generic helper function to check if any upstream has a specific return code */
@@ -1712,24 +1714,38 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
17121714
* @param numUpstreams - the number of upstreams
17131715
* @param score - used to override the result's score
17141716
*/
1715-
static void StoreUpstreamResult(SearchResult *r, dict *hybridResults, int upstreamIndex, size_t numUpstreams, double score) {
1716-
const char *keyPtr = SearchResult_GetDocumentMetadata(r)->keyPtr;
1717+
static bool hybridMergerStoreUpstreamResult(SearchResult *r, dict *hybridResults, const RLookupKey *docKey, int upstreamIndex, size_t numUpstreams, double score) {
1718+
// Single shard case - use dmd->keyPtr
1719+
const RSDocumentMetadata *dmd = SearchResult_GetDocumentMetadata(r);
1720+
const char *keyPtr = dmd ? dmd->keyPtr : NULL;
1721+
// Coordinator case - no dmd - use docKey in rlookup
1722+
const bool fallbackToLookup = !keyPtr && docKey;
1723+
if (fallbackToLookup) {
1724+
RSValue *docKeyValue = RLookup_GetItem(docKey, &r->rowdata);
1725+
if (docKeyValue != NULL) {
1726+
keyPtr = RSValue_StringPtrLen(docKeyValue, NULL);
1727+
}
1728+
}
1729+
if (!keyPtr) {
1730+
return false;
1731+
}
17171732

1718-
// Check if we've seen this document before
1719-
HybridSearchResult *hybridResult = (HybridSearchResult*)dictFetchValue(hybridResults, keyPtr);
1733+
// Check if we've seen this document before
1734+
HybridSearchResult *hybridResult = (HybridSearchResult*)dictFetchValue(hybridResults, keyPtr);
17201735

1721-
if (!hybridResult) {
1722-
// First time seeing this document - create new hybrid result
1723-
hybridResult = HybridSearchResult_New(numUpstreams);
1724-
dictAdd(hybridResults, (void*)keyPtr, hybridResult);
1725-
}
1736+
if (!hybridResult) {
1737+
// First time seeing this document - create new hybrid result
1738+
hybridResult = HybridSearchResult_New(numUpstreams);
1739+
dictAdd(hybridResults, (void*)keyPtr, hybridResult);
1740+
}
17261741

17271742
SearchResult_SetScore(r, score);
17281743
HybridSearchResult_StoreResult(hybridResult, r, upstreamIndex);
1744+
return true;
17291745
}
17301746

17311747
/* Helper function to consume results from a single upstream */
1732-
static int ConsumeFromUpstream(RPHybridMerger *self, size_t maxResults, ResultProcessor *upstream, int upstreamIndex) {
1748+
static int hybridMergerConsumeFromUpstream(RPHybridMerger *self, size_t maxResults, ResultProcessor *upstream, int upstreamIndex) {
17331749
size_t consumed = 0;
17341750
int rc = RS_RESULT_OK;
17351751
SearchResult *r = rm_calloc(1, sizeof(*r));
@@ -1739,8 +1755,12 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
17391755
if (self->hybridScoringCtx->scoringType == HYBRID_SCORING_RRF) {
17401756
score = consumed;
17411757
}
1742-
StoreUpstreamResult(r, self->hybridResults, upstreamIndex, self->numUpstreams, score);
1743-
r = rm_calloc(1, sizeof(*r));
1758+
if (hybridMergerStoreUpstreamResult(r, self->hybridResults, self->docKey, upstreamIndex, self->numUpstreams, score)) {
1759+
r = rm_calloc(1, sizeof(*r));
1760+
} else {
1761+
SearchResult_Clear(r);
1762+
--consumed; // avoid wrong rank in RRF
1763+
}
17441764
}
17451765
RLookup_AddKeysFrom(self->lookupCtx->sourceLookups[upstreamIndex], self->lookupCtx->tailLookup, RLOOKUP_F_NOFLAGS);
17461766
rm_free(r);
@@ -1800,7 +1820,7 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
18001820
} else {
18011821
window = self->hybridScoringCtx->linearCtx.window;
18021822
}
1803-
int rc = ConsumeFromUpstream(self, window, self->upstreams[i], i);
1823+
int rc = hybridMergerConsumeFromUpstream(self, window, self->upstreams[i], i);
18041824

18051825
if (rc == RS_RESULT_DEPLETING) {
18061826
// Upstream is still active but not ready to provide results. Skip to the next.
@@ -1880,6 +1900,7 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
18801900
ResultProcessor *RPHybridMerger_New(HybridScoringContext *hybridScoringCtx,
18811901
ResultProcessor **upstreams,
18821902
size_t numUpstreams,
1903+
const RLookupKey *docKey,
18831904
const RLookupKey *scoreKey,
18841905
RPStatus *subqueriesReturnCodes,
18851906
HybridLookupContext *lookupCtx) {
@@ -1892,7 +1913,9 @@ ResultProcessor *RPHybridMerger_New(HybridScoringContext *hybridScoringCtx,
18921913
RS_ASSERT(hybridScoringCtx);
18931914
ret->hybridScoringCtx = hybridScoringCtx;
18941915

1895-
// Store the scoreKey for writing scores as fields when QEXEC_F_SEND_SCORES_AS_FIELD is set
1916+
// Store the docKey for reading document keys from rlookup
1917+
ret->docKey = docKey;
1918+
// Store the scoreKey for writing scores as fields when YIELD_SCORE_AS is specified
18961919
ret->scoreKey = scoreKey;
18971920

18981921
// Store reference to the hybrid request's subqueries return codes array

src/result_processor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ StrongRef DepleterSync_New(unsigned int num_depleters, bool take_index_lock);
307307
ResultProcessor *RPHybridMerger_New(HybridScoringContext *hybridScoringCtx,
308308
ResultProcessor **upstreams,
309309
size_t numUpstreams,
310+
const RLookupKey *docKey,
310311
const RLookupKey *scoreKey,
311312
RPStatus *subqueriesReturnCodes,
312313
HybridLookupContext *lookupCtx);

src/util/arg_parser.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ ArgParseResult ArgParser_Parse(ArgParser *parser) {
466466
// Find the next unparsed positional argument
467467
ArgDefinition *pos_def = NULL;
468468
for (uint16_t pos = 1; pos <= MAX_POSITIONAL_ARGS; pos++) { // reasonable limit
469-
ArgDefinition *candidate = find_positional_definition(parser, pos, NULL);
469+
ArgDefinition *candidate = find_positional_definition(parser, pos, arg_name);
470470
if (!candidate) break;
471471

472472
if (!candidate->parsed) {

tests/cpptests/test_cpp_hybridmerger.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ ResultProcessor* CreateLinearHybridMerger(ResultProcessor **upstreams, size_t nu
190190
// Create dummy return codes array for tests that don't need to track return codes
191191
static RPStatus dummyReturnCodes[8] = {RS_RESULT_OK}; // Static array, supports up to 8 upstreams for tests
192192

193-
return RPHybridMerger_New(hybridScoringCtx, upstreams, numUpstreams, NULL, dummyReturnCodes, lookupCtx);
193+
return RPHybridMerger_New(hybridScoringCtx, upstreams, numUpstreams, NULL, NULL, dummyReturnCodes, lookupCtx);
194194
}
195195

196196
// Helper function to create hybrid merger with RRF scoring
@@ -201,7 +201,7 @@ ResultProcessor* CreateRRFHybridMerger(ResultProcessor **upstreams, size_t numUp
201201
// Create dummy return codes array for tests that don't need to track return codes
202202
static RPStatus dummyReturnCodes[8] = {RS_RESULT_OK}; // Static array, supports up to 8 upstreams for tests
203203

204-
return RPHybridMerger_New(hybridScoringCtx, upstreams, numUpstreams, NULL, dummyReturnCodes, lookupCtx);
204+
return RPHybridMerger_New(hybridScoringCtx, upstreams, numUpstreams, NULL, NULL, dummyReturnCodes, lookupCtx);
205205
}
206206

207207

@@ -1424,7 +1424,7 @@ TEST_F(HybridMergerTest, testUpstreamReturnCodes) {
14241424
// Create dummy lookup context
14251425
HybridLookupContext *lookupCtx = CreateDummyLookupContext(3);
14261426

1427-
ResultProcessor *hybridMerger = RPHybridMerger_New(hybridScoringCtx, upstreams, 3, NULL, returnCodes, lookupCtx);
1427+
ResultProcessor *hybridMerger = RPHybridMerger_New(hybridScoringCtx, upstreams, 3, NULL, NULL, returnCodes, lookupCtx);
14281428

14291429
// Process results - this should capture the return codes
14301430
SearchResult r = {0};

tests/pytests/test_hybrid_filter.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ def test_hybrid_filter_behavior():
6161
'FILTER', '@category:{"fruit"}'
6262
)
6363
results, _ = get_results_from_hybrid_response(response)
64-
# This should return all with fruit from vector subquery (doc:1, doc:2) and all with green text (doc:3)
6564
env.assertEqual(set(results.keys()), {"doc:1", "doc:2", "doc:3"})
6665

6766
response = env.cmd(
@@ -71,7 +70,6 @@ def test_hybrid_filter_behavior():
7170
'FILTER', '@category:{"fruit"}', "COMBINE", "RRF", "2", "CONSTANT", "30",
7271
)
7372
results, _ = get_results_from_hybrid_response(response)
74-
# This should filter as before, just an extra combine
7573
env.assertEqual(set(results.keys()), {"doc:1", "doc:2", "doc:3"})
7674

7775
response = env.cmd(
@@ -81,7 +79,6 @@ def test_hybrid_filter_behavior():
8179
"COMBINE", "RRF", "2", "CONSTANT", "30", "LOAD", 2, "__key", "category", "FILTER", "@category==\"fruit\"",
8280
)
8381
results, _ = get_results_from_hybrid_response(response)
84-
# This should filter as post processing.
8582
env.assertEqual(set(results.keys()), {"doc:1", "doc:2"})
8683

8784
response = env.cmd(
@@ -91,7 +88,6 @@ def test_hybrid_filter_behavior():
9188
'FILTER', '@category:{"vegetable"}', "COMBINE", "RRF", "2", "CONSTANT", "30", "LOAD", 2, "__key", "category", "FILTER", "@category==\"fruit\"",
9289
)
9390
results, _ = get_results_from_hybrid_response(response)
94-
# This should filter as before, just an extra combine
9591
env.assertEqual(results, {})
9692

9793
response = env.cmd(
@@ -101,16 +97,31 @@ def test_hybrid_filter_behavior():
10197
'FILTER', '@category:{"vegetable"}', "LOAD", 2, "__key", "category", "FILTER", "@category==\"clothing\"",
10298
)
10399
results, _ = get_results_from_hybrid_response(response)
104-
# This should filter as before, just an extra combine
105100
env.assertEqual(set(results.keys()), {"doc:3"})
106101

107-
# post-query FILTER immediately after VSIM FILTER
102+
response = env.cmd(
103+
'FT.HYBRID', 'filter_idx',
104+
'SEARCH', '@text:(green)',
105+
'VSIM', '@vector', query_vector,
106+
'FILTER', '@category:{"vegetable"}',
107+
)
108+
results, _ = get_results_from_hybrid_response(response)
109+
env.assertEqual(set(results.keys()), {"doc:3", "doc:4"})
110+
111+
response = env.cmd(
112+
'FT.HYBRID', 'filter_idx',
113+
'SEARCH', '@text:(green)',
114+
'VSIM', '@vector', query_vector,
115+
'FILTER', '@category:{"vegetable"}', "FILTER", "@__key!=\"doc:3\"",
116+
)
117+
results, _ = get_results_from_hybrid_response(response)
118+
env.assertEqual(set(results.keys()), {"doc:4"})
119+
108120
response = env.cmd(
109121
'FT.HYBRID', 'filter_idx',
110122
'SEARCH', '@text:(green)',
111123
'VSIM', '@vector', query_vector,
112124
'FILTER', '@category:{"vegetable"}', "FILTER", "@__key==\"doc:3\"",
113125
)
114126
results, _ = get_results_from_hybrid_response(response)
115-
# This should filter as before, just an extra combine
116127
env.assertEqual(set(results.keys()), {"doc:3"})

0 commit comments

Comments
 (0)