3939import org .elasticsearch .common .unit .ByteSizeValue ;
4040import org .elasticsearch .common .unit .TimeValue ;
4141import org .elasticsearch .index .seqno .ReplicationTracker ;
42+ import org .elasticsearch .index .seqno .RetentionLease ;
4243import org .elasticsearch .index .store .StoreFileMetaData ;
4344import org .elasticsearch .indices .store .TransportNodesListShardStoreMetaData ;
4445import org .elasticsearch .indices .store .TransportNodesListShardStoreMetaData .NodeStoreFilesMetaData ;
4748import java .util .HashMap ;
4849import java .util .List ;
4950import java .util .Map ;
50- import java .util .function .BooleanSupplier ;
5151
5252import static org .elasticsearch .cluster .routing .UnassignedInfo .INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING ;
5353
5454public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
5555 /**
5656 * Process existing recoveries of replicas and see if we need to cancel them if we find a better
57- * match. Today, a better match is one that has full sync id match or peer recovery retention lease
58- * compared to not having one in the previous recovery .
57+ * match. Today, a better match is one that can perform a no-op recovery while the previous recovery
58+ * has to copy segment files .
5959 */
6060 public void processExistingRecoveries (RoutingAllocation allocation ) {
6161 MetaData metaData = allocation .metaData ();
@@ -86,40 +86,30 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
8686
8787 ShardRouting primaryShard = allocation .routingNodes ().activePrimary (shard .shardId ());
8888 assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary" ;
89- TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore = findStore (primaryShard , allocation , shardStores );
89+ assert primaryShard .currentNodeId () != null ;
90+ final DiscoveryNode primaryNode = allocation .nodes ().get (primaryShard .currentNodeId ());
91+ final TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore =
92+ primaryNode != null ? findStore (primaryNode , shardStores ) : null ;
9093 if (primaryStore == null ) {
9194 // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
9295 // just let the recovery find it out, no need to do anything about it for the initializing shard
9396 logger .trace ("{}: no primary shard store found or allocated, letting actual allocation figure it out" , shard );
9497 continue ;
9598 }
9699
97- MatchingNodes matchingNodes = findMatchingNodes (shard , allocation , primaryStore , shardStores , false );
100+ MatchingNodes matchingNodes = findMatchingNodes (shard , allocation , primaryNode , primaryStore , shardStores , false );
98101 if (matchingNodes .getNodeWithHighestMatch () != null ) {
99102 DiscoveryNode currentNode = allocation .nodes ().get (shard .currentNodeId ());
100103 DiscoveryNode nodeWithHighestMatch = matchingNodes .getNodeWithHighestMatch ();
101- BooleanSupplier currentNodeCanSkipPhase1 = () -> {
102- TransportNodesListShardStoreMetaData .StoreFilesMetaData storeMetadata =
103- shardStores .getData ().get (currentNode ).storeFilesMetaData ();
104- // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
105- if (storeMetadata == null ) {
106- return false ;
107- }
108- IndexMetaData indexMetadata = allocation .metaData ().index (shard .index ());
109- if (canPerformOperationBasedRecovery (indexMetadata , primaryStore , currentNode , storeMetadata )) {
110- return true ;
111- }
112- final String currentSyncId = storeMetadata .syncId ();
113- return currentSyncId != null && currentSyncId .equals (primaryStore .syncId ());
114- };
115-
104+ // current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
116105 if (currentNode .equals (nodeWithHighestMatch ) == false
117- && matchingNodes .canSkipPhase1 (nodeWithHighestMatch ) && currentNodeCanSkipPhase1 .getAsBoolean () == false ) {
118- // we found a better match that can skip phase 1, cancel the existing allocation.
119- logger .debug ("cancelling allocation of replica on [{}], sync id match found on node [{}]" ,
106+ && matchingNodes .canPerformNoopRecovery (nodeWithHighestMatch )
107+ && canPerformOperationBasedRecovery (primaryStore , shardStores , currentNode ) == false ) {
108+ // we found a better match that can perform noop recovery, cancel the existing allocation.
109+ logger .debug ("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]" ,
120110 currentNode , nodeWithHighestMatch );
121111 UnassignedInfo unassignedInfo = new UnassignedInfo (UnassignedInfo .Reason .REALLOCATED_REPLICA ,
122- "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" +
112+ "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on [" +
123113 nodeWithHighestMatch + "]" ,
124114 null , 0 , allocation .getCurrentNanoTime (), System .currentTimeMillis (), false ,
125115 UnassignedInfo .AllocationStatus .NO_ATTEMPT );
@@ -186,8 +176,10 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
186176 return AllocateUnassignedDecision .no (UnassignedInfo .AllocationStatus .fromDecision (allocateDecision .type ()),
187177 new ArrayList <>(result .v2 ().values ()));
188178 }
189-
190- TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore = findStore (primaryShard , allocation , shardStores );
179+ assert primaryShard .currentNodeId () != null ;
180+ final DiscoveryNode primaryNode = allocation .nodes ().get (primaryShard .currentNodeId ());
181+ final TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore =
182+ primaryNode != null ? findStore (primaryNode , shardStores ) : null ;
191183 if (primaryStore == null ) {
192184 // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
193185 // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
@@ -197,7 +189,7 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
197189 return AllocateUnassignedDecision .NOT_TAKEN ;
198190 }
199191
200- MatchingNodes matchingNodes = findMatchingNodes (unassignedShard , allocation , primaryStore , shardStores , explain );
192+ MatchingNodes matchingNodes = findMatchingNodes (unassignedShard , allocation , primaryNode , primaryStore , shardStores , explain );
201193 assert explain == false || matchingNodes .nodeDecisions != null : "in explain mode, we must have individual node decisions" ;
202194
203195 List <NodeAllocationResult > nodeDecisions = augmentExplanationsWithStoreInfo (result .v2 (), matchingNodes .nodeDecisions );
@@ -299,22 +291,17 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<S
299291 /**
300292 * Finds the store for the assigned shard in the fetched data, returns null if none is found.
301293 */
302- private static TransportNodesListShardStoreMetaData .StoreFilesMetaData findStore (ShardRouting shard , RoutingAllocation allocation ,
294+ private static TransportNodesListShardStoreMetaData .StoreFilesMetaData findStore (DiscoveryNode node ,
303295 AsyncShardFetch .FetchResult <NodeStoreFilesMetaData > data ) {
304- assert shard .currentNodeId () != null ;
305- DiscoveryNode primaryNode = allocation .nodes ().get (shard .currentNodeId ());
306- if (primaryNode == null ) {
296+ NodeStoreFilesMetaData nodeFilesStore = data .getData ().get (node );
297+ if (nodeFilesStore == null ) {
307298 return null ;
308299 }
309- NodeStoreFilesMetaData primaryNodeFilesStore = data .getData ().get (primaryNode );
310- if (primaryNodeFilesStore == null ) {
311- return null ;
312- }
313- return primaryNodeFilesStore .storeFilesMetaData ();
300+ return nodeFilesStore .storeFilesMetaData ();
314301 }
315302
316303 private MatchingNodes findMatchingNodes (ShardRouting shard , RoutingAllocation allocation ,
317- TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore ,
304+ DiscoveryNode primaryNode , TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore ,
318305 AsyncShardFetch .FetchResult <NodeStoreFilesMetaData > data ,
319306 boolean explain ) {
320307 Map <DiscoveryNode , MatchingNode > matchingNodes = new HashMap <>();
@@ -336,12 +323,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
336323 // we only check for NO, since if this node is THROTTLING and it has enough "same data"
337324 // then we will try and assign it next time
338325 Decision decision = allocation .deciders ().canAllocate (shard , node , allocation );
339- final IndexMetaData indexMetaData = allocation .metaData ().index (primaryStore .shardId ().getIndex ());
340-
341326 MatchingNode matchingNode = null ;
342327 if (explain ) {
343- matchingNode = new MatchingNode (computeMatchingBytes (primaryStore , storeFilesMetaData ),
344- canPerformOperationBasedRecovery (indexMetaData , primaryStore , discoNode , storeFilesMetaData ));
328+ matchingNode = computeMatchingNode (primaryNode , primaryStore , discoNode , storeFilesMetaData );
345329 ShardStoreInfo shardStoreInfo = new ShardStoreInfo (matchingNode .matchingBytes );
346330 nodeDecisions .put (node .nodeId (), new NodeAllocationResult (discoNode , shardStoreInfo , decision ));
347331 }
@@ -351,14 +335,13 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
351335 }
352336
353337 if (matchingNode == null ) {
354- matchingNode = new MatchingNode (computeMatchingBytes (primaryStore , storeFilesMetaData ),
355- canPerformOperationBasedRecovery (indexMetaData , primaryStore , discoNode , storeFilesMetaData ));
338+ matchingNode = computeMatchingNode (primaryNode , primaryStore , discoNode , storeFilesMetaData );
356339 }
357340 matchingNodes .put (discoNode , matchingNode );
358341 if (logger .isTraceEnabled ()) {
359342 if (matchingNode .matchingBytes == Long .MAX_VALUE ) {
360343 logger .trace ("{}: node [{}] has same sync id {} as primary" , shard , discoNode .getName (), storeFilesMetaData .syncId ());
361- } else if (matchingNode .operationBasedRecovery ){
344+ } else if (matchingNode .matchingOperations > 0 ){
362345 logger .trace ("{}: node [{}] can perform operation-based recovery" , shard , discoNode .getName ());
363346 } else {
364347 logger .trace ("{}: node [{}] has [{}/{}] bytes of re-usable data" ,
@@ -389,41 +372,68 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
389372 }
390373 }
391374
392- private static boolean canPerformOperationBasedRecovery (
393- IndexMetaData indexMetaData , TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore ,
375+ private static long getRetainingSeqNoForNode (TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore , DiscoveryNode node ) {
376+ final String retentionLeaseId = ReplicationTracker .getPeerRecoveryRetentionLeaseId (node .getId ());
377+ return primaryStore .peerRecoveryRetentionLeases ().stream ()
378+ .filter (lease -> lease .id ().equals (retentionLeaseId ))
379+ .mapToLong (RetentionLease ::retainingSequenceNumber ).findFirst ().orElse (0L );
380+ }
381+
382+ private static MatchingNode computeMatchingNode (
383+ DiscoveryNode primaryNode , TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore ,
394384 DiscoveryNode replicaNode , TransportNodesListShardStoreMetaData .StoreFilesMetaData replicaStore ) {
395385 if (replicaStore .isEmpty ()) {
396- // a corrupted store - it won't be able to perform operation-based recovery
397- return false ;
386+ return new MatchingNode (0 , 0 , false ); // store is corrupted
398387 }
399- // If an index is closed or frozen, we can perform an operation-based recovery only if the last commit on the replica is safe and
400- // has the same operations as the last commit on the primary. Here we must ignore the peer recovery retention lease as we don't
401- // have the persisted global checkpoint from the replica to determine if the last commit is safe. However, the likelihood that
402- // the last commit unsafe is very small. It's probably okay to use the retention lease if the sequence numbers of the last commit
403- // from the primary and replica equal.
404- if (indexMetaData .getState () == IndexMetaData .State .CLOSE ||
405- IndexMetaData .INDEX_BLOCKS_WRITE_SETTING .get (indexMetaData .getSettings ())) {
388+ final long retainingSeqNoForPrimary = getRetainingSeqNoForNode (primaryStore , primaryNode );
389+ final long retainingSeqNoForReplica = getRetainingSeqNoForNode (primaryStore , replicaNode );
390+ final long matchingBytes = computeMatchingBytes (primaryStore , replicaStore );
391+ final boolean isNoopRecovery = matchingBytes == Long .MAX_VALUE
392+ || (retainingSeqNoForReplica > 0 && retainingSeqNoForReplica == retainingSeqNoForPrimary );
393+ return new MatchingNode (matchingBytes , retainingSeqNoForReplica , isNoopRecovery );
394+ }
395+
396+ private static boolean canPerformOperationBasedRecovery (TransportNodesListShardStoreMetaData .StoreFilesMetaData primaryStore ,
397+ AsyncShardFetch .FetchResult <NodeStoreFilesMetaData > shardStores ,
398+ DiscoveryNode targetNode ) {
399+ final NodeStoreFilesMetaData targetNodeStore = shardStores .getData ().get (targetNode );
400+ if (targetNodeStore == null || targetNodeStore .storeFilesMetaData ().isEmpty ()) {
406401 return false ;
407402 }
408- final String retentionLeaseId = ReplicationTracker .getPeerRecoveryRetentionLeaseId (replicaNode .getId ());
409- return primaryStore .peerRecoveryRetentionLeases ().stream ().anyMatch (lease -> lease .id ().equals (retentionLeaseId ));
403+ final String primarySyncId = primaryStore .syncId ();
404+ if (primarySyncId != null && primarySyncId .equals (targetNodeStore .storeFilesMetaData ().syncId ())) {
405+ return true ;
406+ }
407+ return getRetainingSeqNoForNode (primaryStore , targetNode ) > 0 ;
410408 }
411409
412-
413410 protected abstract AsyncShardFetch .FetchResult <NodeStoreFilesMetaData > fetchData (ShardRouting shard , RoutingAllocation allocation );
414411
415412 /**
416413 * Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
417414 */
418415 protected abstract boolean hasInitiatedFetching (ShardRouting shard );
419416
420- private static class MatchingNode {
417+ private static class MatchingNode implements Comparable < MatchingNode > {
421418 final long matchingBytes ;
422- final boolean operationBasedRecovery ;
419+ final long matchingOperations ;
420+ final boolean isNoopRecovery ;
423421
424- MatchingNode (long matchingBytes , boolean operationBasedRecovery ) {
422+ MatchingNode (long matchingBytes , long matchingOperations , boolean isNoopRecovery ) {
425423 this .matchingBytes = matchingBytes ;
426- this .operationBasedRecovery = operationBasedRecovery ;
424+ this .matchingOperations = matchingOperations ;
425+ this .isNoopRecovery = isNoopRecovery ;
426+ }
427+
428+ @ Override
429+ public int compareTo (MatchingNode that ) {
430+ if (this .isNoopRecovery != that .isNoopRecovery ) {
431+ return Boolean .compare (this .isNoopRecovery , that .isNoopRecovery );
432+ }
433+ if (this .matchingOperations != that .matchingOperations ) {
434+ return Long .compare (this .matchingOperations , that .matchingOperations );
435+ }
436+ return Long .compare (this .matchingBytes , that .matchingBytes );
427437 }
428438 }
429439
@@ -437,13 +447,10 @@ static class MatchingNodes {
437447 this .matchingNodes = matchingNodes ;
438448 this .nodeDecisions = nodeDecisions ;
439449
440- MatchingNode highestMatchValue = new MatchingNode (0 , false );
450+ MatchingNode highestMatchValue = new MatchingNode (0 , 0 , false );
441451 DiscoveryNode highestMatchNode = null ;
442-
443452 for (Map .Entry <DiscoveryNode , MatchingNode > entry : matchingNodes .entrySet ()) {
444- if (highestMatchValue .operationBasedRecovery == false && entry .getValue ().operationBasedRecovery ||
445- (highestMatchValue .operationBasedRecovery == entry .getValue ().operationBasedRecovery &&
446- highestMatchValue .matchingBytes < entry .getValue ().matchingBytes )){
453+ if (highestMatchValue .compareTo (entry .getValue ()) < 0 ) {
447454 highestMatchValue = entry .getValue ();
448455 highestMatchNode = entry .getKey ();
449456 }
@@ -460,9 +467,9 @@ public DiscoveryNode getNodeWithHighestMatch() {
460467 return this .nodeWithHighestMatch ;
461468 }
462469
463- boolean canSkipPhase1 (DiscoveryNode node ) {
470+ boolean canPerformNoopRecovery (DiscoveryNode node ) {
464471 final MatchingNode matchingNode = matchingNodes .get (node );
465- return matchingNode .matchingBytes == Long . MAX_VALUE || matchingNode . operationBasedRecovery ;
472+ return matchingNode .isNoopRecovery ;
466473 }
467474
468475 /**
0 commit comments