Skip to content

Commit 2005eca

Browse files
support expected remote cluster name in CCS sniff mode
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
1 parent 2c27f8a commit 2005eca

5 files changed

Lines changed: 255 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55

66
## [Unreleased 3.x]
77
### Added
8+
- Support expected cluster name with validation in CCS Sniff mode ([#20532](https://github.com/opensearch-project/OpenSearch/pull/20532))
89

910
### Changed
1011

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ public void apply(Settings value, Settings current, Settings previous) {
464464
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
465465
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
466466
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
467+
SniffConnectionStrategy.REMOTE_CLUSTER_EXPECTED_NAME,
467468
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
468469
ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING,
469470
ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING,

server/src/main/java/org/opensearch/transport/RemoteClusterAware.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
150150
SniffConnectionStrategy.REMOTE_CLUSTERS_PROXY,
151151
SniffConnectionStrategy.REMOTE_CLUSTER_SEEDS,
152152
SniffConnectionStrategy.REMOTE_NODE_CONNECTIONS,
153+
SniffConnectionStrategy.REMOTE_CLUSTER_EXPECTED_NAME,
153154
ProxyConnectionStrategy.PROXY_ADDRESS,
154155
ProxyConnectionStrategy.REMOTE_SOCKET_CONNECTIONS,
155156
ProxyConnectionStrategy.SERVER_NAME

server/src/main/java/org/opensearch/transport/SniffConnectionStrategy.java

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,23 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
139139
)
140140
);
141141

142+
/**
143+
* Optional expected cluster name for the remote cluster. If set, the connection will fail during handshake
144+
* if the remote cluster's name does not match this value. This prevents accidentally
145+
* connecting to the wrong cluster when seeds are misconfigured or stale. If the validation fails for a seed,
146+
* the connection attempt will continue to the next available seed. This is only supported in the sniff mode.
147+
*/
148+
public static final Setting.AffixSetting<String> REMOTE_CLUSTER_EXPECTED_NAME = Setting.affixKeySetting(
149+
"cluster.remote.",
150+
"expected_cluster_name",
151+
(ns, key) -> Setting.simpleString(
152+
key,
153+
new StrategyValidator<>(ns, key, ConnectionStrategy.SNIFF),
154+
Setting.Property.Dynamic,
155+
Setting.Property.NodeScope
156+
)
157+
);
158+
142159
static final int CHANNELS_PER_CONNECTION = 6;
143160

144161
private static final Predicate<DiscoveryNode> DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion())
@@ -150,6 +167,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
150167
private final Predicate<DiscoveryNode> nodePredicate;
151168
private final SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
152169
private final String proxyAddress;
170+
private final String expectedClusterName;
153171

154172
SniffConnectionStrategy(
155173
String clusterAlias,
@@ -165,7 +183,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
165183
settings,
166184
REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(settings),
167185
getNodePredicate(settings),
168-
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings)
186+
REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(settings),
187+
REMOTE_CLUSTER_EXPECTED_NAME.getConcreteSettingForNamespace(clusterAlias).get(settings)
169188
);
170189
}
171190

@@ -177,7 +196,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
177196
Settings settings,
178197
int maxNumRemoteConnections,
179198
Predicate<DiscoveryNode> nodePredicate,
180-
List<String> configuredSeedNodes
199+
List<String> configuredSeedNodes,
200+
String expectedClusterName
181201
) {
182202
this(
183203
clusterAlias,
@@ -190,7 +210,8 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
190210
configuredSeedNodes,
191211
configuredSeedNodes.stream()
192212
.map(seedAddress -> (Supplier<DiscoveryNode>) () -> resolveSeedNode(clusterAlias, seedAddress, proxyAddress))
193-
.collect(Collectors.toList())
213+
.collect(Collectors.toList()),
214+
expectedClusterName
194215
);
195216
}
196217

@@ -203,14 +224,16 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
203224
int maxNumRemoteConnections,
204225
Predicate<DiscoveryNode> nodePredicate,
205226
List<String> configuredSeedNodes,
206-
List<Supplier<DiscoveryNode>> seedNodes
227+
List<Supplier<DiscoveryNode>> seedNodes,
228+
String expectedClusterName
207229
) {
208230
super(clusterAlias, transportService, connectionManager, settings);
209231
this.proxyAddress = proxyAddress;
210232
this.maxNumRemoteConnections = maxNumRemoteConnections;
211233
this.nodePredicate = nodePredicate;
212234
this.configuredSeedNodes = configuredSeedNodes;
213235
this.seedNodes = seedNodes;
236+
this.expectedClusterName = Strings.hasText(expectedClusterName) ? expectedClusterName : null;
214237
}
215238

