|
65 | 65 | import org.elasticsearch.common.unit.ByteSizeUnit; |
66 | 66 | import org.elasticsearch.common.unit.ByteSizeValue; |
67 | 67 | import org.elasticsearch.common.util.concurrent.AbstractRunnable; |
68 | | -import org.elasticsearch.common.util.concurrent.ListenableFuture; |
69 | 68 | import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; |
70 | 69 | import org.elasticsearch.common.xcontent.NamedXContentRegistry; |
71 | 70 | import org.elasticsearch.common.xcontent.XContentFactory; |
|
112 | 111 | import java.util.Map; |
113 | 112 | import java.util.Set; |
114 | 113 | import java.util.concurrent.Executor; |
115 | | -import java.util.concurrent.ExecutorService; |
116 | 114 | import java.util.concurrent.atomic.AtomicBoolean; |
117 | 115 | import java.util.stream.Collectors; |
118 | 116 | import java.util.stream.Stream; |
@@ -393,78 +391,75 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolea |
393 | 391 | */ |
394 | 392 | private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices, |
395 | 393 | Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData, boolean writeShardGens, |
396 | | - ActionListener<Void> listener) { |
397 | | - |
398 | | - final ExecutorService executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); |
399 | | - |
400 | | - // Listener to invoke once an updated ShardGenerations instance is ready to be written |
401 | | - final ListenableFuture<ShardGenerations> shardGenerationsListener = new ListenableFuture<>(); |
402 | | - |
403 | | - // Listener to invoke once we've written the latest repository data and updated all shard metadata. |
404 | | - // Any failure past this point will only be logged. |
405 | | - final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> afterUpdateAllMetadata = new ListenableFuture<>(); |
406 | | - |
407 | | - final ListenableFuture<RepositoryData> repositoryDataWrittenListener = new ListenableFuture<>(); |
| 394 | + ActionListener<Void> listener) throws IOException { |
408 | 395 |
|
409 | 396 | if (writeShardGens) { |
410 | | - // New path that updates the pointer to each deleted shard's generation in the root RepositoryData |
411 | | - |
412 | | - // Once we are done removing the snapshot from the shard-level metadata of each affected shard we can update the repository |
413 | | - // metadata as follows: |
| 397 | + // First write the new shard state metadata (with the removed snapshot) and compute deletion targets |
| 398 | + final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); |
| 399 | + writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); |
| 400 | + // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: |
414 | 401 | // 1. Remove the snapshot from the list of existing snapshots |
415 | 402 | // 2. Update the index shard generations of all updated shard folders |
416 | 403 | // |
417 | 404 | // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created |
418 | 405 | // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only |
419 | 406 | // written if all shard paths have been successfully updated. |
420 | | - final StepListener<Collection<ShardSnapshotMetaDeleteResult>> afterUpdateShardMeta = new StepListener<>(); |
421 | | - |
422 | | - writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, true, afterUpdateShardMeta); |
423 | | - |
424 | | - afterUpdateShardMeta.whenComplete(res -> { |
425 | | - final ShardGenerations.Builder builder = ShardGenerations.builder(); |
426 | | - for (ShardSnapshotMetaDeleteResult newGen : res) { |
427 | | - builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); |
428 | | - } |
429 | | - shardGenerationsListener.onResponse(builder.build()); |
430 | | - repositoryDataWrittenListener.addListener(ActionListener.delegateFailure(listener, |
431 | | - (v, l) -> afterUpdateAllMetadata.onResponse(res)), executor); |
| 407 | + final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>(); |
| 408 | + writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> { |
| 409 | + final ShardGenerations.Builder builder = ShardGenerations.builder(); |
| 410 | + for (ShardSnapshotMetaDeleteResult newGen : deleteResults) { |
| 411 | + builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); |
| 412 | + } |
| 413 | + final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); |
| 414 | + writeIndexGen(updatedRepoData, repositoryStateId, true); |
| 415 | + writeUpdatedRepoDataStep.onResponse(updatedRepoData); |
| 416 | + }, writeUpdatedRepoDataStep::onFailure); |
| 417 | + // Once we have updated the repository, run the clean-ups |
| 418 | + writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> { |
| 419 | + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion |
| 420 | + final ActionListener<Void> afterCleanupsListener = |
| 421 | + new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); |
| 422 | + asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); |
| 423 | + asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener); |
432 | 424 | }, listener::onFailure); |
433 | 425 | } else { |
434 | | - // Writing the new repository data first, without tracking any shard generations in the BwC path |
435 | | - shardGenerationsListener.onResponse(ShardGenerations.EMPTY); |
436 | | - // We've already written the new RepositoryData so updating all shard-level metadata will mean that all metadata is updated |
437 | | - repositoryDataWrittenListener.addListener(ActionListener.delegateFailure(listener, |
438 | | - (v, l) -> writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, afterUpdateAllMetadata)), |
439 | | - executor); |
| 426 | + // Write the new repository data first (with the removed snapshot), using no shard generations |
| 427 | + final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY); |
| 428 | + writeIndexGen(updatedRepoData, repositoryStateId, false); |
| 429 | + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion |
| 430 | + final ActionListener<Void> afterCleanupsListener = |
| 431 | + new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); |
| 432 | + asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); |
| 433 | + writeUpdatedShardMetaDataAndComputeDeletes(snapshotId, repositoryData, false, ActionListener.map(afterCleanupsListener, |
| 434 | + deleteResults -> { |
| 435 | + asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener); |
| 436 | + return null; |
| 437 | + })); |
440 | 438 | } |
| 439 | + } |
| 440 | + |
| 441 | + private void asyncCleanupUnlinkedRootAndIndicesBlobs(Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs, |
| 442 | + RepositoryData updatedRepoData, ActionListener<Void> listener) { |
| 443 | + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( |
| 444 | + listener, |
| 445 | + l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null)))); |
| 446 | + } |
441 | 447 |
|
442 | | - shardGenerationsListener.addListener(ActionListener.wrap(shardGens -> { |
443 | | - final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, shardGens); |
444 | | - // Write out new RepositoryData |
445 | | - writeIndexGen(updatedRepoData, repositoryStateId, writeShardGens); |
446 | | - repositoryDataWrittenListener.onResponse(updatedRepoData); |
447 | | - }, listener::onFailure), executor); |
448 | | - |
449 | | - // Listener that resolves the given listener passed to this method once both cleanup steps have completed. |
450 | | - // Any failures encountered by this listener are ignored as they were already logged by the throwing code. |
451 | | - final ActionListener<Void> afterCleanupsListener = |
452 | | - new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); |
453 | | - |
454 | | - // Run unreferenced blobs cleanup if we were able to write updated repository data |
455 | | - repositoryDataWrittenListener.addListener( |
456 | | - ActionListener.wrap(updatedRepoData -> cleanupStaleBlobs( |
457 | | - foundIndices, rootBlobs, updatedRepoData, ActionListener.map(afterCleanupsListener, ignored -> null)), e -> {}), executor); |
458 | | - |
459 | | - afterUpdateAllMetadata.addListener(ActionListener.runAfter( |
460 | | - ActionListener.wrap( |
461 | | - // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths) |
462 | | - // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result |
463 | | - deleteResults -> blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults)), |
464 | | - // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request |
465 | | - e -> logger.warn( |
466 | | - () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)), |
467 | | - () -> afterCleanupsListener.onResponse(null)), executor); |
| 448 | + private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults, |
| 449 | + ActionListener<Void> listener) { |
| 450 | + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( |
| 451 | + listener, |
| 452 | + l -> { |
| 453 | + try { |
| 454 | + blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults)); |
| 455 | + l.onResponse(null); |
| 456 | + } catch (Exception e) { |
| 457 | + logger.warn( |
| 458 | + () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), |
| 459 | + e); |
| 460 | + throw e; |
| 461 | + } |
| 462 | + })); |
468 | 463 | } |
469 | 464 |
|
470 | 465 | // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up. |
|
0 commit comments