-
Notifications
You must be signed in to change notification settings - Fork 25.9k
Hot-reloadable remote cluster credentials #102798
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
57c5b83
6c43819
6b5b50a
1c70ac8
bfed01d
d4f35bb
8ffd630
93485f8
13da6f7
31714e7
4c086c0
e46a98c
174c390
24fd9a8
f477953
324f84e
ac53bcb
7a2e956
a95b016
a32d375
99b6d4c
a6f0912
e348ef6
b0a515d
f15d588
d558e82
0b2fdb6
a799b3d
d9af19e
f4355ff
52e61a1
8a72ace
cc72a51
8c1aca4
62c0e08
203b98a
a2e755f
d0e764e
dc07c91
679154b
85290e5
722813d
6c86a93
4c589dc
6dcd745
d8307a5
4cf9b4c
a039367
1c1adae
207ca4b
e513798
5f91ae7
c865955
cd95799
6fb39b0
31fed5f
197b2cf
d03e1d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 102798 | ||
| summary: Hot-reloadable remote cluster credentials | ||
| area: Security | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
| * in compliance with, at your election, the Elastic License 2.0 or the Server | ||
| * Side Public License, v 1. | ||
| */ | ||
|
|
||
| package org.elasticsearch.transport; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.settings.SecureString; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.Nullable; | ||
|
|
||
| import java.util.Map; | ||
|
|
||
| import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS; | ||
|
|
||
| public class RemoteClusterCredentialsManager { | ||
|
|
||
| private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class); | ||
|
|
||
| private volatile Map<String, SecureString> clusterCredentials; | ||
|
|
||
| public RemoteClusterCredentialsManager(Settings settings) { | ||
| updateClusterCredentials(settings); | ||
| } | ||
|
|
||
| public void updateClusterCredentials(Settings settings) { | ||
| clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings); | ||
| logger.debug( | ||
| () -> Strings.format( | ||
| "Updated remote cluster credentials for clusters: [%s]", | ||
| Strings.collectionToCommaDelimitedString(clusterCredentials.keySet()) | ||
| ) | ||
| ); | ||
| } | ||
|
|
||
| @Nullable | ||
| public SecureString resolveCredentials(String clusterAlias) { | ||
| return clusterCredentials.get(clusterAlias); | ||
| } | ||
|
|
||
| public boolean hasCredentials(String clusterAlias) { | ||
| return clusterCredentials.containsKey(clusterAlias); | ||
| } | ||
|
|
||
| public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,6 +12,7 @@ | |
| import org.elasticsearch.TransportVersion; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.cluster.node.DiscoveryNode; | ||
| import org.elasticsearch.common.settings.SecureString; | ||
| import org.elasticsearch.common.util.CollectionUtils; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.core.Releasable; | ||
|
|
@@ -25,18 +26,19 @@ | |
| import java.util.Set; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE; | ||
| import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME; | ||
|
|
||
| public class RemoteConnectionManager implements ConnectionManager { | ||
|
|
||
| private final String clusterAlias; | ||
| private final RemoteClusterCredentialsManager credentialsManager; | ||
| private final ConnectionManager delegate; | ||
| private final AtomicLong counter = new AtomicLong(); | ||
| private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList(); | ||
|
|
||
| RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) { | ||
| RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) { | ||
| this.clusterAlias = clusterAlias; | ||
| this.credentialsManager = credentialsManager; | ||
| this.delegate = delegate; | ||
| this.delegate.addListener(new TransportConnectionListener() { | ||
| @Override | ||
|
|
@@ -51,6 +53,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti | |
| }); | ||
| } | ||
|
|
||
| public RemoteClusterCredentialsManager getCredentialsManager() { | ||
| return credentialsManager; | ||
| } | ||
|
|
||
| /** | ||
| * Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode} | ||
| * instead of this method. | ||
|
|
@@ -95,13 +101,7 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi | |
| node, | ||
| profile, | ||
| listener.delegateFailureAndWrap( | ||
| (l, connection) -> l.onResponse( | ||
| new InternalRemoteConnection( | ||
| connection, | ||
| clusterAlias, | ||
| profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile() | ||
| ) | ||
| ) | ||
| (l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager)) | ||
| ) | ||
| ); | ||
| } | ||
|
|
@@ -182,16 +182,35 @@ public void closeNoBlock() { | |
| * @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result | ||
| */ | ||
| public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) { | ||
| return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias); | ||
| } | ||
|
|
||
| public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) { | ||
| @Override | ||
| public String toString() { | ||
| return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ++ and really surprised that we don't do this automatically with SecureString#toString() (we should fix that outside of this PR)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's trappy for sure, but that's currently how to translate between |
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * This method returns information (alias and credentials) for remote cluster for the given transport connection. | ||
| * Either or both of alias and credentials can be null depending on the connection. | ||
| * | ||
| * @param connection the transport connection for which to resolve a remote cluster alias | ||
| */ | ||
| public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) { | ||
| Transport.Connection unwrapped = TransportService.unwrapConnection(connection); | ||
| if (unwrapped instanceof InternalRemoteConnection remoteConnection) { | ||
| return Optional.of(remoteConnection.getClusterAlias()); | ||
| return Optional.of( | ||
| new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials()) | ||
| ); | ||
| } | ||
| return Optional.empty(); | ||
| } | ||
|
|
||
| private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException { | ||
| Transport.Connection connection = delegate.getConnection(node); | ||
| return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile()); | ||
| return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager); | ||
| } | ||
|
|
||
| private synchronized void addConnectedNode(DiscoveryNode addedNode) { | ||
|
|
@@ -297,21 +316,27 @@ private static final class InternalRemoteConnection implements Transport.Connect | |
| private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class); | ||
| private final Transport.Connection connection; | ||
| private final String clusterAlias; | ||
| private final boolean isRemoteClusterProfile; | ||
| @Nullable | ||
| private final SecureString clusterCredentials; | ||
|
|
||
| InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) { | ||
| private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) { | ||
| assert false == connection instanceof InternalRemoteConnection : "should not double wrap"; | ||
| assert false == connection instanceof ProxyConnection | ||
| : "proxy connection should wrap internal remote connection, not the other way around"; | ||
| this.clusterAlias = Objects.requireNonNull(clusterAlias); | ||
| this.connection = Objects.requireNonNull(connection); | ||
| this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile)); | ||
| this.clusterAlias = Objects.requireNonNull(clusterAlias); | ||
| this.clusterCredentials = clusterCredentials; | ||
| } | ||
|
|
||
| public String getClusterAlias() { | ||
| return clusterAlias; | ||
| } | ||
|
|
||
| @Nullable | ||
| public SecureString getClusterCredentials() { | ||
| return clusterCredentials; | ||
| } | ||
|
|
||
| @Override | ||
| public DiscoveryNode getNode() { | ||
| return connection.getNode(); | ||
|
|
@@ -321,7 +346,7 @@ public DiscoveryNode getNode() { | |
| public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) | ||
| throws IOException, TransportException { | ||
| final String effectiveAction; | ||
| if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { | ||
| if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) { | ||
| logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias); | ||
| effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME; | ||
| } else { | ||
|
|
@@ -389,8 +414,8 @@ public boolean hasReferences() { | |
| static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo( | ||
|
n1v0lg marked this conversation as resolved.
|
||
| Transport.Connection connection, | ||
| String clusterAlias, | ||
| String transportProfile | ||
| RemoteClusterCredentialsManager credentialsManager | ||
| ) { | ||
| return new InternalRemoteConnection(connection, clusterAlias, transportProfile); | ||
| return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've replaced the
new InternalRemoteConnection(...)withwrapConnectionWithRemoteClusterInfoto lock down how an internal remote connection can be instantiated (this is a small refactor unrelated to the functional change in this PR).The functional change here is passing
credentialsManagerto the wrap call instead of a fixed boolean flag. I got into more detail on this change in my comment on thewrapConnectionWithRemoteClusterInfomethod below.