Skip to content

Commit 6966fa5

Browse files
author
Ali Beyad
committed
Ensures cleanup of temporary index-* generational blobs during snapshotting (#21469)
Ensures pending index-* blobs are deleted when snapshotting. The index-* blobs are generational files that maintain the snapshots in the repository. To write these atomically, we first write a `pending-index-*` blob, then move it to `index-*`, which also deletes `pending-index-*` in case its not a file-system level move (e.g. S3 repositories) . For example, to write the 5th generation of the index blob for the repository, we would first write the bytes to `pending-index-5` and then move `pending-index-5` to `index-5`. It is possible that we fail after writing `pending-index-5`, but before moving it to `index-5` or deleting `pending-index-5`. In this case, we will have a dangling `pending-index-5` blob laying around. Since snapshot #5 would have failed, the next snapshot assumes a generation number of 5, so it tries to write to `index-5`, which first tries to write to `pending-index-5` before moving the blob to `index-5`. Since `pending-index-5` is leftover from the previous failure, the snapshot fails as it cannot overwrite this blob. This commit solves the problem by first, adding a UUID to the `pending-index-*` blobs, and secondly, strengthen the logic around failure to write the `index-*` generational blob to ensure pending files are deleted on cleanup. Closes #21462
1 parent f91c8d4 commit 6966fa5

4 files changed

Lines changed: 68 additions & 8 deletions

File tree

core/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,11 @@ public interface BlobContainer {
105105
Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;
106106

107107
/**
108-
* Atomically renames the source blob into the target blob. If the source blob does not exist or the
109-
* target blob already exists, an exception is thrown.
108+
* Renames the source blob into the target blob. If the source blob does not exist or the
109+
* target blob already exists, an exception is thrown. Atomicity of the move operation
110+
* can only be guaranteed on an implementation-by-implementation basis. The only current
111+
* implementation of {@link BlobContainer} for which atomicity can be guaranteed is the
112+
* {@link org.elasticsearch.common.blobstore.fs.FsBlobContainer}.
110113
*
111114
* @param sourceBlobName
112115
* The blob to rename.

core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -867,15 +867,17 @@ private long listBlobsToGetLatestIndexId() throws IOException {
867867
}
868868

869869
private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
870-
final String tempBlobName = "pending-" + blobName;
870+
final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
871871
try (InputStream stream = bytesRef.streamInput()) {
872872
snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
873-
}
874-
try {
875873
snapshotsBlobContainer.move(tempBlobName, blobName);
876874
} catch (IOException ex) {
877-
// Move failed - try cleaning up
878-
snapshotsBlobContainer.deleteBlob(tempBlobName);
875+
// temporary blob creation or move failed - try cleaning up
876+
try {
877+
snapshotsBlobContainer.deleteBlob(tempBlobName);
878+
} catch (IOException e) {
879+
ex.addSuppressed(e);
880+
}
879881
throw ex;
880882
}
881883
}

core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2672,4 +2672,53 @@ public void testSnapshotCanceledOnRemovedShard() throws Exception {
26722672
assertEquals("IndexShardSnapshotFailedException[Aborted]", snapshotInfo.shardFailures().get(0).reason());
26732673
}
26742674

2675+
public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
2676+
logger.info("--> creating repository");
2677+
final Path repoPath = randomRepoPath();
2678+
assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings(
2679+
Settings.builder().put("location", repoPath).put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)));
2680+
2681+
logger.info("--> indexing some data");
2682+
createIndex("test-idx");
2683+
ensureGreen();
2684+
final int numDocs = randomIntBetween(1, 5);
2685+
for (int i = 0; i < numDocs; i++) {
2686+
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
2687+
}
2688+
refresh();
2689+
assertThat(client().prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs));
2690+
2691+
logger.info("--> snapshot with potential I/O failures");
2692+
try {
2693+
CreateSnapshotResponse createSnapshotResponse =
2694+
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
2695+
.setWaitForCompletion(true)
2696+
.setIndices("test-idx")
2697+
.get();
2698+
if (createSnapshotResponse.getSnapshotInfo().totalShards() != createSnapshotResponse.getSnapshotInfo().successfulShards()) {
2699+
assertThat(getFailureCount("test-repo"), greaterThan(0L));
2700+
assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0));
2701+
for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) {
2702+
assertThat(shardFailure.reason(), containsString("Random IOException"));
2703+
}
2704+
}
2705+
} catch (Exception ex) {
2706+
// sometimes, the snapshot will fail with a top level I/O exception
2707+
assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException"));
2708+
}
2709+
2710+
logger.info("--> snapshot with no I/O failures");
2711+
assertAcked(client().admin().cluster().preparePutRepository("test-repo-2").setType("mock").setVerify(false).setSettings(
2712+
Settings.builder().put("location", repoPath)));
2713+
CreateSnapshotResponse createSnapshotResponse =
2714+
client().admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2")
2715+
.setWaitForCompletion(true)
2716+
.setIndices("test-idx")
2717+
.get();
2718+
assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards());
2719+
GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo-2")
2720+
.addSnapshots("test-snap-2").get();
2721+
assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
2722+
}
2723+
26752724
}

core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,20 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws
321321

322322
@Override
323323
public void move(String sourceBlob, String targetBlob) throws IOException {
324+
// simulate a non-atomic move, since many blob container implementations
325+
// will not have an atomic move, and we should be able to handle that
324326
maybeIOExceptionOrBlock(targetBlob);
325-
super.move(sourceBlob, targetBlob);
327+
super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
328+
super.deleteBlob(sourceBlob);
326329
}
327330

328331
@Override
329332
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
330333
maybeIOExceptionOrBlock(blobName);
331334
super.writeBlob(blobName, inputStream, blobSize);
335+
// for network based repositories, the blob may have been written but we may still
336+
// get an error with the client connection, so an IOException here simulates this
337+
maybeIOExceptionOrBlock(blobName);
332338
}
333339
}
334340
}

0 commit comments

Comments
 (0)