Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
57c5b83
working prototype
ywangd Aug 2, 2023
6c43819
fix tests for remote connections
ywangd Aug 2, 2023
6b5b50a
fix security tests
ywangd Aug 2, 2023
1c70ac8
tweak
ywangd Aug 2, 2023
bfed01d
Merge branch 'main' into rcs2-remote-cluster-no-restart
elasticmachine Aug 3, 2023
d4f35bb
WIP poc rcs reloadable keys
n1v0lg Nov 22, 2023
8ffd630
Merge branch 'main' into rcs2-reload
n1v0lg Nov 29, 2023
93485f8
Hot reload remote cluster credentials
n1v0lg Nov 29, 2023
13da6f7
WIP integ test
n1v0lg Nov 29, 2023
31714e7
Merge branch 'main' into rcs2-reload
n1v0lg Nov 30, 2023
4c086c0
Tweaks
n1v0lg Nov 30, 2023
e46a98c
Update docs/changelog/102798.yaml
n1v0lg Nov 30, 2023
174c390
License headers
n1v0lg Nov 30, 2023
24fd9a8
Delete
n1v0lg Nov 30, 2023
f477953
Merge branch 'rcs2-reload' of github.com:n1v0lg/elasticsearch into rc…
n1v0lg Nov 30, 2023
324f84e
Store creds with connection
n1v0lg Nov 30, 2023
ac53bcb
Fix tests
n1v0lg Nov 30, 2023
7a2e956
Merge branch 'main' into rcs2-reload
n1v0lg Dec 1, 2023
a95b016
Gnarly test with refactoring
n1v0lg Dec 1, 2023
a32d375
Remove unused
n1v0lg Dec 1, 2023
99b6d4c
Clean up
n1v0lg Dec 1, 2023
a6f0912
Test update
n1v0lg Dec 1, 2023
e348ef6
TODO
n1v0lg Dec 1, 2023
b0a515d
Clear
n1v0lg Dec 1, 2023
f15d588
Clean up and randomized connection
n1v0lg Dec 3, 2023
d558e82
Expose transport service to security
n1v0lg Dec 3, 2023
0b2fdb6
Rm unused method
n1v0lg Dec 4, 2023
a799b3d
Merge branch 'main' into rcs2-reload
n1v0lg Dec 4, 2023
d9af19e
Inject remote cluster service
n1v0lg Dec 4, 2023
f4355ff
Test fixes
n1v0lg Dec 4, 2023
52e61a1
Nit
n1v0lg Dec 4, 2023
8a72ace
Clean
n1v0lg Dec 4, 2023
cc72a51
TODO in test
n1v0lg Dec 4, 2023
8c1aca4
Test credentials manager
n1v0lg Dec 4, 2023
62c0e08
Nit
n1v0lg Dec 4, 2023
203b98a
Simplify
n1v0lg Dec 4, 2023
a2e755f
Desc
n1v0lg Dec 5, 2023
d0e764e
Code tweak
n1v0lg Dec 5, 2023
dc07c91
Local action
n1v0lg Dec 6, 2023
679154b
Fix tests
n1v0lg Dec 6, 2023
85290e5
Rm todo
n1v0lg Dec 6, 2023
722813d
Merge branch 'main' into rcs2-reload
n1v0lg Dec 6, 2023
6c86a93
More clean up and comments
n1v0lg Dec 6, 2023
4c589dc
Tests
n1v0lg Dec 6, 2023
6dcd745
Merge branch 'main' into rcs2-reload
elasticmachine Dec 6, 2023
d8307a5
Merge branch 'main' into rcs2-reload
elasticmachine Dec 7, 2023
4cf9b4c
Merge branch 'main' into rcs2-reload
n1v0lg Dec 7, 2023
a039367
Tests and wrapping
n1v0lg Dec 7, 2023
1c1adae
Test failures
n1v0lg Dec 7, 2023
207ca4b
Grammar nit
n1v0lg Dec 7, 2023
e513798
Test
n1v0lg Dec 7, 2023
5f91ae7
Merge branch 'main' into rcs2-reload
elasticmachine Dec 7, 2023
c865955
Merge branch 'main' into rcs2-reload
n1v0lg Dec 7, 2023
cd95799
Address review feedback
n1v0lg Dec 8, 2023
6fb39b0
Merge branch 'main' into rcs2-reload
elasticmachine Dec 8, 2023
31fed5f
Nit
n1v0lg Dec 8, 2023
197b2cf
Grammar still
n1v0lg Dec 8, 2023
d03e1d8
One more
n1v0lg Dec 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/102798.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 102798
summary: Hot-reloadable remote cluster credentials
area: Security
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
newConnection,
clusterAlias,
actualProfile.getTransportProfile()
connectionManager.getCredentialsManager()
),
actualProfile.getHandshakeTimeout(),
cn -> true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,28 @@ final class RemoteClusterConnection implements Closeable {
* @param settings the nodes settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param transportService the local nodes transport service
* @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured
* via secure setting. This means the remote cluster uses the new configurable access RCS model
* (as opposed to the basic model).
* @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
* i.e. it has a credential configured via secure setting.
* This means the remote cluster uses the advances RCS model (as opposed to the basic model).
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) {
RemoteClusterConnection(
Settings settings,
String clusterAlias,
TransportService transportService,
RemoteClusterCredentialsManager credentialsManager
) {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected);
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
clusterAlias,
settings,
credentialsManager.hasCredentials(clusterAlias)
);
this.remoteConnectionManager = new RemoteConnectionManager(
clusterAlias,
credentialsManager,
createConnectionManager(profile, transportService)
);
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
this.remoteConnectionManager.addListener(transportService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;

import java.util.Map;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS;

public class RemoteClusterCredentialsManager {

private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class);

private volatile Map<String, SecureString> clusterCredentials;

public RemoteClusterCredentialsManager(Settings settings) {
updateClusterCredentials(settings);
}

public void updateClusterCredentials(Settings settings) {
clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings);
logger.debug(
() -> Strings.format(
"Updated remote cluster credentials for clusters: [%s]",
Strings.collectionToCommaDelimitedString(clusterCredentials.keySet())
)
);
}

