Skip to content

Commit 17bfb34

Browse files
committed
feedback
1 parent 13bb82b commit 17bfb34

3 files changed

Lines changed: 284 additions & 113 deletions

File tree

server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

Lines changed: 76 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.unit.ByteSizeValue;
4040
import org.elasticsearch.common.unit.TimeValue;
4141
import org.elasticsearch.index.seqno.ReplicationTracker;
42+
import org.elasticsearch.index.seqno.RetentionLease;
4243
import org.elasticsearch.index.store.StoreFileMetaData;
4344
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
4445
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
@@ -47,15 +48,14 @@
4748
import java.util.HashMap;
4849
import java.util.List;
4950
import java.util.Map;
50-
import java.util.function.BooleanSupplier;
5151

5252
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
5353

5454
public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
5555
/**
5656
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
57-
* match. Today, a better match is one that has full sync id match or peer recovery retention lease
58-
* compared to not having one in the previous recovery.
57+
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
58+
* has to copy segment files.
5959
*/
6060
public void processExistingRecoveries(RoutingAllocation allocation) {
6161
MetaData metaData = allocation.metaData();
@@ -86,40 +86,30 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
8686

8787
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
8888
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
89-
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
89+
assert primaryShard.currentNodeId() != null;
90+
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
91+
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore =
92+
primaryNode != null ? findStore(primaryNode, shardStores) : null;
9093
if (primaryStore == null) {
9194
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
9295
// just let the recovery find it out, no need to do anything about it for the initializing shard
9396
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
9497
continue;
9598
}
9699

97-
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores, false);
100+
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false);
98101
if (matchingNodes.getNodeWithHighestMatch() != null) {
99102
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
100103
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
101-
BooleanSupplier currentNodeCanSkipPhase1 = () -> {
102-
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeMetadata =
103-
shardStores.getData().get(currentNode).storeFilesMetaData();
104-
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
105-
if (storeMetadata == null) {
106-
return false;
107-
}
108-
IndexMetaData indexMetadata = allocation.metaData().index(shard.index());
109-
if (canPerformOperationBasedRecovery(indexMetadata, primaryStore, currentNode, storeMetadata)) {
110-
return true;
111-
}
112-
final String currentSyncId = storeMetadata.syncId();
113-
return currentSyncId != null && currentSyncId.equals(primaryStore.syncId());
114-
};
115-
104+
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
116105
if (currentNode.equals(nodeWithHighestMatch) == false
117-
&& matchingNodes.canSkipPhase1(nodeWithHighestMatch) && currentNodeCanSkipPhase1.getAsBoolean() == false) {
118-
// we found a better match that can skip phase 1, cancel the existing allocation.
119-
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
106+
&& matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
107+
&& canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == false) {
108+
// we found a better match that can perform noop recovery, cancel the existing allocation.
109+
logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
120110
currentNode, nodeWithHighestMatch);
121111
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
122-
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+
112+
"existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+
123113
nodeWithHighestMatch + "]",
124114
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
125115
UnassignedInfo.AllocationStatus.NO_ATTEMPT);
@@ -186,8 +176,10 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
186176
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()),
187177
new ArrayList<>(result.v2().values()));
188178
}
189-
190-
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
179+
assert primaryShard.currentNodeId() != null;
180+
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
181+
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore =
182+
primaryNode != null ? findStore(primaryNode, shardStores) : null;
191183
if (primaryStore == null) {
192184
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
193185
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
@@ -197,7 +189,7 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
197189
return AllocateUnassignedDecision.NOT_TAKEN;
198190
}
199191

200-
MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryStore, shardStores, explain);
192+
MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain);
201193
assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";
202194

