Skip to content

Commit e48a0cb

Browse files
committed
Avoid recursive call
1 parent fdd32d4 commit e48a0cb

11 files changed

Lines changed: 182 additions & 317 deletions

File tree

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ public class PeerRecoveryTargetService implements IndexEventListener {
7878
public static class Actions {
7979
public static final String FILES_INFO = "internal:index/shard/recovery/filesInfo";
8080
public static final String RESTORE_FILE_FROM_SNAPSHOT = "internal:index/shard/recovery/restore_file_from_snapshot";
81-
public static final String DELETE_RECOVERED_FILES = "internal:index/shard/recovery/delete_recovered_files";
8281
public static final String FILE_CHUNK = "internal:index/shard/recovery/file_chunk";
8382
public static final String CLEAN_FILES = "internal:index/shard/recovery/clean_files";
8483
public static final String TRANSLOG_OPS = "internal:index/shard/recovery/translog_ops";
@@ -113,8 +112,6 @@ public PeerRecoveryTargetService(ThreadPool threadPool,
113112
new FilesInfoRequestHandler());
114113
transportService.registerRequestHandler(Actions.RESTORE_FILE_FROM_SNAPSHOT, ThreadPool.Names.GENERIC,
115114
RecoverySnapshotFileRequest::new, new RestoreFileFromSnapshotTransportRequestHandler());
116-
transportService.registerRequestHandler(Actions.DELETE_RECOVERED_FILES, ThreadPool.Names.GENERIC,
117-
RecoveryDeleteRecoveredFilesRequest::new, new DeleteRecoveredFilesTransportRequestHandler());
118115
transportService.registerRequestHandler(Actions.FILE_CHUNK, ThreadPool.Names.GENERIC, RecoveryFileChunkRequest::new,
119116
new FileChunkTransportRequestHandler());
120117
transportService.registerRequestHandler(Actions.CLEAN_FILES, ThreadPool.Names.GENERIC,
@@ -418,7 +415,7 @@ public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel c
418415

419416
recoveryRef.target().receiveFileInfo(
420417
request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes,
421-
request.totalTranslogOps, listener);
418+
request.totalTranslogOps, request.deleteRecoveredFiles, listener);
422419
}
423420
}
424421
}
@@ -497,24 +494,6 @@ public void messageReceived(final RecoverySnapshotFileRequest request, Transport
497494
}
498495
}
499496

500-
class DeleteRecoveredFilesTransportRequestHandler implements TransportRequestHandler<RecoveryDeleteRecoveredFilesRequest> {
501-
@Override
502-
public void messageReceived(RecoveryDeleteRecoveredFilesRequest request,
503-
TransportChannel channel,
504-
Task task) throws Exception {
505-
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.getRecoveryId(), request.getShardId())) {
506-
final RecoveryTarget recoveryTarget = recoveryRef.target();
507-
final ActionListener<Void> listener =
508-
createOrFinishListener(recoveryRef, channel, Actions.DELETE_RECOVERED_FILES, request);
509-
if (listener == null) {
510-
return;
511-
}
512-
513-
recoveryTarget.deleteRecoveredFiles(listener);
514-
}
515-
}
516-
}
517-
518497

