Skip to content

Commit 7ded906

Browse files
authored
Hot-reloadable remote cluster credentials (#102798)
This PR enables RCS 2.0 remote clusters to be configured without the need to restart nodes. It works as the follows (assuming both clusters are already running): 1. Get a cross-cluster API key for accessing the _remote_ cluster 2. Add cross-cluster API key to keystores of the _local_ cluster, e.g. ``` echo -n xxx | ./bin/elasticsearch-keystore add cluster.remote.my.credentials -x ``` 3. Call [ReloadSecureSettings API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-nodes-reload-secure-settings.html) on the _local_ cluster 4. Configure RCS 2.0 remote cluster should now just work for the _local_ cluster, e.g. ``` PUT /_cluster/settings {"persistent":{"cluster":{"remote":{"my":{"seeds":["127.0.0.1:9443"]}}}}} ``` This PR does **not** include functionality to automatically re-build connections on secure settings reload. I will add this in a follow up PR. The high level technical approach is to maintain a credentials manager class and use this to attach credentials for connections to remote clusters. This [comment](https://github.com/elastic/elasticsearch/pull/102798/files#r1417708553) also provides more context on some lower level details. Relates: #98120 Relates: ES-6764
1 parent de70fcd commit 7ded906

25 files changed

Lines changed: 1054 additions & 260 deletions

File tree

docs/changelog/102798.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 102798
2+
summary: Hot-reloadable remote cluster credentials
3+
area: Security
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
179179
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
180180
newConnection,
181181
clusterAlias,
182-
actualProfile.getTransportProfile()
182+
connectionManager.getCredentialsManager()
183183
),
184184
actualProfile.getHandshakeTimeout(),
185185
cn -> true,

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,28 @@ final class RemoteClusterConnection implements Closeable {
5757
* @param settings the nodes settings object
5858
* @param clusterAlias the configured alias of the cluster to connect to
5959
* @param transportService the local nodes transport service
60-
* @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured
61-
* via secure setting. This means the remote cluster uses the new configurable access RCS model
62-
* (as opposed to the basic model).
60+
* @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
61+
* i.e. it has a credential configured via secure setting.
62+
* This means the remote cluster uses the advances RCS model (as opposed to the basic model).
6363
*/
64-
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) {
64+
RemoteClusterConnection(
65+
Settings settings,
66+
String clusterAlias,
67+
TransportService transportService,
68+
RemoteClusterCredentialsManager credentialsManager
69+
) {
6570
this.transportService = transportService;
6671
this.clusterAlias = clusterAlias;
67-
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected);
68-
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
72+
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
73+
clusterAlias,
74+
settings,
75+
credentialsManager.hasCredentials(clusterAlias)
76+
);
77+
this.remoteConnectionManager = new RemoteConnectionManager(
78+
clusterAlias,
79+
credentialsManager,
80+
createConnectionManager(profile, transportService)
81+
);
6982
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
7083
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
7184
this.remoteConnectionManager.addListener(transportService);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.transport;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.settings.SecureString;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.core.Nullable;
17+
18+
import java.util.Map;
19+
20+
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS;
21+
22+
public class RemoteClusterCredentialsManager {
23+
24+
private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class);
25+
26+
private volatile Map<String, SecureString> clusterCredentials;
27+
28+
public RemoteClusterCredentialsManager(Settings settings) {
29+
updateClusterCredentials(settings);
30+
}
31+
32+
public void updateClusterCredentials(Settings settings) {
33+
clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings);
34+
logger.debug(
35+
() -> Strings.format(
36+
"Updated remote cluster credentials for clusters: [%s]",
37+
Strings.collectionToCommaDelimitedString(clusterCredentials.keySet())
38+
)
39+
);
40+
}
41+
42+
@Nullable
43+
public SecureString resolveCredentials(String clusterAlias) {
44+
return clusterCredentials.get(clusterAlias);
45+
}
46+
47+
public boolean hasCredentials(String clusterAlias) {
48+
return clusterCredentials.containsKey(clusterAlias);
49+
}
50+
51+
public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY);
52+
}

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,14 @@ public boolean isRemoteClusterServerEnabled() {
147147

148148
private final TransportService transportService;
149149
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
150-
private final Set<String> credentialsProtectedRemoteClusters;
150+
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;
151151

152152
RemoteClusterService(Settings settings, TransportService transportService) {
153153
super(settings);
154154
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
155155
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
156156
this.transportService = transportService;
157-
this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();
158-
157+
this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
159158
if (remoteClusterServerEnabled) {
160159
registerRemoteClusterHandshakeRequestHandler(transportService);
161160
}
@@ -305,6 +304,14 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski
305304
}
306305
}
307306

