Limit size of shardDeleteResults #133558
Conversation
Modifies `BlobStoreRepository.ShardBlobsToDelete.shardDeleteResults` to have a variable size depending on the remaining heap space, rather than a hard-coded 2GB size which caused smaller nodes with less heap space to OOM.

Relates to elastic#131822
Closes ES-12540
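The idea in the description can be sketched as follows. This is a hypothetical illustration, not the actual Elasticsearch code: the class name `ShardDeleteResultsCap`, the method `capForHeap`, and the 25% heap fraction are all assumptions made for the example. It shows deriving a buffer cap from the JVM's max heap instead of a hard-coded 2GB, clamped to an int-addressable size.

```java
// Hypothetical sketch: size the delete-results buffer from a fraction of max heap
// rather than a fixed 2GB, so small-heap nodes do not OOM.
public class ShardDeleteResultsCap {
    // Fraction of max heap spent on buffered delete results (assumed value).
    private static final double HEAP_FRACTION = 0.25;

    static int capForHeap(long maxHeapBytes) {
        long cap = (long) (maxHeapBytes * HEAP_FRACTION);
        // Never exceed what a single byte-addressed buffer can hold.
        return (int) Math.min(cap, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        long maxHeap = Runtime.getRuntime().maxMemory();
        System.out.println("cap=" + capForHeap(maxHeap));
    }
}
```

On a node with a 1GiB heap this yields a 256MiB cap; on very large heaps the cap saturates at `Integer.MAX_VALUE`.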
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java — three resolved (outdated) review threads
Modifies `addShardDeleteResult` to only write to `shardDeleteResults` when there is capacity for the write
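The capacity-guarded write described above can be sketched as a bounded stream. This is an illustrative toy, not the real `BlobStoreRepository` internals: `TruncatingOutputStream` and its behavior of silently dropping bytes once the budget is spent are assumptions for the example. Callers check `hasCapacity()` after a write to decide whether the record went in whole.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

// Hypothetical bounded stream: accepts bytes only while under its capacity budget.
class TruncatingOutputStream extends OutputStream {
    private final ByteArrayOutputStream delegate = new ByteArrayOutputStream();
    private final int capacity;

    TruncatingOutputStream(int capacity) {
        this.capacity = capacity;
    }

    boolean hasCapacity() {
        return delegate.size() < capacity;
    }

    @Override
    public void write(int b) {
        if (hasCapacity()) {
            delegate.write(b); // bytes past the budget are silently dropped
        }
    }

    int size() {
        return delegate.size();
    }
}
```

A caller that serializes an object and then finds `hasCapacity()` false must treat that record as truncated and skip it on read, which is exactly the partial-write concern discussed below.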
server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java — resolved (outdated) review thread
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java — resolved (outdated) review thread
```java
// We only want to read this shard delete result if we were able to write the entire object.
// Otherwise, for partial writes, an EOFException will be thrown upon reading
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
    successfullyWrittenBlobsCount += 1;
```
This replaces `resultCount`, but it is the count of successfully recorded shards, not blobs.
```java
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
    successfullyWrittenBlobsCount += 1;
} else {
    leakedBlobsCount += 1;
```
Likewise, this is recording the number of shards with leaked blobs rather than the number of leaked blobs. However, rather than just renaming the variable, I think we should actually count the number of leaked blobs (i.e. `+= blobsToDelete.size()` here).
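The reviewer's suggestion can be sketched as below. The class and field names (`LeakCounter`, `successfullyRecordedShards`, `leakedBlobsCount`) are hypothetical stand-ins for the real fields; the point is the `+= blobsToDelete.size()` on the failure branch.

```java
import java.util.Collection;

// Hypothetical sketch: accumulate the number of leaked blobs,
// not the number of shards whose blobs leaked.
class LeakCounter {
    long successfullyRecordedShards = 0;
    long leakedBlobsCount = 0;

    void record(boolean writtenInFull, Collection<String> blobsToDelete) {
        if (writtenInFull) {
            successfullyRecordedShards += 1;
        } else {
            leakedBlobsCount += blobsToDelete.size(); // count blobs, not shards
        }
    }
}
```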
server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java — two resolved review threads
DaveCTurner left a comment:
Good stuff, I left only tiny nits about the production code and a few other comments about the testing.
```java
ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.initializeAndWatch(
    MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING,
    status -> this.maxHeapSizeForSnapshotDeletion = status
```
Maybe apply the limit here on write (making the field an `int`) rather than on each read?
```suggestion
maxHeapSizeForSnapshotDeletion -> this.maxHeapSizeForSnapshotDeletion = Math.toIntExact(
    Math.min(maxHeapSizeForSnapshotDeletion.getBytes(), Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1))
)
```
```java
boolean writeTruncated = false;
// There is a minimum of 1 byte available for writing
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
    new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
    // We only want to read this shard delete result if we were able to write the entire object.
    // Otherwise, for partial writes, an EOFException will be thrown upon reading
    if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
        resultsCount += 1;
    } else {
        writeTruncated = true;
    }
} else {
    writeTruncated = true;
}
```
A matter of taste, but consider extracting this section into its own method to clarify that we only get `false` on the branch that succeeded; all other paths lead to `true` (and maybe we should invert that so that `true` means "success").
```java
private boolean writeBlobsIfCapacity(IndexId indexId, int shardId, Collection<String> blobsToDelete) throws IOException {
    // There is a minimum of 1 byte available for writing
    if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
        new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
        // We only want to read this shard delete result if we were able to write the entire object.
        // Otherwise, for partial writes, an EOFException will be thrown upon reading
        if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
            resultsCount += 1;
            return false;
        }
    }
    return true;
}
```

```java
assertEquals(expectedShardGenerations.build(), shardBlobsToDelete.getUpdatedShardGenerations());
shardBlobsToDelete.getBlobPaths().forEachRemaining(s -> assertTrue(expectedBlobsToDelete.remove(s)));
assertThat(expectedBlobsToDelete, empty());
assertThat(shardBlobsToDelete.sizeInBytes(), lessThanOrEqualTo(Math.max(ByteSizeUnit.KB.toIntBytes(1), 20 * blobCount)));
```
Hmm, it's not really within the implied contract of this class to be able to iterate its contents and then try to append more items. We've already closed the underlying compressed stream by this point. I think we should defer these assertions until the end.
```java
// === Second, now capacity is exceeded, test whether subsequent writes are accepted without throwing an error === //

for (int i = 0; i < randomIntBetween(1, 20); i++) {
```
This is trappy: it's going to generate a new `randomIntBetween` on each iteration, so you don't get a uniform distribution of values. Better to count down from `between(1, 20)` to zero, or else extract a variable for the upper bound.
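The fix the reviewer describes can be sketched as below. The names (`LoopBound`, `runExtraWrites`) are hypothetical, and `ThreadLocalRandom.nextInt(1, 21)` stands in for the test framework's `between(1, 20)`; the point is that the bound is rolled once, then counted down.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: pick the iteration count once so the number of extra writes is
// uniformly distributed, instead of re-rolling the bound every iteration.
class LoopBound {
    static int runExtraWrites() {
        int remaining = ThreadLocalRandom.current().nextInt(1, 21); // stands in for between(1, 20)
        int executed = 0;
        while (remaining-- > 0) {
            executed++; // the body would attempt another shard delete write here
        }
        return executed;
    }
}
```

Re-evaluating `randomIntBetween(1, 20)` in the loop condition instead biases the loop toward shorter runs, since every iteration gets a fresh chance to draw a bound at or below the current index.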
```java
// === First, write blobs until capacity is exceeded === //

// While there is at least one byte in the stream, write
while (shardBlobsToDelete.sizeInBytes() < heapMemory) {
```
WDYT about iterating until `leakedBlobCount` reaches some target value here, rather than doing the two separate loops?
```java
final var indexId = new IndexId(randomIdentifier(), randomUUID());
final var shardId = between(1, 30);
final var shardGeneration = new ShardGeneration(randomUUID());
// Always write at least one blob, guaranteeing that the shardDeleteResults stream increases in size
```
👍 well spotted (in fact the stream grows anyway because we write the index ID, but that doesn't necessarily increase `leakedBlobCount`)
In elastic#133558 we imposed a limit on the heap used to keep track of shard-level blobs to clean up after the commit of a snapshot deletion. This commit makes use of the same mechanism to track `IndexMetadata` blobs for future deletion. Closes elastic#140018
Modifies `BlobStoreRepository.ShardBlobsToDelete.shardDeleteResults` to have a variable size depending on the remaining heap space rather than a hard-coded 2GB size, which caused smaller nodes with less heap space to OOM.

Relates to #131822
Closes #116379
Closes ES-12540