1515typedef struct {
1616 arrayof (RedisModuleSlotRange ) slotRanges ;
1717 MRClusterNode node ;
18+ bool isMaster ;
1819} RLShard ;
1920
2021static void RLShard_Free (RLShard * sh ) {
@@ -81,18 +82,22 @@ static void MRTopology_AddRLShard(MRClusterTopology *t, RLShard *sh) {
8182 } \
8283 })
8384
85+ #define STR_MATCH (str , len , lit ) (sizeof(lit) - 1 == len && strcasecmp(str, lit) == 0)
86+
8487MRClusterTopology * RedisEnterprise_ParseTopology (RedisModuleCtx * ctx , RedisModuleString * * argv ,
8588 int argc , uint32_t * my_shard_idx ) {
8689 ArgsCursor ac ; // Name is important for error macros, same goes for `ctx`
8790 ArgsCursor_InitRString (& ac , argv , argc );
8891 AC_Advance (& ac ); // Skip command name
92+ size_t myID_offset ;
8993 const char * myID = NULL ; // Mandatory. No default.
9094 uint32_t numRanges = 0 ; // Mandatory. No default.
9195 uint32_t numSlots = 16384 ; // Default.
9296
9397 // Parse general arguments. No allocation is done here, so we can just return on error
9498 while (!AC_IsAtEnd (& ac )) {
9599 if (AC_AdvanceIfMatch (& ac , "MYID" )) {
100+ myID_offset = ac .offset ;
96101 myID = AC_GetStringNC (& ac , NULL ); // Verified after breaking out of loop
97102 } else if (AC_AdvanceIfMatch (& ac , "HASHFUNC" )) {
98103 const char * hashFuncStr = AC_GetStringNC (& ac , NULL );
@@ -156,7 +161,6 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
156161 const char * idstr = RedisModule_StringPtrLen (shardIDStr , & len );
157162 sh -> node .id = rm_strndup (idstr , len );
158163
159- bool is_master = false;
160164 while (!AC_IsAtEnd (& ac )) {
161165 if (AC_AdvanceIfMatch (& ac , "SLOTRANGE" )) {
162166 if (array_len (sh -> slotRanges ) > 0 ) {
@@ -196,39 +200,27 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
196200
197201 } else if (AC_AdvanceIfMatch (& ac , "UNIXADDR" )) {
198202 /* Optional UNIXADDR <unix_addr> */
203+ if (sh -> node .endpoint .unixSock ) {
204+ ERROR_FMT ("Multiple UNIXADDR specified for shard `%s`" , sh -> node .id );
205+ goto error ;
206+ }
199207 size_t len ;
200208 const char * unixSock ;
201- if (!(unixSock = AC_GetStringNC (& ac , & len ))) {
209+ if (AC_GetString (& ac , & unixSock , & len , AC_F_NOADVANCE ) != AC_OK ||
210+ STR_MATCH (unixSock , len , "MASTER" ) || // Avoid consuming MASTER flag argument
211+ STR_MATCH (unixSock , len , "SHARD" )) { // Avoid consuming next SHARD marker argument
202212 ERROR_MISSING ("UNIXADDR" );
203213 goto error ;
204214 }
205- if (sh -> node .endpoint .unixSock ) {
206- ERROR_FMT ("Multiple UNIXADDR specified for shard `%s`" , sh -> node .id );
207- goto error ;
208- }
209215 sh -> node .endpoint .unixSock = rm_strndup (unixSock , len );
210-
216+ AC_Advance ( & ac );
211217 } else if (AC_AdvanceIfMatch (& ac , "MASTER" )) {
212- is_master = true;
218+ sh -> isMaster = true;
213219 } else {
214220 break ;
215221 }
216222 }
217223
218- // We don't care for replicas using this command anymore
219- if (!is_master ) {
220- RLShard_Free (sh );
221- sh = NULL ;
222- continue ;
223- }
224-
225- // Ignore shards with no slot ranges (like replicas)
226- if (array_len (sh -> slotRanges ) == 0 ) {
227- RLShard_Free (sh );
228- sh = NULL ;
229- continue ;
230- }
231-
232224 dictEntry * entry = dictAddOrFind (shards , shardIDStr );
233225 if (!dictGetVal (entry )) {
234226 // New shard
@@ -270,15 +262,18 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
270262 goto error ;
271263 }
272264 }
273- RS_ASSERT (array_len (sh -> slotRanges ) == 1 );
274- // Verify slot range starts past existing ones
275- if (array_tail (existing_shard -> slotRanges ).end + 1 >= sh -> slotRanges [0 ].start ) {
276- ERROR_FMT ("SLOTRANGE out of order for shard `%s`" , sh -> node .id );
277- goto error ;
278- }
279265
280- // Append new slot range
281- array_ensure_append_1 (existing_shard -> slotRanges , sh -> slotRanges [0 ]);
266+ RS_ASSERT (array_len (sh -> slotRanges ) <= 1 );
267+ if (array_len (sh -> slotRanges ) == 1 ) {
268+ // Verify slot range starts past existing ones
269+ if (array_len (existing_shard -> slotRanges ) > 0 && array_tail (existing_shard -> slotRanges ).end + 1 >= sh -> slotRanges [0 ].start ) {
270+ ERROR_FMT ("SLOTRANGE out of order for shard `%s`" , sh -> node .id );
271+ goto error ;
272+ }
273+
274+ // Append new slot range
275+ array_ensure_append_1 (existing_shard -> slotRanges , sh -> slotRanges [0 ]);
276+ }
282277
283278 // Discard parsed shard
284279 RLShard_Free (sh );
@@ -294,13 +289,17 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
294289 }
295290
296291 // Now, build the topology.
297- // 1. All shards in the dict are valid masters
292+ // 1. All shards in the dict are valid
298293 // 2. We can identify my shard by myID
299294 topo = MR_NewTopology (dictSize (shards ));
300295 dictIterator * iter = dictGetIterator (shards );
301296 dictEntry * de ;
302297 while ((de = dictNext (iter )) != NULL ) {
303- MRTopology_AddRLShard (topo , dictGetVal (de ));
298+ RLShard * sh = dictGetVal (de );
299+ // Only add master shards with slots
300+ if (sh -> isMaster && array_len (sh -> slotRanges ) > 0 ) {
301+ MRTopology_AddRLShard (topo , sh );
302+ }
304303 }
305304 dictReleaseIterator (iter );
306305
@@ -315,10 +314,14 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
315314 }
316315
317316 if (* my_shard_idx == UINT32_MAX ) {
318- ERROR_FMT ("MYID `%s` does not correspond to any shard" , myID );
319- MRClusterTopology_Free (topo );
320- topo = NULL ;
321- goto error ;
317+ // if MyID corresponds to some shard in the dict, this is NOT an error:
318+ // It means the local node is not part of the topology we store (e.g., it has no slot, or is a replica)
319+ if (dictFind (shards , argv [myID_offset ]) == NULL ) {
320+ ERROR_FMT ("MYID `%s` does not correspond to any shard" , myID );
321+ MRClusterTopology_Free (topo );
322+ topo = NULL ;
323+ goto error ;
324+ }
322325 }
323326
324327error : // Also the normal exit point
0 commit comments