Skip to content

Commit c349031

Browse files
GuyAv46 authored and github-actions[bot] committed
Fix edge case in clusterset - [MOD-13562] (#8068)
* collect all shards (including replicas and no slots) and filter only at the end
* Add API description
* Align usage
* add a test
* avoid string arg consuming of standalone flag params
* fix unit tests
* add another test
* align and add flow tests
* add another case

(cherry picked from commit a2279bd)
1 parent b6429aa commit c349031

5 files changed

Lines changed: 170 additions & 46 deletions

File tree

src/coord/rmr/redise.c

Lines changed: 39 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
typedef struct {
1616
arrayof(RedisModuleSlotRange) slotRanges;
1717
MRClusterNode node;
18+
bool isMaster;
1819
} RLShard;
1920

2021
static void RLShard_Free(RLShard *sh) {
@@ -81,18 +82,22 @@ static void MRTopology_AddRLShard(MRClusterTopology *t, RLShard *sh) {
8182
} \
8283
})
8384

85+
// True when the `len`-byte string `str` equals the string literal `lit`,
// ignoring case. `lit` must be a literal (its length comes from sizeof).
// Every argument is parenthesized so expression arguments expand safely, and
// the comparison is bounded by `len`, so only the first `len` bytes of `str`
// are ever examined. Note: `len` is evaluated twice; pass a side-effect-free
// expression.
#define STR_MATCH(str, len, lit) \
  ((len) == sizeof(lit) - 1 && strncasecmp((str), (lit), (len)) == 0)
