Skip to content

Commit bcf8ef3

Browse files
author
Dharmesh 💤
committed
[Remote Store] Changes to introduce repository registration via node attributes
Signed-off-by: Dharmesh 💤 <sdharms@amazon.com>
1 parent 6a5b464 commit bcf8ef3

5 files changed

Lines changed: 693 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.cluster.remotestore.repository;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.metadata.Metadata;
13+
import org.opensearch.cluster.metadata.RepositoriesMetadata;
14+
import org.opensearch.cluster.metadata.RepositoryMetadata;
15+
import org.opensearch.cluster.node.DiscoveryNode;
16+
import org.opensearch.common.settings.Settings;
17+
import org.opensearch.repositories.RepositoriesService;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
/**
25+
* RemoteStore Repository Registration helper
26+
*/
27+
public class RemoteStoreRepositoryRegistrationHelper {
28+
29+
/** Node attribute whose value is the name of the segment repository. */
public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "cluster.remote_store.segment";
30+
/** Node attribute whose value is the name of the translog repository. */
public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "cluster.remote_store.translog";
31+
/** Key template for a repository's type attribute; {@code %s} is filled with the repository name. */
public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type";
32+
/** Key template for a repository's settings attribute; {@code %s} is filled with the repository name. */
public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.settings";
33+
34+
private static void validateAttributeNonNull(DiscoveryNode joiningNode, String attributeKey) {
35+
String attributeValue = joiningNode.getAttributes().get(attributeKey);
36+
if (attributeValue == null || attributeValue.isEmpty()) {
37+
throw new IllegalStateException("joining node [" + joiningNode + "] doesn't have the node attribute [" + attributeKey + "]");
38+
}
39+
}
40+
41+
/**
42+
* A node will be declared as remote store node if it has any of the remote store node attributes.
43+
* The method validates that the joining node has any of the remote store node attributes or not.
44+
* @param joiningNode
45+
* @return boolean value on the basis of remote store node attributes.
46+
*/
47+
public static boolean isRemoteStoreNode(DiscoveryNode joiningNode) {
48+
Map<String, String> joiningNodeAttributes = joiningNode.getAttributes();
49+
String segmentRepositoryName = joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
50+
String segmentRepositoryTypeAttributeKey = String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepositoryName);
51+
String segmentRepositorySettingsAttributeKey = String.format(
52+
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT,
53+
segmentRepositoryName
54+
);
55+
String translogRepositoryName = joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
56+
String translogRepositoryTypeAttributeKey = String.format(
57+
REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
58+
translogRepositoryName
59+
);
60+
String translogRepositorySettingsAttributeKey = String.format(
61+
REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT,
62+
translogRepositoryName
63+
);
64+
65+
boolean remoteStoreNode = joiningNodeAttributes.get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
66+
|| joiningNodeAttributes.get(segmentRepositoryTypeAttributeKey) != null
67+
|| joiningNodeAttributes.get(segmentRepositorySettingsAttributeKey) != null
68+
|| joiningNodeAttributes.get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY) != null
69+
|| joiningNodeAttributes.get(translogRepositoryTypeAttributeKey) != null
70+
|| joiningNodeAttributes.get(translogRepositorySettingsAttributeKey) != null;
71+
72+
if (remoteStoreNode) {
73+
validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
74+
validateAttributeNonNull(joiningNode, segmentRepositoryTypeAttributeKey);
75+
validateAttributeNonNull(joiningNode, segmentRepositorySettingsAttributeKey);
76+
validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
77+
validateAttributeNonNull(joiningNode, translogRepositoryTypeAttributeKey);
78+
validateAttributeNonNull(joiningNode, translogRepositorySettingsAttributeKey);
79+
}
80+
81+
return remoteStoreNode;
82+
}
83+
84+
private static void compareAttribute(DiscoveryNode joiningNode, DiscoveryNode existingNode, String attributeKey) {
85+
String joiningNodeAttribute = joiningNode.getAttributes().get(attributeKey);
86+
String existingNodeAttribute = existingNode.getAttributes().get(attributeKey);
87+
88+
if (existingNodeAttribute.equals(joiningNodeAttribute) == false) {
89+
throw new IllegalStateException(
90+
"joining node ["
91+
+ joiningNode
92+
+ "] has node attribute ["
93+
+ attributeKey
94+
+ "] value ["
95+
+ joiningNodeAttribute
96+
+ "] which is different than existing node ["
97+
+ existingNode
98+
+ "] value ["
99+
+ existingNodeAttribute
100+
+ "]"
101+
);
102+
}
103+
}
104+
105+
// TODO: See a better way to compare the remote store node attributes.
106+
public static void compareNodeAttributes(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
107+
String segmentRepositoryName = existingNode.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
108+
String translogRepositoryName = existingNode.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
109+
110+
compareAttribute(joiningNode, existingNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
111+
compareAttribute(
112+
joiningNode,
113+
existingNode,
114+
String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, segmentRepositoryName)
115+
);
116+
compareAttribute(
117+
joiningNode,
118+
existingNode,
119+
String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, segmentRepositoryName)
120+
);
121+
compareAttribute(joiningNode, existingNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
122+
compareAttribute(
123+
joiningNode,
124+
existingNode,
125+
String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, translogRepositoryName)
126+
);
127+
compareAttribute(
128+
joiningNode,
129+
existingNode,
130+
String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, translogRepositoryName)
131+
);
132+
}
133+
134+
private static Settings buildSettings(String stringSettings) {
135+
Settings.Builder settings = Settings.builder();
136+
137+
String[] stringKeyValue = stringSettings.split(",");
138+
for (int i = 0; i < stringKeyValue.length; i++) {
139+
String[] keyValue = stringKeyValue[i].split(":");
140+
settings.put(keyValue[0].trim(), keyValue[1].trim());
141+
}
142+
143+
return settings.build();
144+
}
145+
146+
// TODO: Add logic to mark these repository as System Repository once thats merged.
147+
// Visible For testing
148+
public static RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String name) {
149+
String type = node.getAttributes().get(String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
150+
String settings = node.getAttributes().get(String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, name));
151+
152+
validateAttributeNonNull(node, String.format(REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, name));
153+
validateAttributeNonNull(node, String.format(REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_FORMAT, name));
154+
155+
return new RepositoryMetadata(name, type, buildSettings(settings));
156+
157+
}
158+
159+
/**
160+
* Validated or adds the remote store repository to cluster state if it doesn't exist.
161+
* @param joiningNode
162+
* @param currentState
163+
* @return updated cluster state
164+
*/
165+
public static ClusterState validateOrAddRemoteStoreRepository(DiscoveryNode joiningNode, ClusterState currentState) {
166+
List<DiscoveryNode> existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values());
167+
168+
ClusterState newState = ClusterState.builder(currentState).build();
169+
170+
// TODO: Mutating cluster state like this can be dangerous, this will need refactoring.
171+
if (existingNodes.size() == 0) {
172+
validateAttributeNonNull(joiningNode, REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY);
173+
validateAttributeNonNull(joiningNode, REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY);
174+
175+
newState = updateClusterStateWithRepositoryMetadata(
176+
currentState,
177+
buildRepositoryMetadata(joiningNode, joiningNode.getAttributes().get(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY))
178+
);
179+
newState = updateClusterStateWithRepositoryMetadata(
180+
newState,
181+
buildRepositoryMetadata(joiningNode, joiningNode.getAttributes().get(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY))
182+
);
183+
return newState;
184+
} else {
185+
compareNodeAttributes(joiningNode, existingNodes.get(0));
186+
}
187+
188+
return newState;
189+
}
190+
191+
private static ClusterState updateClusterStateWithRepositoryMetadata(
192+
ClusterState currentState,
193+
RepositoryMetadata newRepositoryMetadata
194+
) {
195+
RepositoriesService.validate(newRepositoryMetadata.name());
196+
197+
Metadata metadata = currentState.metadata();
198+
Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
199+
RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
200+
if (repositories == null) {
201+
repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata));
202+
} else {
203+
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
204+
205+
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
206+
if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
207+
if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
208+
// Previous version is the same as this one no update is needed.
209+
return new ClusterState.Builder(currentState).build();
210+
} else {
211+
throw new IllegalStateException(
212+
"new repository metadata ["
213+
+ newRepositoryMetadata
214+
+ "] supplied by joining node is different from existing repository metadata ["
215+
+ repositoryMetadata
216+
+ "]"
217+
);
218+
}
219+
} else {
220+
repositoriesMetadata.add(repositoryMetadata);
221+
}
222+
}
223+
repositoriesMetadata.add(newRepositoryMetadata);
224+
repositories = new RepositoriesMetadata(repositoriesMetadata);
225+
}
226+
mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories);
227+
return ClusterState.builder(currentState).metadata(mdBuilder).build();
228+
}
229+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
/** Helpers for registering remote store repositories from node attributes. */
10+
package org.opensearch.action.admin.cluster.remotestore.repository;

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
605605
// we are checking source node commission status here to reject any join request coming from a decommissioned node
606606
// even before executing the join task to fail fast
607607
JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());
608+
609+
JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation);
608610
}
609611
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
610612
} else {

server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@
5959
import java.util.function.BiConsumer;
6060
import java.util.stream.Collectors;
6161

62+
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.compareNodeAttributes;
63+
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreNode;
64+
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository;
6265
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
6366
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
6467

@@ -140,6 +143,11 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
140143
ClusterState.Builder newState;
141144

142145
if (joiningNodes.size() == 1 && joiningNodes.get(0).isFinishElectionTask()) {
146+
DiscoveryNode joiningNode = joiningNodes.get(0).node();
147+
if (isRemoteStoreNode(joiningNode)) {
148+
// TODO: Mutating cluster state like this can be dangerous, this will need refactoring.
149+
currentState = validateOrAddRemoteStoreRepository(joiningNode, currentState);
150+
}
143151
return results.successes(joiningNodes).build(currentState);
144152
} else if (currentNodes.getClusterManagerNode() == null && joiningNodes.stream().anyMatch(Task::isBecomeClusterManagerTask)) {
145153
assert joiningNodes.stream().anyMatch(Task::isFinishElectionTask) : "becoming a cluster-manager but election is not finished "
@@ -160,6 +168,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
160168
}
161169

162170
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
171+
ClusterState intermediateState;
163172

164173
assert nodesBuilder.isLocalNodeElectedClusterManager();
165174

@@ -176,6 +185,12 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
176185
logger.debug("received a join request for an existing node [{}]", joinTask.node());
177186
} else {
178187
final DiscoveryNode node = joinTask.node();
188+
if (isRemoteStoreNode(node)) {
189+
// TODO: Mutating cluster state like this can be dangerous or anti pattern, this will need
190+
// refactoring.
191+
intermediateState = validateOrAddRemoteStoreRepository(node, newState.build());
192+
newState = ClusterState.builder(intermediateState);
193+
}
179194
try {
180195
if (enforceMajorVersion) {
181196
ensureMajorVersionBarrier(node.getVersion(), minClusterNodeVersion);
@@ -187,6 +202,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
187202
// we have added the same check in handleJoinRequest method and adding it here as this method
188203
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
189204
ensureNodeCommissioned(node, currentState.metadata());
205+
206+
ensureRemoteStoreNodesCompatibility(node, currentState);
190207
nodesBuilder.add(node);
191208
nodesChanged = true;
192209
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
@@ -422,6 +439,45 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
422439
}
423440
}
424441

442+
/**
443+
* The method ensures two conditions -
444+
* 1. The joining node is remote store if it is joining a remote store cluster.
445+
* 2. The joining node is non-remote store if it is joining a non-remote store cluster.
446+
* A remote store node is the one which holds the all the remote store attributes and a remote store cluster is
447+
* the one which has only homogeneous remote store nodes with same node attributes
448+
*
449+
* @param joiningNode
450+
* @param currentState
451+
*/
452+
public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, ClusterState currentState) {
453+
List<DiscoveryNode> existingNodes = new ArrayList<>(currentState.getNodes().getNodes().values());
454+
455+
/**
456+
* If there are no node in the cluster state we will No op the compatibility check as at this point we
457+
* cannot determine if this is a remote store cluster or non-remote store cluster.
458+
*/
459+
if (existingNodes.size() == 0) {
460+
return;
461+
}
462+
463+
/**
464+
* TODO: The below check is valid till we support migration, once we start supporting migration a remote
465+
* store node will be able to join a non remote store cluster and vice versa. #7986
466+
*/
467+
if (isRemoteStoreNode(joiningNode)) {
468+
if (isRemoteStoreNode(existingNodes.get(0))) {
469+
DiscoveryNode existingNode = existingNodes.get(0);
470+
compareNodeAttributes(joiningNode, existingNode);
471+
} else {
472+
throw new IllegalStateException("a remote store node [" + joiningNode + "] is trying to join a non remote store cluster");
473+
}
474+
} else {
475+
if (isRemoteStoreNode(existingNodes.get(0))) {
476+
throw new IllegalStateException("a non remote store node [" + joiningNode + "] is trying to join a remote store cluster");
477+
}
478+
}
479+
}
480+
425481
public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
426482
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
427483
) {
@@ -430,6 +486,7 @@ public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoin
430486
ensureNodesCompatibility(node.getVersion(), state.getNodes());
431487
ensureIndexCompatibility(node.getVersion(), state.getMetadata());
432488
ensureNodeCommissioned(node, state.getMetadata());
489+
ensureRemoteStoreNodesCompatibility(node, state);
433490
});
434491
validators.addAll(onJoinValidators);
435492
return Collections.unmodifiableCollection(validators);

0 commit comments

Comments
 (0)