203195
List<NodeAllocationResult> nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions);
@@ -299,22 +291,17 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<S
299291
/**
300292
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
301293
*/
302-
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
294+
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(DiscoveryNode node,
303295
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
304-
assert shard.currentNodeId() != null;
305-
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
306-
if (primaryNode == null) {
296+
NodeStoreFilesMetaData nodeFilesStore = data.getData().get(node);
297+
if (nodeFilesStore == null) {
307298
return null;
308299
}
309-
NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode);
310-
if (primaryNodeFilesStore == null) {
311-
return null;
312-
}
313-
return primaryNodeFilesStore.storeFilesMetaData();
300+
return nodeFilesStore.storeFilesMetaData();
314301
}
315302

316303
private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
317-
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
304+
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
318305
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
319306
boolean explain) {
320307
Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
@@ -336,12 +323,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
336323
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
337324
// then we will try and assign it next time
338325
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
339-
final IndexMetaData indexMetaData = allocation.metaData().index(primaryStore.shardId().getIndex());
340-
341326
MatchingNode matchingNode = null;
342327
if (explain) {
343-
matchingNode = new MatchingNode(computeMatchingBytes(primaryStore, storeFilesMetaData),
344-
canPerformOperationBasedRecovery(indexMetaData, primaryStore, discoNode, storeFilesMetaData));
328+
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData);
345329
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes);
346330
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
347331
}
@@ -351,14 +335,13 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
351335
}
352336

353337
if (matchingNode == null) {
354-
matchingNode = new MatchingNode(computeMatchingBytes(primaryStore, storeFilesMetaData),
355-
canPerformOperationBasedRecovery(indexMetaData, primaryStore, discoNode, storeFilesMetaData));
338+
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData);
356339
}
357340
matchingNodes.put(discoNode, matchingNode);
358341
if (logger.isTraceEnabled()) {
359342
if (matchingNode.matchingBytes == Long.MAX_VALUE) {
360343
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.getName(), storeFilesMetaData.syncId());
361-
} else if (matchingNode.operationBasedRecovery){
344+
} else if (matchingNode.matchingOperations > 0){
362345
logger.trace("{}: node [{}] can perform operation-based recovery", shard, discoNode.getName());
363346
} else {
364347
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
@@ -389,41 +372,68 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
389372
}
390373
}
391374

392-
private static boolean canPerformOperationBasedRecovery(
393-
IndexMetaData indexMetaData, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
375+
private static long getRetainingSeqNoForNode(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, DiscoveryNode node) {
376+
final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId());
377+
return primaryStore.peerRecoveryRetentionLeases().stream()
378+
.filter(lease -> lease.id().equals(retentionLeaseId))
379+
.mapToLong(RetentionLease::retainingSequenceNumber).findFirst().orElse(0L);
380+
}
381+
382+
private static MatchingNode computeMatchingNode(
383+
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
394384
DiscoveryNode replicaNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) {
395385
if (replicaStore.isEmpty()) {
396-
// a corrupted store - it won't be able to perform operation-based recovery
397-
return false;
386+
return new MatchingNode(0, 0, false); // store is corrupted
398387
}
399-
// If an index is closed or frozen, we can perform an operation-based recovery only if the last commit on the replica is safe and
400-
// has the same operations as the last commit on the primary. Here we must ignore the peer recovery retention lease as we don't
401-
// have the persisted global checkpoint from the replica to determine if the last commit is safe. However, the likelihood that
402-
// the last commit unsafe is very small. It's probably okay to use the retention lease if the sequence numbers of the last commit
403-
// from the primary and replica equal.
404-
if (indexMetaData.getState() == IndexMetaData.State.CLOSE ||
405-
IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexMetaData.getSettings())) {
388+
final long retainingSeqNoForPrimary = getRetainingSeqNoForNode(primaryStore, primaryNode);
389+
final long retainingSeqNoForReplica = getRetainingSeqNoForNode(primaryStore, replicaNode);
390+
final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore);
391+
final boolean isNoopRecovery = matchingBytes == Long.MAX_VALUE
392+
|| (retainingSeqNoForReplica > 0 && retainingSeqNoForReplica == retainingSeqNoForPrimary);
393+
return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery);
394+
}
395+
396+
private static boolean canPerformOperationBasedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
397+
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> shardStores,
398+
DiscoveryNode targetNode) {
399+
final NodeStoreFilesMetaData targetNodeStore = shardStores.getData().get(targetNode);
400+
if (targetNodeStore == null || targetNodeStore.storeFilesMetaData().isEmpty()) {
406401
return false;
407402
}
408-
final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaNode.getId());
409-
return primaryStore.peerRecoveryRetentionLeases().stream().anyMatch(lease -> lease.id().equals(retentionLeaseId));
403+
final String primarySyncId = primaryStore.syncId();
404+
if (primarySyncId != null && primarySyncId.equals(targetNodeStore.storeFilesMetaData().syncId())) {
405+
return true;
406+
}
407+
return getRetainingSeqNoForNode(primaryStore, targetNode) > 0;
410408
}
411409