307+
public void updateRemoteClusterCredentials(Settings settings) {
308+
remoteClusterCredentialsManager.updateClusterCredentials(settings);
309+
}
310+
311+
public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
312+
return remoteClusterCredentialsManager;
313+
}
314+
308315
@Override
309316
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
310317
CountDownLatch latch = new CountDownLatch(1);
@@ -363,12 +370,7 @@ synchronized void updateRemoteCluster(
363370
if (remote == null) {
364371
// this is a new cluster we have to add a new representation
365372
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
366-
remote = new RemoteClusterConnection(
367-
finalSettings,
368-
clusterAlias,
369-
transportService,
370-
credentialsProtectedRemoteClusters.contains(clusterAlias)
371-
);
373+
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
372374
remoteClusters.put(clusterAlias, remote);
373375
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
374376
} else if (remote.shouldRebuildConnection(newSettings)) {
@@ -380,12 +382,7 @@ synchronized void updateRemoteCluster(
380382
}
381383
remoteClusters.remove(clusterAlias);
382384
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
383-
remote = new RemoteClusterConnection(
384-
finalSettings,
385-
clusterAlias,
386-
transportService,
387-
credentialsProtectedRemoteClusters.contains(clusterAlias)
388-
);
385+
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
389386
remoteClusters.put(clusterAlias, remote);
390387
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
391388
} else {

server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.cluster.node.DiscoveryNode;
15+
import org.elasticsearch.common.settings.SecureString;
1516
import org.elasticsearch.common.util.CollectionUtils;
1617
import org.elasticsearch.core.Nullable;
1718
import org.elasticsearch.core.Releasable;
@@ -25,18 +26,19 @@
2526
import java.util.Set;
2627
import java.util.concurrent.atomic.AtomicLong;
2728

28-
import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
2929
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
3030

3131
public class RemoteConnectionManager implements ConnectionManager {
3232

3333
private final String clusterAlias;
34+
private final RemoteClusterCredentialsManager credentialsManager;
3435
private final ConnectionManager delegate;
3536
private final AtomicLong counter = new AtomicLong();
3637
private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();
3738

38-
RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
39+
RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) {
3940
this.clusterAlias = clusterAlias;
41+
this.credentialsManager = credentialsManager;
4042
this.delegate = delegate;
4143
this.delegate.addListener(new TransportConnectionListener() {
4244
@Override
@@ -51,6 +53,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
5153
});
5254
}
5355

56+
public RemoteClusterCredentialsManager getCredentialsManager() {
57+
return credentialsManager;
58+
}
59+
5460
/**
5561
* Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode}
5662
* instead of this method.
@@ -95,13 +101,7 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi
95101
node,
96102
profile,
97103
listener.delegateFailureAndWrap(
98-
(l, connection) -> l.onResponse(
99-
new InternalRemoteConnection(
100-
connection,
101-
clusterAlias,
102-
profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile()
103-
)
104-
)
104+
(l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager))
105105
)
106106
);
107107
}
@@ -182,16 +182,35 @@ public void closeNoBlock() {
182182
* @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result
183183
*/
184184
public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) {
185+
return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias);
186+
}
187+
188+
public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) {
189+
@Override
190+
public String toString() {
191+
return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
192+
}
193+
}
194+
195+
/**
196+
* This method returns information (alias and credentials) for remote cluster for the given transport connection.
197+
* Either or both of alias and credentials can be null depending on the connection.
198+
*
199+
* @param connection the transport connection for which to resolve a remote cluster alias
200+
*/
201+
public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) {
185202
Transport.Connection unwrapped = TransportService.unwrapConnection(connection);
186203
if (unwrapped instanceof InternalRemoteConnection remoteConnection) {
187-
return Optional.of(remoteConnection.getClusterAlias());
204+
return Optional.of(
205+
new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials())
206+
);
188207
}
189208
return Optional.empty();
190209
}
191210

192211
private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
193212
Transport.Connection connection = delegate.getConnection(node);
194-
return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile());
213+
return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager);
195214
}
196215

197216
private synchronized void addConnectedNode(DiscoveryNode addedNode) {
@@ -297,21 +316,27 @@ private static final class InternalRemoteConnection implements Transport.Connect
297316
private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
298317
private final Transport.Connection connection;
299318
private final String clusterAlias;
300-
private final boolean isRemoteClusterProfile;
319+
@Nullable
320+
private final SecureString clusterCredentials;
301321

302-
InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) {
322+
private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
303323
assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
304324
assert false == connection instanceof ProxyConnection
305325
: "proxy connection should wrap internal remote connection, not the other way around";
306-
this.clusterAlias = Objects.requireNonNull(clusterAlias);
307326
this.connection = Objects.requireNonNull(connection);
308-
this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile));
327+
this.clusterAlias = Objects.requireNonNull(clusterAlias);
328+
this.clusterCredentials = clusterCredentials;
309329
}
310330

311331
public String getClusterAlias() {
312332
return clusterAlias;
313333
}
314334

335+
@Nullable
336+
public SecureString getClusterCredentials() {
337+
return clusterCredentials;
338+
}
339+
315340
@Override
316341
public DiscoveryNode getNode() {
317342
return connection.getNode();
@@ -321,7 +346,7 @@ public DiscoveryNode getNode() {
321346
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
322347
throws IOException, TransportException {
323348
final String effectiveAction;
324-
if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
349+
if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
325350
logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
326351
effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
327352
} else {
@@ -389,8 +414,8 @@ public boolean hasReferences() {
389414
static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
390415
Transport.Connection connection,
391416
String clusterAlias,
392-
String transportProfile
417+
RemoteClusterCredentialsManager credentialsManager
393418
) {
394-
return new InternalRemoteConnection(connection, clusterAlias, transportProfile);
419+
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
395420
}
396421
}

server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,11 @@ private ConnectionManager.ConnectionValidator getConnectionValidator(DiscoveryNo
357357
: "transport profile must be consistent between the connection manager and the actual profile";
358358
transportService.connectionValidator(node)
359359
.validate(
360-
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()),
360+
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
361+
connection,
362+
clusterAlias,
363+
connectionManager.getCredentialsManager()
364+
),
361365
profile,
362366
listener
363367
);

0 commit comments

Comments
 (0)