@@ -1676,13 +1676,15 @@ dictType dictTypeHybridSearchResult = {
16761676 typedef struct {
16771677 ResultProcessor base ;
16781678 HybridScoringContext * hybridScoringCtx ; // Store by pointer - RPHybridMerger is responsible for freeing it
1679- ResultProcessor * * upstreams ; // Dynamic array of upstream processors
1680- size_t numUpstreams ; // Number of upstream processors
1681- dict * hybridResults ; // keyPtr -> HybridSearchResult mapping
1682- dictIterator * iterator ; // Iterator for yielding results
1683- const RLookupKey * scoreKey ; // Key for writing score as field when QEXEC_F_SEND_SCORES_AS_FIELD is set
1684- RPStatus * upstreamReturnCodes ; // Final return codes from each upstream
1679+ ResultProcessor * * upstreams ; // Dynamic array of upstream processors
1680+ size_t numUpstreams ; // Number of upstream processors
1681+ dict * hybridResults ; // keyPtr -> HybridSearchResult mapping
1682+ dictIterator * iterator ; // Iterator for yielding results
1683+ const RLookupKey * scoreKey ; // Key for writing score as field when YIELD_SCORE_AS is specified
1684+ const RLookupKey * docKey ; // Key for reading document key when dmd is not available
1685+ RPStatus * upstreamReturnCodes ; // Final return codes from each upstream
16851686 HybridLookupContext * lookupCtx ; // Lookup context for field merging
1687+
16861688} RPHybridMerger ;
16871689
16881690/* Generic helper function to check if any upstream has a specific return code */
@@ -1712,24 +1714,38 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
17121714 * @param numUpstreams - the number of upstreams
17131715 * @param score - used to override the result's score
17141716 */
1715- static void StoreUpstreamResult (SearchResult * r , dict * hybridResults , int upstreamIndex , size_t numUpstreams , double score ) {
1716- const char * keyPtr = SearchResult_GetDocumentMetadata (r )-> keyPtr ;
1717+ static bool hybridMergerStoreUpstreamResult (SearchResult * r , dict * hybridResults , const RLookupKey * docKey , int upstreamIndex , size_t numUpstreams , double score ) {
1718+ // Single shard case - use dmd->keyPtr
1719+ const RSDocumentMetadata * dmd = SearchResult_GetDocumentMetadata (r );
1720+ const char * keyPtr = dmd ? dmd -> keyPtr : NULL ;
1721+ // Coordinator case - no dmd - use docKey in rlookup
1722+ const bool fallbackToLookup = !keyPtr && docKey ;
1723+ if (fallbackToLookup ) {
1724+ RSValue * docKeyValue = RLookup_GetItem (docKey , & r -> rowdata );
1725+ if (docKeyValue != NULL ) {
1726+ keyPtr = RSValue_StringPtrLen (docKeyValue , NULL );
1727+ }
1728+ }
1729+ if (!keyPtr ) {
1730+ return false;
1731+ }
17171732
1718- // Check if we've seen this document before
1719- HybridSearchResult * hybridResult = (HybridSearchResult * )dictFetchValue (hybridResults , keyPtr );
1733+ // Check if we've seen this document before
1734+ HybridSearchResult * hybridResult = (HybridSearchResult * )dictFetchValue (hybridResults , keyPtr );
17201735
1721- if (!hybridResult ) {
1722- // First time seeing this document - create new hybrid result
1723- hybridResult = HybridSearchResult_New (numUpstreams );
1724- dictAdd (hybridResults , (void * )keyPtr , hybridResult );
1725- }
1736+ if (!hybridResult ) {
1737+ // First time seeing this document - create new hybrid result
1738+ hybridResult = HybridSearchResult_New (numUpstreams );
1739+ dictAdd (hybridResults , (void * )keyPtr , hybridResult );
1740+ }
17261741
17271742 SearchResult_SetScore (r , score );
17281743 HybridSearchResult_StoreResult (hybridResult , r , upstreamIndex );
1744+ return true;
17291745 }
17301746
17311747 /* Helper function to consume results from a single upstream */
1732- static int ConsumeFromUpstream (RPHybridMerger * self , size_t maxResults , ResultProcessor * upstream , int upstreamIndex ) {
1748+ static int hybridMergerConsumeFromUpstream (RPHybridMerger * self , size_t maxResults , ResultProcessor * upstream , int upstreamIndex ) {
17331749 size_t consumed = 0 ;
17341750 int rc = RS_RESULT_OK ;
17351751 SearchResult * r = rm_calloc (1 , sizeof (* r ));
@@ -1739,8 +1755,12 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
17391755 if (self -> hybridScoringCtx -> scoringType == HYBRID_SCORING_RRF ) {
17401756 score = consumed ;
17411757 }
1742- StoreUpstreamResult (r , self -> hybridResults , upstreamIndex , self -> numUpstreams , score );
1743- r = rm_calloc (1 , sizeof (* r ));
1758+ if (hybridMergerStoreUpstreamResult (r , self -> hybridResults , self -> docKey , upstreamIndex , self -> numUpstreams , score )) {
1759+ r = rm_calloc (1 , sizeof (* r ));
1760+ } else {
1761+ SearchResult_Clear (r );
1762+ -- consumed ; // avoid wrong rank in RRF
1763+ }
17441764 }
17451765 RLookup_AddKeysFrom (self -> lookupCtx -> sourceLookups [upstreamIndex ], self -> lookupCtx -> tailLookup , RLOOKUP_F_NOFLAGS );
17461766 rm_free (r );
@@ -1800,7 +1820,7 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
18001820 } else {
18011821 window = self -> hybridScoringCtx -> linearCtx .window ;
18021822 }
1803- int rc = ConsumeFromUpstream (self , window , self -> upstreams [i ], i );
1823+ int rc = hybridMergerConsumeFromUpstream (self , window , self -> upstreams [i ], i );
18041824
18051825 if (rc == RS_RESULT_DEPLETING ) {
18061826 // Upstream is still active but not ready to provide results. Skip to the next.
@@ -1880,6 +1900,7 @@ static inline bool RPHybridMerger_Error(const RPHybridMerger *self) {
18801900ResultProcessor * RPHybridMerger_New (HybridScoringContext * hybridScoringCtx ,
18811901 ResultProcessor * * upstreams ,
18821902 size_t numUpstreams ,
1903+ const RLookupKey * docKey ,
18831904 const RLookupKey * scoreKey ,
18841905 RPStatus * subqueriesReturnCodes ,
18851906 HybridLookupContext * lookupCtx ) {
@@ -1892,7 +1913,9 @@ ResultProcessor *RPHybridMerger_New(HybridScoringContext *hybridScoringCtx,
18921913 RS_ASSERT (hybridScoringCtx );
18931914 ret -> hybridScoringCtx = hybridScoringCtx ;
18941915
1895- // Store the scoreKey for writing scores as fields when QEXEC_F_SEND_SCORES_AS_FIELD is set
1916+ // Store the scoreKey for reading document keys from rlookup
1917+ ret -> docKey = docKey ;
1918+ // Store the scoreKey for writing scores as fields when YIELD_SCORE_AS is specified
18961919 ret -> scoreKey = scoreKey ;
18971920
18981921 // Store reference to the hybrid request's subqueries return codes array
0 commit comments