412-
413410
protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);
414411

415412
/**
416413
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
417414
*/
418415
protected abstract boolean hasInitiatedFetching(ShardRouting shard);
419416

420-
private static class MatchingNode {
417+
private static class MatchingNode implements Comparable<MatchingNode> {
421418
final long matchingBytes;
422-
final boolean operationBasedRecovery;
419+
final long matchingOperations;
420+
final boolean isNoopRecovery;
423421

424-
MatchingNode(long matchingBytes, boolean operationBasedRecovery) {
422+
MatchingNode(long matchingBytes, long matchingOperations, boolean isNoopRecovery) {
425423
this.matchingBytes = matchingBytes;
426-
this.operationBasedRecovery = operationBasedRecovery;
424+
this.matchingOperations = matchingOperations;
425+
this.isNoopRecovery = isNoopRecovery;
426+
}
427+
428+
@Override
429+
public int compareTo(MatchingNode that) {
430+
if (this.isNoopRecovery != that.isNoopRecovery) {
431+
return Boolean.compare(this.isNoopRecovery, that.isNoopRecovery);
432+
}
433+
if (this.matchingOperations != that.matchingOperations) {
434+
return Long.compare(this.matchingOperations, that.matchingOperations);
435+
}
436+
return Long.compare(this.matchingBytes, that.matchingBytes);
427437
}
428438
}
429439

@@ -437,13 +447,10 @@ static class MatchingNodes {
437447
this.matchingNodes = matchingNodes;
438448
this.nodeDecisions = nodeDecisions;
439449

440-
MatchingNode highestMatchValue = new MatchingNode(0, false);
450+
MatchingNode highestMatchValue = new MatchingNode(0, 0, false);
441451
DiscoveryNode highestMatchNode = null;
442-
443452
for (Map.Entry<DiscoveryNode, MatchingNode> entry : matchingNodes.entrySet()) {
444-
if (highestMatchValue.operationBasedRecovery == false && entry.getValue().operationBasedRecovery ||
445-
(highestMatchValue.operationBasedRecovery == entry.getValue().operationBasedRecovery &&
446-
highestMatchValue.matchingBytes < entry.getValue().matchingBytes)){
453+
if (highestMatchValue.compareTo(entry.getValue()) < 0) {
447454
highestMatchValue = entry.getValue();
448455
highestMatchNode = entry.getKey();
449456
}
@@ -460,9 +467,9 @@ public DiscoveryNode getNodeWithHighestMatch() {
460467
return this.nodeWithHighestMatch;
461468
}
462469

463-
boolean canSkipPhase1(DiscoveryNode node) {
470+
boolean canPerformNoopRecovery(DiscoveryNode node) {
464471
final MatchingNode matchingNode = matchingNodes.get(node);
465-
return matchingNode.matchingBytes == Long.MAX_VALUE || matchingNode.operationBasedRecovery;
472+
return matchingNode.isNoopRecovery;
466473
}
467474

468475
/**

0 commit comments

Comments
 (0)