216239
static Stream<Setting.AffixSetting<?>> enablementSettings() {
@@ -231,9 +254,11 @@ protected boolean strategyMustBeRebuilt(Settings newSettings) {
231254
String proxy = REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
232255
List<String> addresses = REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
233256
int nodeConnections = REMOTE_NODE_CONNECTIONS.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
257+
String newExpectedClusterName = REMOTE_CLUSTER_EXPECTED_NAME.getConcreteSettingForNamespace(clusterAlias).get(newSettings);
234258
return nodeConnections != maxNumRemoteConnections
235259
|| seedsChanged(configuredSeedNodes, addresses)
236-
|| proxyChanged(proxyAddress, proxy);
260+
|| proxyChanged(proxyAddress, proxy)
261+
|| expectedClusterNameChanged(expectedClusterName, newExpectedClusterName);
237262
}
238263

239264
@Override
@@ -248,7 +273,7 @@ protected void connectImpl(ActionListener<Void> listener) {
248273

249274
@Override
250275
protected RemoteConnectionInfo.ModeInfo getModeInfo() {
251-
return new SniffModeInfo(configuredSeedNodes, maxNumRemoteConnections, connectionManager.size());
276+
return new SniffModeInfo(configuredSeedNodes, maxNumRemoteConnections, connectionManager.size(), expectedClusterName);
252277
}
253278

254279
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> listener) {
@@ -478,11 +503,20 @@ private Predicate<ClusterName> getRemoteClusterNamePredicate() {
478503
return new Predicate<ClusterName>() {
479504
@Override
480505
public boolean test(ClusterName c) {
506+
// Check against the configured expected cluster name (if set)
507+
if (expectedClusterName != null && expectedClusterName.equals(c.value()) == false) {
508+
return false;
509+
}
510+
511+
// Check against the first cluster name seen if user has not provided an expected cluster name
481512
return remoteClusterName.get() == null || c.equals(remoteClusterName.get());
482513
}
483514

484515
@Override
485516
public String toString() {
517+
if (expectedClusterName != null) {
518+
return "expected remote cluster name [" + expectedClusterName + "]";
519+
}
486520
return remoteClusterName.get() == null
487521
? "any cluster name"
488522
: "expected remote cluster name [" + remoteClusterName.get().value() + "]";
@@ -562,6 +596,12 @@ private boolean proxyChanged(String oldProxy, String newProxy) {
562596
return Objects.equals(oldProxy, newProxy) == false;
563597
}
564598

599+
private boolean expectedClusterNameChanged(String oldExpectedName, String newExpectedName) {
600+
String oldClusterName = Strings.hasText(oldExpectedName) ? oldExpectedName : null;
601+
String newClusterName = Strings.hasText(newExpectedName) ? newExpectedName : null;
602+
return Objects.equals(oldClusterName, newClusterName) == false;
603+
}
604+
565605
/**
566606
* Information about the sniff mode
567607
*
@@ -572,17 +612,28 @@ public static class SniffModeInfo implements RemoteConnectionInfo.ModeInfo {
572612
final List<String> seedNodes;
573613
final int maxConnectionsPerCluster;
574614
final int numNodesConnected;
615+
final String expectedClusterName;
575616

576617
public SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected) {
618+
this(seedNodes, maxConnectionsPerCluster, numNodesConnected, null);
619+
}
620+
621+
public SniffModeInfo(List<String> seedNodes, int maxConnectionsPerCluster, int numNodesConnected, String expectedClusterName) {
577622
this.seedNodes = seedNodes;
578623
this.maxConnectionsPerCluster = maxConnectionsPerCluster;
579624
this.numNodesConnected = numNodesConnected;
625+
this.expectedClusterName = expectedClusterName;
580626
}
581627

582628
private SniffModeInfo(StreamInput input) throws IOException {
583629
seedNodes = Arrays.asList(input.readStringArray());
584630
maxConnectionsPerCluster = input.readVInt();
585631
numNodesConnected = input.readVInt();
632+
if (input.getVersion().onOrAfter(Version.V_3_5_0)) {
633+
expectedClusterName = input.readOptionalString();
634+
} else {
635+
expectedClusterName = null;
636+
}
586637
}
587638

588639
@Override
@@ -594,6 +645,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
594645
builder.endArray();
595646
builder.field("num_nodes_connected", numNodesConnected);
596647
builder.field("max_connections_per_cluster", maxConnectionsPerCluster);
648+
if (expectedClusterName != null) {
649+
builder.field("expected_cluster_name", expectedClusterName);
650+
}
597651
return builder;
598652
}
599653

@@ -602,6 +656,9 @@ public void writeTo(StreamOutput out) throws IOException {
602656
out.writeStringArray(seedNodes.toArray(new String[0]));
603657
out.writeVInt(maxConnectionsPerCluster);
604658
out.writeVInt(numNodesConnected);
659+
if (out.getVersion().onOrAfter(Version.V_3_5_0)) {
660+
out.writeOptionalString(expectedClusterName);
661+
}
605662
}
606663

607664
@Override
@@ -626,6 +683,10 @@ public int getNumNodesConnected() {
626683
return numNodesConnected;
627684
}
628685

686+
public String getExpectedClusterName() {
687+
return expectedClusterName;
688+
}
689+
629690
@Override
630691
public RemoteConnectionStrategy.ConnectionStrategy modeType() {
631692
return RemoteConnectionStrategy.ConnectionStrategy.SNIFF;
@@ -638,12 +699,13 @@ public boolean equals(Object o) {
638699
SniffModeInfo sniff = (SniffModeInfo) o;
639700
return maxConnectionsPerCluster == sniff.maxConnectionsPerCluster
640701
&& numNodesConnected == sniff.numNodesConnected
641-
&& Objects.equals(seedNodes, sniff.seedNodes);
702+
&& Objects.equals(seedNodes, sniff.seedNodes)
703+
&& Objects.equals(expectedClusterName, sniff.expectedClusterName);
642704
}
643705

644706
@Override
645707
public int hashCode() {
646-
return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected);
708+
return Objects.hash(seedNodes, maxConnectionsPerCluster, numNodesConnected, expectedClusterName);
647709
}
648710
}
649711
}

0 commit comments

Comments
 (0)