86+
8487
MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModuleString **argv,
8588
int argc, uint32_t *my_shard_idx) {
8689
ArgsCursor ac; // Name is important for error macros, same goes for `ctx`
8790
ArgsCursor_InitRString(&ac, argv, argc);
8891
AC_Advance(&ac); // Skip command name
92+
size_t myID_offset;
8993
const char *myID = NULL; // Mandatory. No default.
9094
uint32_t numRanges = 0; // Mandatory. No default.
9195
uint32_t numSlots = 16384; // Default.
9296

9397
// Parse general arguments. No allocation is done here, so we can just return on error
9498
while (!AC_IsAtEnd(&ac)) {
9599
if (AC_AdvanceIfMatch(&ac, "MYID")) {
100+
myID_offset = ac.offset;
96101
myID = AC_GetStringNC(&ac, NULL); // Verified after breaking out of loop
97102
} else if (AC_AdvanceIfMatch(&ac, "HASHFUNC")) {
98103
const char *hashFuncStr = AC_GetStringNC(&ac, NULL);
@@ -156,7 +161,6 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
156161
const char *idstr = RedisModule_StringPtrLen(shardIDStr, &len);
157162
sh->node.id = rm_strndup(idstr, len);
158163

159-
bool is_master = false;
160164
while (!AC_IsAtEnd(&ac)) {
161165
if (AC_AdvanceIfMatch(&ac, "SLOTRANGE")) {
162166
if (array_len(sh->slotRanges) > 0) {
@@ -196,39 +200,27 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
196200

197201
} else if (AC_AdvanceIfMatch(&ac, "UNIXADDR")) {
198202
/* Optional UNIXADDR <unix_addr> */
203+
if (sh->node.endpoint.unixSock) {
204+
ERROR_FMT("Multiple UNIXADDR specified for shard `%s`", sh->node.id);
205+
goto error;
206+
}
199207
size_t len;
200208
const char *unixSock;
201-
if (!(unixSock = AC_GetStringNC(&ac, &len))) {
209+
if (AC_GetString(&ac, &unixSock, &len, AC_F_NOADVANCE) != AC_OK ||
210+
STR_MATCH(unixSock, len, "MASTER") || // Avoid consuming MASTER flag argument
211+
STR_MATCH(unixSock, len, "SHARD")) { // Avoid consuming next SHARD marker argument
202212
ERROR_MISSING("UNIXADDR");
203213
goto error;
204214
}
205-
if (sh->node.endpoint.unixSock) {
206-
ERROR_FMT("Multiple UNIXADDR specified for shard `%s`", sh->node.id);
207-
goto error;
208-
}
209215
sh->node.endpoint.unixSock = rm_strndup(unixSock, len);
210-
216+
AC_Advance(&ac);
211217
} else if (AC_AdvanceIfMatch(&ac, "MASTER")) {
212-
is_master = true;
218+
sh->isMaster = true;
213219
} else {
214220
break;
215221
}
216222
}
217223

218-
// We don't care for replicas using this command anymore
219-
if (!is_master) {
220-
RLShard_Free(sh);
221-
sh = NULL;
222-
continue;
223-
}
224-
225-
// Ignore shards with no slot ranges (like replicas)
226-
if (array_len(sh->slotRanges) == 0) {
227-
RLShard_Free(sh);
228-
sh = NULL;
229-
continue;
230-
}
231-
232224
dictEntry *entry = dictAddOrFind(shards, shardIDStr);
233225
if (!dictGetVal(entry)) {
234226
// New shard
@@ -270,15 +262,18 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
270262
goto error;
271263
}
272264
}
273-
RS_ASSERT(array_len(sh->slotRanges) == 1);
274-
// Verify slot range starts past existing ones
275-
if (array_tail(existing_shard->slotRanges).end + 1 >= sh->slotRanges[0].start) {
276-
ERROR_FMT("SLOTRANGE out of order for shard `%s`", sh->node.id);
277-
goto error;
278-
}
279265

280-
// Append new slot range
281-
array_ensure_append_1(existing_shard->slotRanges, sh->slotRanges[0]);
266+
RS_ASSERT(array_len(sh->slotRanges) <= 1);
267+
if (array_len(sh->slotRanges) == 1) {
268+
// Verify slot range starts past existing ones
269+
if (array_len(existing_shard->slotRanges) > 0 && array_tail(existing_shard->slotRanges).end + 1 >= sh->slotRanges[0].start) {
270+
ERROR_FMT("SLOTRANGE out of order for shard `%s`", sh->node.id);
271+
goto error;
272+
}
273+
274+
// Append new slot range
275+
array_ensure_append_1(existing_shard->slotRanges, sh->slotRanges[0]);
276+
}
282277

283278
// Discard parsed shard
284279
RLShard_Free(sh);
@@ -294,13 +289,17 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
294289
}
295290

296291
// Now, build the topology.
297-
// 1. All shards in the dict are valid masters
292+
// 1. All shards in the dict are valid
298293
// 2. We can identify my shard by myID
299294
topo = MR_NewTopology(dictSize(shards));
300295
dictIterator *iter = dictGetIterator(shards);
301296
dictEntry *de;
302297
while ((de = dictNext(iter)) != NULL) {
303-
MRTopology_AddRLShard(topo, dictGetVal(de));
298+
RLShard *sh = dictGetVal(de);
299+
// Only add master shards with slots
300+
if (sh->isMaster && array_len(sh->slotRanges) > 0) {
301+
MRTopology_AddRLShard(topo, sh);
302+
}
304303
}
305304
dictReleaseIterator(iter);
306305

@@ -315,10 +314,14 @@ MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModul
315314
}
316315

317316
if (*my_shard_idx == UINT32_MAX) {
318-
ERROR_FMT("MYID `%s` does not correspond to any shard", myID);
319-
MRClusterTopology_Free(topo);
320-
topo = NULL;
321-
goto error;
317+
// if MyID corresponds to some shard in the dict, this is NOT an error:
318+
// It means the local node is not part of the topology we store (e.g., it has no slot, or is a replica)
319+
if (dictFind(shards, argv[myID_offset]) == NULL) {
320+
ERROR_FMT("MYID `%s` does not correspond to any shard", myID);
321+
MRClusterTopology_Free(topo);
322+
topo = NULL;
323+
goto error;
324+
}
322325
}
323326

324327
error: // Also the normal exit point

src/coord/rmr/redise.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
extern "C" {
1616
#endif
1717

18+
/* Parse the cluster topology from the given arguments.
19+
* On success, returns the parsed topology. On failure, replies with an error
20+
* using the provided context and returns NULL.
21+
*
22+
* The `my_shard_idx` output parameter is set to the index of the shard
23+
* corresponding to MYID, or UINT32_MAX if MYID does not correspond to any shard
24+
* in the topology.
25+
*/
1826
MRClusterTopology *RedisEnterprise_ParseTopology(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, uint32_t *my_shard_idx);
1927

2028
#ifdef __cplusplus

src/module.c

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4022,15 +4022,23 @@ int SetClusterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
40224022

40234023
RedisModule_Log(ctx, "notice", "Received new cluster topology with %u shards (%s)", topo->numShards, ranges_info);
40244024

4025-
// Take a reference to our own shard slot ranges (MR_UpdateTopology won't consume it)
4026-
RS_ASSERT(my_shard_idx < topo->numShards);
4027-
const RedisModuleSlotRangeArray *my_slots = topo->shards[my_shard_idx].slotRanges;
4025+
if (my_shard_idx != UINT32_MAX) {
4026+
// Take a reference to our own shard slot ranges (MR_UpdateTopology won't consume it)
4027+
RS_ASSERT(my_shard_idx < topo->numShards);
4028+
const RedisModuleSlotRangeArray *my_slots = topo->shards[my_shard_idx].slotRanges;
40284029

4029-
// Store the local shard id
4030-
MR_SetLocalNodeId(topo->shards[my_shard_idx].node.id);
4030+
// Store the local shard id
4031+
MR_SetLocalNodeId(topo->shards[my_shard_idx].node.id);
40314032

4032-
// send the topology to the cluster
4033-
MR_UpdateTopology(topo, my_slots);
4033+
// send the topology to the cluster
4034+
MR_UpdateTopology(topo, my_slots);
4035+
4036+
} else {
4037+
// Valid topology but this node is not part of it.
4038+
// We cannot pass NULL as local slots, so we pass an empty slot array.
4039+
static const RedisModuleSlotRangeArray empty_slots = {0, {{0, 0}}};
4040+
MR_UpdateTopology(topo, &empty_slots);
4041+
}
40344042
return RedisModule_ReplyWithSimpleString(ctx, "OK");
40354043
}
40364044

tests/cpptests/coord_tests/test_cpp_clusterset.cpp

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -746,7 +746,7 @@ TEST_F(ClusterSetTest, Error_MissingUNIXADDRValue) {
746746
MRClusterTopology *topo = RedisEnterprise_ParseTopology(ctx, argv, argv.size(), &my_shard_idx);
747747

748748
EXPECT_EQ(topo, nullptr);
749-
EXPECT_EQ(RMCK_GetLastError(ctx), "MYID `shard1` does not correspond to any shard at offset 14");
749+
EXPECT_EQ(RMCK_GetLastError(ctx), "Missing value for UNIXADDR at offset 13");
750750

751751
}
752752

@@ -806,7 +806,7 @@ TEST_F(ClusterSetTest, Error_MultipleUNIXADDR_SameBlock) {
806806
MRClusterTopology *topo = RedisEnterprise_ParseTopology(ctx, argv, argv.size(), &my_shard_idx);
807807

808808
EXPECT_EQ(topo, nullptr);
809-
EXPECT_EQ(RMCK_GetLastError(ctx), "Multiple UNIXADDR specified for shard `shard1` at offset 16");
809+
EXPECT_EQ(RMCK_GetLastError(ctx), "Multiple UNIXADDR specified for shard `shard1` at offset 15");
810810

811811
}
812812

@@ -994,3 +994,56 @@ TEST_F(ClusterSetTest, EdgeCase_ManyShards) {
994994

995995
MRClusterTopology_Free(topo);
996996
}
997+
998+
TEST_F(ClusterSetTest, EdgeCase_LocalShardEmpty) {
999+
std::vector<std::string> args = {
1000+
"search.CLUSTERSET",
1001+
"MYID", "local_shard",
1002+
"RANGES", "3",
1003+
"SHARD", "local_shard", "ADDR", "127.0.0.3:6379", "MASTER", // No SLOTRANGE - empty shard
1004+
"SHARD", "shard1", "SLOTRANGE", "0", "8191", "ADDR", "127.0.0.1:6379", "MASTER",
1005+
"SHARD", "shard2", "SLOTRANGE", "8192", "16383", "ADDR", "127.0.0.2:6379", "MASTER"
1006+
};
1007+
1008+
ArgvList argv(ctx, args);
1009+
uint32_t my_shard_idx = UINT32_MAX;
1010+
MRClusterTopology *topo = RedisEnterprise_ParseTopology(ctx, argv, argv.size(), &my_shard_idx);
1011+
1012+
ASSERT_NE(topo, nullptr) << "Parsing should succeed with empty local shard";
1013+
EXPECT_EQ(my_shard_idx, UINT32_MAX) << "Should not find local shard in topology (empty shard ignored)";
1014+
EXPECT_EQ(topo->numShards, 2) << "Should have 2 shards (empty shard ignored)";
1015+
1016+
// Verify that the local shard is not part of the topology
1017+
for (uint32_t i = 0; i < topo->numShards; i++) {
1018+
EXPECT_STRNE(topo->shards[i].node.id, "local_shard")
1019+
<< "Local shard should not be in topology";
1020+
}
1021+
1022+
MRClusterTopology_Free(topo);
1023+
}
1024+
1025+
TEST_F(ClusterSetTest, EdgeCase_LocalShardReplica) {
1026+
std::vector<std::string> args = {
1027+
"search.CLUSTERSET",
1028+
"MYID", "local_shard",
1029+
"RANGES", "2",
1030+
"SHARD", "local_shard", "ADDR", "127.0.0.3:6379", "SLOTRANGE", "0", "16383",
1031+
"SHARD", "other_shard", "ADDR", "127.0.0.1:6381", "SLOTRANGE", "0", "16383", "MASTER",
1032+
};
1033+
1034+
ArgvList argv(ctx, args);
1035+
uint32_t my_shard_idx = UINT32_MAX;
1036+
MRClusterTopology *topo = RedisEnterprise_ParseTopology(ctx, argv, argv.size(), &my_shard_idx);
1037+
1038+
ASSERT_NE(topo, nullptr) << "Parsing should succeed with empty local shard";
1039+
EXPECT_EQ(my_shard_idx, UINT32_MAX) << "Should not find local shard in topology (empty shard ignored)";
1040+
EXPECT_EQ(topo->numShards, 1) << "Should have 1 shard (empty shard ignored)";
1041+
1042+
// Verify that the local shard is not part of the topology
1043+
for (uint32_t i = 0; i < topo->numShards; i++) {
1044+
EXPECT_STRNE(topo->shards[i].node.id, "local_shard")
1045+
<< "Local shard should not be in topology";
1046+
}
1047+
1048+
MRClusterTopology_Free(topo);
1049+
}

tests/pytests/test.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4313,6 +4313,58 @@ def test_cluster_set_multiple_slots(env: Env):
43134313
]
43144314
env.expect('SEARCH.CLUSTERINFO').equal(expected)
43154315

4316+
@skip(cluster=False) # this test is only relevant on cluster
4317+
def test_cluster_set_myself_excluded(env: Env):
4318+
env.cmd(debug_cmd(), 'PAUSE_TOPOLOGY_UPDATER')
4319+
env.cmd(config_cmd(), 'SET', 'TOPOLOGY_VALIDATION_TIMEOUT', 1)
4320+
4321+
# Set two shards, one with all the slots, and one without any slots
4322+
env.expect(
4323+
'SEARCH.CLUSTERSET',
4324+
'HASHFUNC', 'CRC16',
4325+
'NUMSLOTS', '16384',
4326+
'MYID', '1',
4327+
'RANGES', '2',
4328+
'SHARD', '1',
4329+
'ADDR', 'localhost:7001',
4330+
'MASTER',
4331+
'SHARD', '2',
4332+
'ADDR', 'localhost:7002',
4333+
'MASTER',
4334+
'SLOTRANGE', '0', '16383'
4335+
).ok()
4336+
4337+
# Expect only the shard with slots to be listed
4338+
expected = [
4339+
'num_partitions', 1,
4340+
'cluster_type', 'redis_oss',
4341+
'shards', [['slots', [0, 16383], 'id', '2', 'host', 'localhost', 'port', 7002]],
4342+
]
4343+
env.expect('SEARCH.CLUSTERINFO').equal(expected)
4344+
4345+
# Set two shards, one master and myself as replica
4346+
env.expect(
4347+
'SEARCH.CLUSTERSET',
4348+
'HASHFUNC', 'CRC16',
4349+
'NUMSLOTS', '16384',
4350+
'MYID', '1',
4351+
'RANGES', '2',
4352+
'SHARD', '1',
4353+
'ADDR', 'localhost:7001',
4354+
'SLOTRANGE', '0', '16383',
4355+
'SHARD', '2',
4356+
'ADDR', 'localhost:7002',
4357+
'MASTER',
4358+
'SLOTRANGE', '0', '16383'
4359+
).ok()
4360+
4361+
# Expect only the master shard to be listed
4362+
expected = [
4363+
'num_partitions', 1,
4364+
'cluster_type', 'redis_oss',
4365+
'shards', [['slots', [0, 16383], 'id', '2', 'host', 'localhost', 'port', 7002]],
4366+
]
4367+
env.expect('SEARCH.CLUSTERINFO').equal(expected)
43164368

43174369
@skip(cluster=False) # this test is only relevant on cluster
43184370
def test_cluster_set_errors(env: Env):
@@ -4339,7 +4391,7 @@ def test_cluster_set_errors(env: Env):
43394391
env.expect('SEARCH.CLUSTERSET', 'MYID', '1', 'RANGES', '1',
43404392
'SHARD').error().contains('Missing value for SHARD')
43414393
env.expect('SEARCH.CLUSTERSET', 'MYID', '1', 'RANGES', '1',
4342-
'SHARD', '1').error().contains('MYID `1` does not correspond to any shard')
4394+
'SHARD', '1').error().contains('Missing value for ADDR at offset 7')
43434395
env.expect('SEARCH.CLUSTERSET', 'MYID', '1', 'RANGES', '1',
43444396
'SHARD', '1', 'SLOTRANGE').error().contains('Missing value for SLOTRANGE')
43454397
env.expect('SEARCH.CLUSTERSET', 'MYID', '1', 'RANGES', '1',

0 commit comments

Comments
 (0)