519498
private ActionListener<Void> createOrFinishListener(final RecoveryRef recoveryRef, final TransportChannel channel,
520499
final String action, final RecoveryTransportRequest request) {

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryDeleteRecoveredFilesRequest.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFilesInfoRequest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class RecoveryFilesInfoRequest extends RecoveryTransportRequest {
2828

2929
int totalTranslogOps;
3030

31+
boolean deleteRecoveredFiles;
32+
3133
public RecoveryFilesInfoRequest(StreamInput in) throws IOException {
3234
super(in);
3335
recoveryId = in.readLong();
@@ -56,11 +58,17 @@ public RecoveryFilesInfoRequest(StreamInput in) throws IOException {
5658
phase1ExistingFileSizes.add(in.readVLong());
5759
}
5860
totalTranslogOps = in.readVInt();
61+
62+
if (in.getVersion().onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION)) {
63+
deleteRecoveredFiles = in.readBoolean();
64+
} else {
65+
deleteRecoveredFiles = false;
66+
}
5967
}
6068

6169
RecoveryFilesInfoRequest(long recoveryId, long requestSeqNo, ShardId shardId, List<String> phase1FileNames,
6270
List<Long> phase1FileSizes, List<String> phase1ExistingFileNames, List<Long> phase1ExistingFileSizes,
63-
int totalTranslogOps) {
71+
int totalTranslogOps, boolean deleteRecoveredFiles) {
6472
super(requestSeqNo);
6573
this.recoveryId = recoveryId;
6674
this.shardId = shardId;
@@ -69,6 +77,7 @@ public RecoveryFilesInfoRequest(StreamInput in) throws IOException {
6977
this.phase1ExistingFileNames = phase1ExistingFileNames;
7078
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
7179
this.totalTranslogOps = totalTranslogOps;
80+
this.deleteRecoveredFiles = deleteRecoveredFiles;
7281
}
7382

7483
public long recoveryId() {
@@ -105,5 +114,9 @@ public void writeTo(StreamOutput out) throws IOException {
105114
out.writeVLong(phase1ExistingFileSize);
106115
}
107116
out.writeVInt(totalTranslogOps);
117+
118+
if (out.getVersion().onOrAfter(RecoverySettings.SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION)) {
119+
out.writeBoolean(deleteRecoveredFiles);
120+
}
108121
}
109122
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
public class RecoverySettings {
3232
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
33+
public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
3334

3435
private static final Logger logger = LogManager.getLogger(RecoverySettings.class);
3536

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.core.Releasable;
4545
import org.elasticsearch.core.Releasables;
4646
import org.elasticsearch.core.TimeValue;
47+
import org.elasticsearch.core.Tuple;
4748
import org.elasticsearch.core.internal.io.IOUtils;
4849
import org.elasticsearch.index.engine.Engine;
4950
import org.elasticsearch.index.engine.RecoveryEngineException;
@@ -560,77 +561,107 @@ void recoverFilesFromSourceAndSnapshot(ShardRecoveryPlan shardRecoveryPlan,
560561
phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
561562
}
562563

564+
// We need to pass the ShardRecovery plan between steps instead of capturing it in the closures
565+
// since the plan can change after a failure recovering files from the snapshots that cannot be
566+
// recovered from the source node, in that case we have to start from scratch using the fallback
567+
// recovery plan that would be used in subsequent steps.
563568
final StepListener<Void> sendFileInfoStep = new StepListener<>();
564-
final StepListener<List<StoreFileMetadata>> recoverSnapshotFilesStep = new StepListener<>();
565-
final StepListener<Void> sendFilesStep = new StepListener<>();
566-
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
567-
final StepListener<Void> cleanFilesStep = new StepListener<>();
569+
final StepListener<Tuple<ShardRecoveryPlan, List<StoreFileMetadata>>> recoverSnapshotFilesStep = new StepListener<>();
570+
final StepListener<ShardRecoveryPlan> sendFilesStep = new StepListener<>();
571+
final StepListener<Tuple<ShardRecoveryPlan, RetentionLease>> createRetentionLeaseStep = new StepListener<>();
572+
final StepListener<ShardRecoveryPlan> cleanFilesStep = new StepListener<>();
568573

569574
final int translogOps = shardRecoveryPlan.getTranslogOps();
570575
recoveryTarget.receiveFileInfo(filesToRecoverNames,
571576
filesToRecoverSizes,
572577
phase1ExistingFileNames,
573578
phase1ExistingFileSizes,
574579
translogOps,
580+
false,
575581
sendFileInfoStep
576582
);
577583

578-
sendFileInfoStep.whenComplete(r -> {
579-
recoverSnapshotFiles(shardRecoveryPlan, recoverSnapshotFilesStep.delegateResponse((delegate, e) -> {
580-
if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false &&
581-
e instanceof CancellableThreads.ExecutionCancelledException == false) {
582-
recoveryTarget.deleteRecoveredFiles(new ActionListener<>() {
583-
@Override
584-
public void onResponse(Void unused) {
585-
recoverFilesFromSourceAndSnapshot(shardRecoveryPlan.getFallbackPlan(), store, stopWatch, listener);
586-
}
584+
sendFileInfoStep.whenComplete(unused -> {
585+
recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() {
586+
@Override
587+
public void onResponse(List<StoreFileMetadata> filesFailedToRecoverFromSnapshot) {
588+
recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot));
589+
}
587590

588-
@Override
589-
public void onFailure(Exception e) {
590-
listener.onFailure(e);
591-
}
592-
});
593-
} else {
594-
delegate.onFailure(e);
591+
@Override
592+
public void onFailure(Exception e) {
593+
if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false &&
594+
e instanceof CancellableThreads.ExecutionCancelledException == false) {
595+
ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan();
596+
recoveryTarget.receiveFileInfo(fallbackPlan.getFilesToRecoverNames(),
597+
fallbackPlan.getFilesToRecoverSizes(),
598+
fallbackPlan.getFilesPresentInTargetNames(),
599+
fallbackPlan.getFilesPresentInTargetSizes(),
600+
fallbackPlan.getTranslogOps(),
601+
true,
602+
recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
603+
);
604+
} else {
605+
recoverSnapshotFilesStep.onFailure(e);
606+
}
595607
}
596-
}));
608+
});
597609
}, listener::onFailure);
598610

599-
recoverSnapshotFilesStep.whenComplete(filesFailedToRecoverFromSnapshot -> {
611+
recoverSnapshotFilesStep.whenComplete(planAndFilesFailedToRecoverFromSnapshot -> {
612+
ShardRecoveryPlan recoveryPlan = planAndFilesFailedToRecoverFromSnapshot.v1();
613+
List<StoreFileMetadata> filesFailedToRecoverFromSnapshot = planAndFilesFailedToRecoverFromSnapshot.v2();
600614
final List<StoreFileMetadata> filesToRecoverFromSource;
601615
if (filesFailedToRecoverFromSnapshot.isEmpty()) {
602-
filesToRecoverFromSource = shardRecoveryPlan.getSourceFilesToRecover();
616+
filesToRecoverFromSource = recoveryPlan.getSourceFilesToRecover();
603617
} else {
604-
filesToRecoverFromSource = concatLists(shardRecoveryPlan.getSourceFilesToRecover(), filesFailedToRecoverFromSnapshot);
618+
filesToRecoverFromSource = concatLists(recoveryPlan.getSourceFilesToRecover(), filesFailedToRecoverFromSnapshot);
605619
}
606620

607621
sendFiles(store,
608-
filesToRecoverFromSource.toArray(new StoreFileMetadata[0]), shardRecoveryPlan::getTranslogOps, sendFilesStep);
622+
filesToRecoverFromSource.toArray(new StoreFileMetadata[0]),
623+
recoveryPlan::getTranslogOps,
624+
sendFilesStep.map(unused -> recoveryPlan)
625+
);
609626
}, listener::onFailure);
610627

611-
final long startingSeqNo = shardRecoveryPlan.getStartingSeqNo();
612-
sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);
613-
614-
final Store.MetadataSnapshot recoverySourceMetadata = shardRecoveryPlan.getSourceMetadataSnapshot();
615-
createRetentionLeaseStep.whenComplete(retentionLease ->
616-
{
617-
final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
618-
assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint
619-
: retentionLease + " vs " + lastKnownGlobalCheckpoint;
620-
// Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
621-
// the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
622-
// to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
623-
// the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
624-
cleanFiles(store, recoverySourceMetadata, () -> translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);
625-
},
626-
listener::onFailure);
628+
sendFilesStep.whenComplete(recoveryPlan -> {
629+
createRetentionLease(recoveryPlan.getStartingSeqNo(),
630+
createRetentionLeaseStep.map(retentionLease -> Tuple.tuple(recoveryPlan, retentionLease))
631+
);
632+
}, listener::onFailure);
633+
634+
createRetentionLeaseStep.whenComplete(recoveryPlanAndRetentionLease -> {
635+
final ShardRecoveryPlan recoveryPlan = recoveryPlanAndRetentionLease.v1();
636+
final RetentionLease retentionLease = recoveryPlanAndRetentionLease.v2();
637+
final Store.MetadataSnapshot recoverySourceMetadata = recoveryPlan.getSourceMetadataSnapshot();
638+
final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();
639+
assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint
640+
: retentionLease + " vs " + lastKnownGlobalCheckpoint;
641+
// Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want
642+
// the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica
643+
// to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on
644+
// the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.
645+
cleanFiles(store,
646+
recoverySourceMetadata,
647+
() -> translogOps,
648+
lastKnownGlobalCheckpoint,
649+
cleanFilesStep.map(unused -> recoveryPlan)
650+
);
651+
}, listener::onFailure);
627652

628-
cleanFilesStep.whenComplete(r -> {
653+
cleanFilesStep.whenComplete(recoveryPlan -> {
629654
final TimeValue took = stopWatch.totalTime();
630655
logger.trace("recovery [phase1]: took [{}]", took);
631656
listener.onResponse(
632-
new SendFileResult(filesToRecoverNames, filesToRecoverSizes, totalSize,
633-
phase1ExistingFileNames, phase1ExistingFileSizes, existingTotalSize, took)
657+
new SendFileResult(recoveryPlan.getFilesToRecoverNames(),
658+
recoveryPlan.getFilesToRecoverSizes(),
659+
recoveryPlan.getTotalSize(),
660+
recoveryPlan.getFilesPresentInTargetNames(),
661+
recoveryPlan.getFilesPresentInTargetSizes(),
662+
recoveryPlan.getExistingSize(),
663+
took
664+
)
634665
);
635666
}, listener::onFailure);
636667
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,12 @@ public void receiveFileInfo(List<String> phase1FileNames,
417417
List<String> phase1ExistingFileNames,
418418
List<Long> phase1ExistingFileSizes,
419419
int totalTranslogOps,
420+
boolean deleteRecoveredFiles,
420421
ActionListener<Void> listener) {
421422
ActionListener.completeWith(listener, () -> {
423+
if (deleteRecoveredFiles) {
424+
multiFileWriter.deleteTempFiles();
425+
}
422426
indexShard.resetRecoveryStage();
423427
indexShard.prepareForIndexRecovery();
424428
final RecoveryState.Index index = state().getIndex();
@@ -522,14 +526,6 @@ public void restoreFileFromSnapshot(String repository,
522526
}
523527
}
524528

525-
@Override
526-
public void deleteRecoveredFiles(ActionListener<Void> listener) {
527-
ActionListener.completeWith(listener, () -> {
528-
multiFileWriter.deleteTempFiles();
529-
return null;
530-
});
531-
}
532-
533529
private void registerThrottleTime(long throttleTimeInNanos) {
534530
state().getIndex().addTargetThrottling(throttleTimeInNanos);
535531
indexShard.recoveryStats().addThrottleTime(throttleTimeInNanos);

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ void receiveFileInfo(List<String> phase1FileNames,
8282
List<String> phase1ExistingFileNames,
8383
List<Long> phase1ExistingFileSizes,
8484
int totalTranslogOps,
85+
boolean deleteRecoveredFiles,
8586
ActionListener<Void> listener);
8687

8788
/**
@@ -105,15 +106,6 @@ void restoreFileFromSnapshot(String repository,
105106
BlobStoreIndexShardSnapshot.FileInfo snapshotFile,
106107
ActionListener<Void> listener);
107108

108-
/**
109-
* Deletes all the recovered files so far (partially or completely recovered).
110-
* This is necessary if we're recovering from a snapshot that doesn't share
111-
* the same index files as the source node (i.e. the snapshot was taken before
112-
* a primary fail-over) and the recovery fails half-way. In this case we should
113-
* delete all the recovered files and start from scratch using the source node.
114-
*/
115-
void deleteRecoveredFiles(ActionListener<Void> listener);
116-
117109
/** writes a partial file chunk to the target store */
118110
void writeFileChunk(StoreFileMetadata fileMetadata, long position, ReleasableBytesReference content,
119111
boolean lastChunk, int totalTranslogOps, ActionListener<Void> listener);

0 commit comments

Comments (0)