Skip to content

Commit 1bdbf2c

Browse files
CR: Yannick's patch
1 parent 5295322 commit 1bdbf2c

1 file changed

Lines changed: 57 additions & 62 deletions

File tree

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 57 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,6 @@
6565
import org.elasticsearch.common.unit.ByteSizeUnit;
6666
import org.elasticsearch.common.unit.ByteSizeValue;
6767
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
68-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
6968
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
7069
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
7170
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -112,7 +111,6 @@
112111
import java.util.Map;
113112
import java.util.Set;
114113
import java.util.concurrent.Executor;
115-
import java.util.concurrent.ExecutorService;
116114
import java.util.concurrent.atomic.AtomicBoolean;
117115
import java.util.stream.Collectors;
118116
import java.util.stream.Stream;
@@ -393,78 +391,75 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea
393391
*/
394392
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
395393
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
396-
ActionListener<Void> listener) {
397-
398-
final ExecutorService executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
399-
400-
// Listener to invoke once an updated ShardGenerations instance is ready to be written
401-
final ListenableFuture<ShardGenerations> shardGenerationsListener = new ListenableFuture<>();
402-
403-
// Listener to invoke once we've written the latest repository data and updated all shard metadata.
404-
// Any failure past this point will only be logged.
405-
final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> afterUpdateAllMetadata = new ListenableFuture<>();
406-
407-
final ListenableFuture<RepositoryData> repositoryDataWrittenListener = new ListenableFuture<>();
394+
ActionListener<Void> listener) throws IOException {
408395

409396
if (writeShardGens) {
410-
// New path that updates the pointer to each deleted shard's generation in the root RepositoryData
411-
412-
// Once we are done removing the snapshot from the shard-level metadata of each affected shard we can update the repository
413-
// metadata as follows:
397+
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
398+
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
399+
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
400+
// Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
414401
// 1. Remove the snapshot from the list of existing snapshots
415402
// 2. Update the index shard generations of all updated shard folders
416403
//
417404
// Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
418405
// index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
419406
// written if all shard paths have been successfully updated.
420-
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> afterUpdateShardMeta = new StepListener<>();
421-
422-
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, afterUpdateShardMeta);
423-
424-
afterUpdateShardMeta.whenComplete(res -> {
425-
final ShardGenerations.Builder builder = ShardGenerations.builder();
426-
for (ShardSnapshotMetaDeleteResult newGen : res) {
427-
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
428-
}
429-
shardGenerationsListener.onResponse(builder.build());
430-
repositoryDataWrittenListener.addListener(ActionListener.delegateFailure(listener,
431-
(v, l) -> afterUpdateAllMetadata.onResponse(res)), executor);
407+
final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
408+
writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
409+
final ShardGenerations.Builder builder = ShardGenerations.builder();
410+
for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
411+
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
412+
}
413+
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
414+
writeIndexGen(updatedRepoData, repositoryStateId, true);
415+
writeUpdatedRepoDataStep.onResponse(updatedRepoData);
416+
}, writeUpdatedRepoDataStep::onFailure);
417+
// Once we have updated the repository, run the clean-ups
418+
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
419+
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
420+
final ActionListener<Void> afterCleanupsListener =
421+
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
422+
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
423+
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
432424
}, listener::onFailure);
433425
} else {
434-
// Writing the new repository data first, without tracking any shard generations in the BwC path
435-
shardGenerationsListener.onResponse(ShardGenerations.EMPTY);
436-
// We've already written the new RepositoryData so updating all shard-level metadata will mean that all metadata is updated
437-
repositoryDataWrittenListener.addListener(ActionListener.delegateFailure(listener,
438-
(v, l) -> writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, afterUpdateAllMetadata)),
439-
executor);
426+
// Write the new repository data first (with the removed snapshot), using no shard generations
427+
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
428+
writeIndexGen(updatedRepoData, repositoryStateId, false);
429+
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
430+
final ActionListener<Void> afterCleanupsListener =
431+
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
432+
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
433+
writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, ActionListener.map(afterCleanupsListener,
434+
deleteResults -> {
435+
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener);
436+
return null;
437+
}));
440438
}
439+
}
440+
441+
private void asyncCleanupUnlinkedRootAndIndicesBlobs(Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs,
442+
RepositoryData updatedRepoData, ActionListener<Void> listener) {
443+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
444+
listener,
445+
l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
446+
}
441447

442-
shardGenerationsListener.addListener(ActionListener.wrap(shardGens -> {
443-
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, shardGens);
444-
// Write out new RepositoryData
445-
writeIndexGen(updatedRepoData, repositoryStateId, writeShardGens);
446-
repositoryDataWrittenListener.onResponse(updatedRepoData);
447-
}, listener::onFailure), executor);
448-
449-
// Listener that resolves the given listener passed to this method once both cleanup steps have completed.
450-
// Any failures encountered by this listener are ignored as they were already logged by the throwing code.
451-
final ActionListener<Void> afterCleanupsListener =
452-
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
453-
454-
// Run unreferenced blobs cleanup if we were able to write updated repository data
455-
repositoryDataWrittenListener.addListener(
456-
ActionListener.wrap(updatedRepoData -> cleanupStaleBlobs(
457-
foundIndices, rootBlobs, updatedRepoData, ActionListener.map(afterCleanupsListener, ignored -> null)), e -> {}), executor);
458-
459-
afterUpdateAllMetadata.addListener(ActionListener.runAfter(
460-
ActionListener.wrap(
461-
// Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
462-
// has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
463-
deleteResults -> blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults)),
464-
// Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
465-
e -> logger.warn(
466-
() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
467-
() -> afterCleanupsListener.onResponse(null)), executor);
448+
private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults,
449+
ActionListener<Void> listener) {
450+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
451+
listener,
452+
l -> {
453+
try {
454+
blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults));
455+
l.onResponse(null);
456+
} catch (Exception e) {
457+
logger.warn(
458+
() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId),
459+
e);
460+
throw e;
461+
}
462+
}));
468463
}
469464

470465
// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.

0 commit comments

Comments
 (0)