44 | 44 | import org.elasticsearch.core.Releasable; |
45 | 45 | import org.elasticsearch.core.Releasables; |
46 | 46 | import org.elasticsearch.core.TimeValue; |
| 47 | +import org.elasticsearch.core.Tuple; |
47 | 48 | import org.elasticsearch.core.internal.io.IOUtils; |
48 | 49 | import org.elasticsearch.index.engine.Engine; |
49 | 50 | import org.elasticsearch.index.engine.RecoveryEngineException; |
@@ -560,77 +561,107 @@ void recoverFilesFromSourceAndSnapshot(ShardRecoveryPlan shardRecoveryPlan, |
560 | 561 | phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); |
561 | 562 | } |
562 | 563 |
 | 564 | +        // We need to pass the ShardRecoveryPlan between steps instead of capturing it in the closures, |
 | 565 | +        // since the plan can change after a failure recovering, from the snapshot, files that cannot be |
 | 566 | +        // recovered from the source node; in that case we have to start from scratch, using the fallback |
 | 567 | +        // recovery plan in the subsequent steps. |
563 | 568 | final StepListener<Void> sendFileInfoStep = new StepListener<>(); |
564 | | - final StepListener<List<StoreFileMetadata>> recoverSnapshotFilesStep = new StepListener<>(); |
565 | | - final StepListener<Void> sendFilesStep = new StepListener<>(); |
566 | | - final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>(); |
567 | | - final StepListener<Void> cleanFilesStep = new StepListener<>(); |
| 569 | + final StepListener<Tuple<ShardRecoveryPlan, List<StoreFileMetadata>>> recoverSnapshotFilesStep = new StepListener<>(); |
| 570 | + final StepListener<ShardRecoveryPlan> sendFilesStep = new StepListener<>(); |
| 571 | + final StepListener<Tuple<ShardRecoveryPlan, RetentionLease>> createRetentionLeaseStep = new StepListener<>(); |
| 572 | + final StepListener<ShardRecoveryPlan> cleanFilesStep = new StepListener<>(); |
568 | 573 |
569 | 574 | final int translogOps = shardRecoveryPlan.getTranslogOps(); |
570 | 575 | recoveryTarget.receiveFileInfo(filesToRecoverNames, |
571 | 576 | filesToRecoverSizes, |
572 | 577 | phase1ExistingFileNames, |
573 | 578 | phase1ExistingFileSizes, |
574 | 579 | translogOps, |
| 580 | + false, |
575 | 581 | sendFileInfoStep |
576 | 582 | ); |
577 | 583 |
578 | | - sendFileInfoStep.whenComplete(r -> { |
579 | | - recoverSnapshotFiles(shardRecoveryPlan, recoverSnapshotFilesStep.delegateResponse((delegate, e) -> { |
580 | | - if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false && |
581 | | - e instanceof CancellableThreads.ExecutionCancelledException == false) { |
582 | | - recoveryTarget.deleteRecoveredFiles(new ActionListener<>() { |
583 | | - @Override |
584 | | - public void onResponse(Void unused) { |
585 | | - recoverFilesFromSourceAndSnapshot(shardRecoveryPlan.getFallbackPlan(), store, stopWatch, listener); |
586 | | - } |
| 584 | + sendFileInfoStep.whenComplete(unused -> { |
| 585 | + recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() { |
| 586 | + @Override |
| 587 | + public void onResponse(List<StoreFileMetadata> filesFailedToRecoverFromSnapshot) { |
| 588 | + recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot)); |
| 589 | + } |
587 | 590 |
588 | | - @Override |
589 | | - public void onFailure(Exception e) { |
590 | | - listener.onFailure(e); |
591 | | - } |
592 | | - }); |
593 | | - } else { |
594 | | - delegate.onFailure(e); |
| 591 | + @Override |
| 592 | + public void onFailure(Exception e) { |
| 593 | + if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false && |
| 594 | + e instanceof CancellableThreads.ExecutionCancelledException == false) { |
| 595 | + ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan(); |
| 596 | + recoveryTarget.receiveFileInfo(fallbackPlan.getFilesToRecoverNames(), |
| 597 | + fallbackPlan.getFilesToRecoverSizes(), |
| 598 | + fallbackPlan.getFilesPresentInTargetNames(), |
| 599 | + fallbackPlan.getFilesPresentInTargetSizes(), |
| 600 | + fallbackPlan.getTranslogOps(), |
| 601 | + true, |
| 602 | + recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList())) |
| 603 | + ); |
| 604 | + } else { |
| 605 | + recoverSnapshotFilesStep.onFailure(e); |
| 606 | + } |
595 | 607 | } |
596 | | - })); |
| 608 | + }); |
597 | 609 | }, listener::onFailure); |
598 | 610 |
599 | | - recoverSnapshotFilesStep.whenComplete(filesFailedToRecoverFromSnapshot -> { |
| 611 | + recoverSnapshotFilesStep.whenComplete(planAndFilesFailedToRecoverFromSnapshot -> { |
| 612 | + ShardRecoveryPlan recoveryPlan = planAndFilesFailedToRecoverFromSnapshot.v1(); |
| 613 | + List<StoreFileMetadata> filesFailedToRecoverFromSnapshot = planAndFilesFailedToRecoverFromSnapshot.v2(); |
600 | 614 | final List<StoreFileMetadata> filesToRecoverFromSource; |
601 | 615 | if (filesFailedToRecoverFromSnapshot.isEmpty()) { |
602 | | - filesToRecoverFromSource = shardRecoveryPlan.getSourceFilesToRecover(); |
| 616 | + filesToRecoverFromSource = recoveryPlan.getSourceFilesToRecover(); |
603 | 617 | } else { |
604 | | - filesToRecoverFromSource = concatLists(shardRecoveryPlan.getSourceFilesToRecover(), filesFailedToRecoverFromSnapshot); |
| 618 | + filesToRecoverFromSource = concatLists(recoveryPlan.getSourceFilesToRecover(), filesFailedToRecoverFromSnapshot); |
605 | 619 | } |
606 | 620 |
607 | 621 | sendFiles(store, |
608 | | - filesToRecoverFromSource.toArray(new StoreFileMetadata[0]), shardRecoveryPlan::getTranslogOps, sendFilesStep); |
| 622 | + filesToRecoverFromSource.toArray(new StoreFileMetadata[0]), |
| 623 | + recoveryPlan::getTranslogOps, |
| 624 | + sendFilesStep.map(unused -> recoveryPlan) |
| 625 | + ); |
609 | 626 | }, listener::onFailure); |
610 | 627 |
611 | | - final long startingSeqNo = shardRecoveryPlan.getStartingSeqNo(); |
612 | | - sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure); |
613 | | - |
614 | | - final Store.MetadataSnapshot recoverySourceMetadata = shardRecoveryPlan.getSourceMetadataSnapshot(); |
615 | | - createRetentionLeaseStep.whenComplete(retentionLease -> |
616 | | - { |
617 | | - final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); |
618 | | - assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint |
619 | | - : retentionLease + " vs " + lastKnownGlobalCheckpoint; |
620 | | - // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want |
621 | | - // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica |
622 | | - // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on |
623 | | - // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. |
624 | | - cleanFiles(store, recoverySourceMetadata, () -> translogOps, lastKnownGlobalCheckpoint, cleanFilesStep); |
625 | | - }, |
626 | | - listener::onFailure); |
| 628 | + sendFilesStep.whenComplete(recoveryPlan -> { |
| 629 | + createRetentionLease(recoveryPlan.getStartingSeqNo(), |
| 630 | + createRetentionLeaseStep.map(retentionLease -> Tuple.tuple(recoveryPlan, retentionLease)) |
| 631 | + ); |
| 632 | + }, listener::onFailure); |
| 633 | + |
| 634 | + createRetentionLeaseStep.whenComplete(recoveryPlanAndRetentionLease -> { |
| 635 | + final ShardRecoveryPlan recoveryPlan = recoveryPlanAndRetentionLease.v1(); |
| 636 | + final RetentionLease retentionLease = recoveryPlanAndRetentionLease.v2(); |
| 637 | + final Store.MetadataSnapshot recoverySourceMetadata = recoveryPlan.getSourceMetadataSnapshot(); |
| 638 | + final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); |
| 639 | + assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint |
| 640 | + : retentionLease + " vs " + lastKnownGlobalCheckpoint; |
| 641 | + // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want |
| 642 | + // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica |
| 643 | + // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on |
| 644 | + // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. |
| 645 | + cleanFiles(store, |
| 646 | + recoverySourceMetadata, |
| 647 | + () -> translogOps, |
| 648 | + lastKnownGlobalCheckpoint, |
| 649 | + cleanFilesStep.map(unused -> recoveryPlan) |
| 650 | + ); |
| 651 | + }, listener::onFailure); |
627 | 652 |
628 | | - cleanFilesStep.whenComplete(r -> { |
| 653 | + cleanFilesStep.whenComplete(recoveryPlan -> { |
629 | 654 | final TimeValue took = stopWatch.totalTime(); |
630 | 655 | logger.trace("recovery [phase1]: took [{}]", took); |
631 | 656 | listener.onResponse( |
632 | | - new SendFileResult(filesToRecoverNames, filesToRecoverSizes, totalSize, |
633 | | - phase1ExistingFileNames, phase1ExistingFileSizes, existingTotalSize, took) |
| 657 | + new SendFileResult(recoveryPlan.getFilesToRecoverNames(), |
| 658 | + recoveryPlan.getFilesToRecoverSizes(), |
| 659 | + recoveryPlan.getTotalSize(), |
| 660 | + recoveryPlan.getFilesPresentInTargetNames(), |
| 661 | + recoveryPlan.getFilesPresentInTargetSizes(), |
| 662 | + recoveryPlan.getExistingSize(), |
| 663 | + took |
| 664 | + ) |
634 | 665 | ); |
635 | 666 | }, listener::onFailure); |
636 | 667 | } |
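
The pattern worth noting in this hunk: each step now reports the ShardRecoveryPlan it actually used as part of its result (directly or inside a Tuple), and the map(...) calls rewrap a downstream listener so a step that completes with one type can deliver the converted value the next step expects. Below is a minimal self-contained sketch of both ideas; the Listener interface, Plan record, and step methods are hypothetical stand-ins, not the real ActionListener/StepListener API.

import java.util.function.Function;

// Sketch: thread the plan through the chain as data instead of capturing it in
// closures, so a fallback plan chosen mid-chain reaches all later steps.
public class PlanThreadingSketch {

    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);

        // Mirrors the map(...) calls in the diff: accepts an R, converts it to
        // T, and delivers it to this listener; failures pass through unchanged.
        default <R> Listener<R> map(Function<R, T> fn) {
            Listener<T> outer = this;
            return new Listener<>() {
                @Override public void onResponse(R result) { outer.onResponse(fn.apply(result)); }
                @Override public void onFailure(Exception e) { outer.onFailure(e); }
            };
        }
    }

    record Plan(String name) {} // stands in for ShardRecoveryPlan

    // Step 1: try the snapshot-based plan; on failure, hand the fallback plan
    // to the next step instead of failing the whole chain.
    static void recoverFromSnapshot(Plan primary, Plan fallback, Listener<Plan> next) {
        boolean snapshotDownloadFailed = true; // simulate a snapshot failure
        next.onResponse(snapshotDownloadFailed ? fallback : primary);
    }

    // Step 2 only sees the plan delivered by step 1, never a captured one.
    static void sendFiles(Plan plan, Listener<Void> next) {
        System.out.println("sending files for plan " + plan.name());
        next.onResponse(null);
    }

    public static void main(String[] args) {
        Listener<Plan> done = new Listener<>() {
            @Override public void onResponse(Plan plan) {
                System.out.println("finished with plan " + plan.name());
            }
            @Override public void onFailure(Exception e) { e.printStackTrace(); }
        };
        recoverFromSnapshot(new Plan("snapshot"), new Plan("fallback"), new Listener<>() {
            @Override public void onResponse(Plan plan) {
                // map(unused -> plan): sendFiles completes with Void, but the
                // final listener wants the plan that was actually used.
                sendFiles(plan, done.map(unused -> plan));
            }
            @Override public void onFailure(Exception e) { done.onFailure(e); }
        });
    }
}

In the real change, Tuple plays the role of this sketch's plain Plan result wherever a step must also return extra data, such as the files that failed to recover from the snapshot.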