@Nullable
public SecureString resolveCredentials(String clusterAlias) {
return clusterCredentials.get(clusterAlias);
}

public boolean hasCredentials(String clusterAlias) {
return clusterCredentials.containsKey(clusterAlias);
}

public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY);
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,14 @@ public boolean isRemoteClusterServerEnabled() {

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final Set<String> credentialsProtectedRemoteClusters;
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;

RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
this.transportService = transportService;
this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();

this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
if (remoteClusterServerEnabled) {
registerRemoteClusterHandshakeRequestHandler(transportService);
}
Expand Down Expand Up @@ -305,6 +304,14 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski
}
}

public void updateRemoteClusterCredentials(Settings settings) {
remoteClusterCredentialsManager.updateClusterCredentials(settings);
}

public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
return remoteClusterCredentialsManager;
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -363,12 +370,7 @@ synchronized void updateRemoteCluster(
if (remote == null) {
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
} else if (remote.shouldRebuildConnection(newSettings)) {
Expand All @@ -380,12 +382,7 @@ synchronized void updateRemoteCluster(
}
remoteClusters.remove(clusterAlias);
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand All @@ -25,18 +26,19 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;

public class RemoteConnectionManager implements ConnectionManager {

private final String clusterAlias;
private final RemoteClusterCredentialsManager credentialsManager;
private final ConnectionManager delegate;
private final AtomicLong counter = new AtomicLong();
private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();

RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) {
this.clusterAlias = clusterAlias;
this.credentialsManager = credentialsManager;
this.delegate = delegate;
this.delegate.addListener(new TransportConnectionListener() {
@Override
Expand All @@ -51,6 +53,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

public RemoteClusterCredentialsManager getCredentialsManager() {
return credentialsManager;
}

/**
* Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode}
* instead of this method.
Expand Down Expand Up @@ -95,13 +101,7 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi
node,
profile,
listener.delegateFailureAndWrap(
(l, connection) -> l.onResponse(
new InternalRemoteConnection(
connection,
clusterAlias,
profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile()
)
)
(l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've replaced the new InternalRemoteConnection(...) with wrapConnectionWithRemoteClusterInfo to lock down how an internal remote connection can be instantiated (this is a small refactor unrelated to the functional change in this PR).

The functional change here is passing credentialsManager to the wrap call instead of a fixed boolean flag. I got into more detail on this change in my comment on the wrapConnectionWithRemoteClusterInfo method below.

)
);
}
Expand Down Expand Up @@ -182,16 +182,35 @@ public void closeNoBlock() {
* @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result
*/
public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) {
return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias);
}

public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) {
@Override
public String toString() {
return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ and really surprised that we don't do this automatically with SecureString#toString() (we should fix that outside of this PR)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's trappy for sure, but that's currently how to translate between SecureString and a regular string (e.g., for sending cross cluster access headers, or when dealing with third party deps that don't have a concept of SecureString). Not saying, we shouldn't change it -- just that it's going to be a slightly trickier refactor.

}
}

/**
* This method returns information (alias and credentials) for remote cluster for the given transport connection.
* Either or both of alias and credentials can be null depending on the connection.
*
* @param connection the transport connection for which to resolve a remote cluster alias
*/
public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) {
Transport.Connection unwrapped = TransportService.unwrapConnection(connection);
if (unwrapped instanceof InternalRemoteConnection remoteConnection) {
return Optional.of(remoteConnection.getClusterAlias());
return Optional.of(
new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials())
);
}
return Optional.empty();
}

private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
Transport.Connection connection = delegate.getConnection(node);
return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile());
return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager);
}

private synchronized void addConnectedNode(DiscoveryNode addedNode) {
Expand Down Expand Up @@ -297,21 +316,27 @@ private static final class InternalRemoteConnection implements Transport.Connect
private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
private final Transport.Connection connection;
private final String clusterAlias;
private final boolean isRemoteClusterProfile;
@Nullable
private final SecureString clusterCredentials;

InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) {
private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
assert false == connection instanceof ProxyConnection
: "proxy connection should wrap internal remote connection, not the other way around";
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.connection = Objects.requireNonNull(connection);
this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile));
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.clusterCredentials = clusterCredentials;
}

public String getClusterAlias() {
return clusterAlias;
}

@Nullable
public SecureString getClusterCredentials() {
return clusterCredentials;
}

@Override
public DiscoveryNode getNode() {
return connection.getNode();
Expand All @@ -321,7 +346,7 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
final String effectiveAction;
if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
} else {
Expand Down Expand Up @@ -389,8 +414,8 @@ public boolean hasReferences() {
static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
Comment thread
n1v0lg marked this conversation as resolved.
Transport.Connection connection,
String clusterAlias,
String transportProfile
RemoteClusterCredentialsManager credentialsManager
) {
return new InternalRemoteConnection(connection, clusterAlias, transportProfile);
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ private ConnectionManager.ConnectionValidator getConnectionValidator(DiscoveryNo
: "transport profile must be consistent between the connection manager and the actual profile";
transportService.connectionValidator(node)
.validate(
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()),
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
connection,
clusterAlias,
connectionManager.getCredentialsManager()
),
profile,
listener
);
Expand Down
Loading