Skip to content

Commit 8f977ff

Browse files
committed
Allow NetworkDisruption to reconnect to known nodes
1 parent 9ea4166 commit 8f977ff

6 files changed

Lines changed: 18 additions & 14 deletions

File tree

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,16 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor
7878
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
7979
}
8080

81-
public void connectToNodes(DiscoveryNodes discoveryNodes) {
81+
public void connectToNodes(DiscoveryNodes discoveryNodes, boolean reconnectToKnownNodes) {
8282
CountDownLatch latch = new CountDownLatch(discoveryNodes.getSize());
8383
for (final DiscoveryNode node : discoveryNodes) {
8484
final boolean shouldConnect;
8585
try (Releasable ignored = nodeLocks.acquire(node)) {
8686
// We try and connect to any new nodes before returning. However, on the elected master the connections are established
8787
// during joining, so we also check that we're not already connected to avoid the need to execute any background tasks in
8888
// that case.
89-
shouldConnect = nodes.putIfAbsent(node, 0) == null && transportService.nodeConnected(node) == false;
89+
shouldConnect = (nodes.putIfAbsent(node, 0) == null || reconnectToKnownNodes)
90+
&& transportService.nodeConnected(node) == false;
9091
}
9192
if (shouldConnect) {
9293
// spawn to another thread to do in parallel

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -462,7 +462,7 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
462462
}
463463
}
464464

465-
nodeConnectionsService.connectToNodes(newClusterState.nodes());
465+
nodeConnectionsService.connectToNodes(newClusterState.nodes(), false);
466466

467467
logger.debug("applying cluster state version {}", newClusterState.version());
468468
try {

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void testConnectAndDisconnect() {
8888
ClusterState current = clusterStateFromNodes(Collections.emptyList());
8989
ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
9090

91-
service.connectToNodes(event.state().nodes());
91+
service.connectToNodes(event.state().nodes(), false);
9292
assertConnected(event.state().nodes());
9393

9494
service.disconnectFromNodesExcept(event.state().nodes());
@@ -97,7 +97,7 @@ public void testConnectAndDisconnect() {
9797
current = event.state();
9898
event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
9999

100-
service.connectToNodes(event.state().nodes());
100+
service.connectToNodes(event.state().nodes(), false);
101101
assertConnected(event.state().nodes());
102102

103103
service.disconnectFromNodesExcept(event.state().nodes());
@@ -114,7 +114,7 @@ public void testReconnect() {
114114

115115
transport.randomConnectionExceptions = true;
116116

117-
service.connectToNodes(event.state().nodes());
117+
service.connectToNodes(event.state().nodes(), false);
118118

119119
for (int i = 0; i < 3; i++) {
120120
// simulate disconnects
@@ -137,21 +137,23 @@ public void testDoesNotReconnectToKnownNodesOnNewClusterState() {
137137
final ClusterState state1 = clusterStateFromNodes(randomSubsetOf(nodes));
138138
final ClusterState state2 = clusterStateFromNodes(randomSubsetOf(nodes));
139139

140-
service.connectToNodes(state1.nodes());
140+
service.connectToNodes(state1.nodes(), false);
141141

142142
final Set<DiscoveryNode> disconnectedNodes = new HashSet<>(randomSubsetOf(nodes));
143143
for (final DiscoveryNode node : disconnectedNodes) {
144144
transport.disconnectFromNode(node);
145145
}
146146

147-
service.connectToNodes(state2.nodes());
147+
boolean shouldReconnect = randomBoolean();
148+
149+
service.connectToNodes(state2.nodes(), shouldReconnect);
148150

149151
final Set<DiscoveryNode> expectedNodes = new HashSet<>(nodes.size());
150152
state1.nodes().forEach(expectedNodes::add);
151153
disconnectedNodes.forEach(expectedNodes::remove);
152154
for (final DiscoveryNode discoveryNode : state2.nodes()) {
153-
if (state1.nodes().get(discoveryNode.getId()) == null) {
154-
// Only expect to be connected to _new_ nodes in state2.
155+
if (shouldReconnect || state1.nodes().get(discoveryNode.getId()) == null) {
156+
// Only expect to be connected to _new_ nodes in state2 unless shouldReconnect is set
155157
expectedNodes.add(discoveryNode);
156158
}
157159
}
@@ -166,7 +168,7 @@ public void testDoesNotReconnectToKnownNodesOnNewClusterState() {
166168
assertConnected(expectedNodes);
167169
assertThat(transport.connectedNodes.size(), equalTo(expectedNodes.size()));
168170
}
169-
171+
170172
private void assertConnectedExactlyToNodes(ClusterState state) {
171173
assertConnected(state.nodes());
172174
assertThat(transport.connectedNodes.size(), equalTo(state.nodes().getSize()));

server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ TimedClusterApplierService createTimedClusterService(boolean makeMaster) throws
9191
"ClusterApplierServiceTests").build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
9292
threadPool);
9393
timedClusterApplierService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
94+
9495
@Override
95-
public void connectToNodes(DiscoveryNodes discoveryNodes) {
96+
public void connectToNodes(DiscoveryNodes discoveryNodes, boolean reconnectToKnownNodes) {
9697
// skip
9798
}
9899

test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public static ClusterService createClusterService(ThreadPool threadPool, Discove
135135
clusterSettings, threadPool, Collections.emptyMap());
136136
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
137137
@Override
138-
public void connectToNodes(DiscoveryNodes discoveryNodes) {
138+
public void connectToNodes(DiscoveryNodes discoveryNodes, boolean reconnectToKnownNodes) {
139139
// skip
140140
}
141141

test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void ensureHealthy(InternalTestCluster cluster) {
104104
public static void ensureFullyConnectedCluster(InternalTestCluster cluster) {
105105
for (String node: cluster.getNodeNames()) {
106106
ClusterState stateOnNode = cluster.getInstance(ClusterService.class, node).state();
107-
cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes());
107+
cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes(), true);
108108
}
109109
}
110110

0 commit comments

Comments
 (0)