Skip to content

Commit a2ca26e

Browse files
committed
[Discovery] do not use versions to optimize cluster state copying for a first update from a new master
We have an optimization which compares routing/meta data version of cluster states and tries to reuse the current object if the versions are equal. This can cause rare failures during recovery from a minimum_master_node breach when using the "new light rejoin" mechanism and simulated network disconnects. This happens where the current master updates it's state, doesn't manage to broadcast it to other nodes due to the disconnect and then steps down. The new master will start with a previous version and continue to update it. When the old master rejoins, the versions of it's state can equal but the content is different. Also improved DiscoveryWithNetworkFailuresTests to simulate this failure (and other improvements) Closes #6466
1 parent 3ad9444 commit a2ca26e

6 files changed

Lines changed: 192 additions & 102 deletions

File tree

src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -367,20 +367,6 @@ public void run() {
367367
}
368368
}
369369
}
370-
} else {
371-
if (previousClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) && !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
372-
// force an update, its a fresh update from the master as we transition from a start of not having a master to having one
373-
// have a fresh instances of routing and metadata to remove the chance that version might be the same
374-
Builder builder = ClusterState.builder(newClusterState);
375-
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()));
376-
builder.metaData(MetaData.builder(newClusterState.metaData()));
377-
newClusterState = builder.build();
378-
logger.debug("got first state from fresh master [{}]", newClusterState.nodes().masterNodeId());
379-
} else if (newClusterState.version() < previousClusterState.version()) {
380-
// we got a cluster state with older version, when we are *not* the master, let it in since it might be valid
381-
// we check on version where applicable, like at ZenDiscovery#handleNewClusterStateFromMaster
382-
logger.debug("got smaller cluster state when not master [" + newClusterState.version() + "<" + previousClusterState.version() + "] from source [" + source + "]");
383-
}
384370
}
385371

386372
newClusterState.status(ClusterState.ClusterStateStatus.BEING_APPLIED);

src/main/java/org/elasticsearch/discovery/DiscoverySettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public DiscoverySettings(Settings settings, NodeSettingsService nodeSettingsServ
5454
super(settings);
5555
nodeSettingsService.addListener(new ApplySettings());
5656
this.noMasterBlock = parseNoMasterBlock(settings.get(NO_MASTER_BLOCK, DEFAULT_NO_MASTER_BLOCK));
57+
this.publishTimeout = settings.getAsTime(PUBLISH_TIMEOUT, publishTimeout);
5758
}
5859

5960
/**

src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
5858

5959
private final TransportService transportService;
6060
private final ClusterService clusterService;
61+
private final DiscoveryService discoveryService;
6162
private final DiscoveryNodeService discoveryNodeService;
6263
private AllocationService allocationService;
6364
private final ClusterName clusterName;
@@ -77,14 +78,15 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
7778

7879
@Inject
7980
public LocalDiscovery(Settings settings, ClusterName clusterName, TransportService transportService, ClusterService clusterService,
80-
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings) {
81+
DiscoveryNodeService discoveryNodeService, Version version, DiscoverySettings discoverySettings, DiscoveryService discoveryService) {
8182
super(settings);
8283
this.clusterName = clusterName;
8384
this.clusterService = clusterService;
8485
this.transportService = transportService;
8586
this.discoveryNodeService = discoveryNodeService;
8687
this.version = version;
8788
this.discoverySettings = discoverySettings;
89+
this.discoveryService = discoveryService;
8890
}
8991

9092
@Override
@@ -305,13 +307,22 @@ private void publish(LocalDiscovery[] members, ClusterState clusterState, final
305307
nodeSpecificClusterState.status(ClusterState.ClusterStateStatus.RECEIVED);
306308
// ignore cluster state messages that do not include "me", not in the game yet...
307309
if (nodeSpecificClusterState.nodes().localNode() != null) {
310+
assert nodeSpecificClusterState.nodes().masterNode() != null : "received a cluster state without a master";
311+
assert !nodeSpecificClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
312+
308313
discovery.clusterService.submitStateUpdateTask("local-disco-receive(from master)", new ProcessedClusterStateNonMasterUpdateTask() {
309314
@Override
310315
public ClusterState execute(ClusterState currentState) {
311316
if (nodeSpecificClusterState.version() < currentState.version() && Objects.equal(nodeSpecificClusterState.nodes().masterNodeId(), currentState.nodes().masterNodeId())) {
312317
return currentState;
313318
}
314319

320+
if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
321+
// its a fresh update from the master as we transition from a start of not having a master to having one
322+
logger.debug("got first state from fresh master [{}]", nodeSpecificClusterState.nodes().masterNodeId());
323+
return nodeSpecificClusterState;
324+
}
325+
315326
ClusterState.Builder builder = ClusterState.builder(nodeSpecificClusterState);
316327
// if the routing table did not change, use the original one
317328
if (nodeSpecificClusterState.routingTable().version() == currentState.routingTable().version()) {

src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
8383
private final ClusterService clusterService;
8484
private AllocationService allocationService;
8585
private final ClusterName clusterName;
86+
private final DiscoveryService discoveryService;
8687
private final DiscoveryNodeService discoveryNodeService;
8788
private final DiscoverySettings discoverySettings;
8889
private final ZenPingService pingService;
@@ -126,12 +127,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
126127
@Inject
127128
public ZenDiscovery(Settings settings, ClusterName clusterName, ThreadPool threadPool,
128129
TransportService transportService, ClusterService clusterService, NodeSettingsService nodeSettingsService,
129-
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings) {
130+
DiscoveryNodeService discoveryNodeService, ZenPingService pingService, Version version, DiscoverySettings discoverySettings,
131+
DiscoveryService discoveryService) {
130132
super(settings);
131133
this.clusterName = clusterName;
132134
this.threadPool = threadPool;
133135
this.clusterService = clusterService;
134136
this.transportService = transportService;
137+
this.discoveryService = discoveryService;
135138
this.discoveryNodeService = discoveryNodeService;
136139
this.discoverySettings = discoverySettings;
137140
this.pingService = pingService;
@@ -629,6 +632,10 @@ public void onFailure(String source, Throwable t) {
629632
final ProcessClusterState processClusterState = new ProcessClusterState(newClusterState, newStateProcessed);
630633
processNewClusterStates.add(processClusterState);
631634

635+
636+
assert newClusterState.nodes().masterNode() != null : "received a cluster state without a master";
637+
assert !newClusterState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock()) : "received a cluster state with a master block";
638+
632639
clusterService.submitStateUpdateTask("zen-disco-receive(from master [" + newClusterState.nodes().masterNode() + "])", Priority.URGENT, new ProcessedClusterStateNonMasterUpdateTask() {
633640
@Override
634641
public ClusterState execute(ClusterState currentState) {
@@ -689,7 +696,16 @@ public ClusterState execute(ClusterState currentState) {
689696
masterFD.restart(latestDiscoNodes.masterNode(), "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
690697
}
691698

699+
if (currentState.blocks().hasGlobalBlock(discoveryService.getNoMasterBlock())) {
700+
// its a fresh update from the master as we transition from a start of not having a master to having one
701+
logger.debug("got first state from fresh master [{}]", updatedState.nodes().masterNodeId());
702+
return updatedState;
703+
}
704+
705+
706+
// some optimizations to make sure we keep old objects where possible
692707
ClusterState.Builder builder = ClusterState.builder(updatedState);
708+
693709
// if the routing table did not change, use the original one
694710
if (updatedState.routingTable().version() == currentState.routingTable().version()) {
695711
builder.routingTable(currentState.routingTable());

0 commit comments

Comments
 (0)