Batching of snapshot-delete start updates #141998
Conversation
Today each request to delete a collection of snapshots is inserted into the `SnapshotDeletionsInProgress` in the `ClusterState` with a separate cluster state update. There can be a lot of these things if, say, ILM is re-enabled after being turned off for a while. This commit makes it so that requests received for each repository will be batched together into a single cluster state update instead. Closes ES-14001
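The per-repository batching described above can be sketched as a simple queue keyed by repository name. This is an illustrative simplification, not the actual Elasticsearch implementation: the names `PerRepositoryBatcher`, `DeleteRequest`, `enqueue` and `drainBatch` are hypothetical, and the real code coordinates with the master service's cluster state update machinery.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of per-repository batching: instead of one cluster state
// update per delete request, requests are queued per repository and drained
// into a single update when the processor runs.
class PerRepositoryBatcher {
    record DeleteRequest(String repository, List<String> snapshotNames) {}

    private final Map<String, List<DeleteRequest>> queues = new HashMap<>();

    // returns true if this is the first queued item, i.e. the caller should
    // schedule exactly one cluster state update for this repository
    synchronized boolean enqueue(DeleteRequest request) {
        List<DeleteRequest> queue = queues.computeIfAbsent(request.repository(), r -> new ArrayList<>());
        queue.add(request);
        return queue.size() == 1;
    }

    // drains everything accumulated for the repository into one batch
    synchronized List<DeleteRequest> drainBatch(String repository) {
        List<DeleteRequest> queue = queues.remove(repository);
        return queue == null ? List.of() : queue;
    }
}
```

With this shape, a burst of ILM-driven delete requests against one repository collapses into a single drained batch rather than many separate state updates.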
Hi @DaveCTurner, I've created a changelog YAML for you.
If the execution of a cluster state update task throws an exception then all the tasks in the batch must be failed with that exception, even if some of them have been marked as successfully completed.
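The failure semantics just described can be sketched in isolation. This is a hedged, self-contained illustration, not the Elasticsearch task-queue API: `BatchRunner`, `Task`, and `runBatch` are made-up names standing in for the real cluster state task executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the invariant: if executing the batch throws, every task in the
// batch must be failed with that exception, even tasks that already appeared
// to complete successfully, because the cluster state update as a whole failed.
class BatchRunner {
    interface Task {
        void onSuccess();
        void onFailure(Exception e);
    }

    static void runBatch(List<Task> batch, Consumer<Task> executor) {
        List<Task> succeeded = new ArrayList<>();
        try {
            for (Task task : batch) {
                executor.accept(task);
                succeeded.add(task); // provisionally marked successful
            }
        } catch (Exception e) {
            // the whole update failed: fail ALL tasks, including the ones
            // that were provisionally marked successful above
            for (Task task : batch) {
                task.onFailure(e);
            }
            return;
        }
        succeeded.forEach(Task::onSuccess);
    }
}
```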
This change uncovered a longstanding bug in …
I can't find a much easier way to express this for review purposes, unfortunately - it's doing quite a complicated job. Extracting the process out to a separate class is noisy work, but doing that separately from the change that adds batching wouldn't really make the batching change significantly simpler. If it's any consolation, 60% of the lines added in this PR are the new unit test suite, which I think covers everything and also puts us in a better place for adding future tests of this specific component.
Pinging @elastic/es-distributed (Team:Distributed)
ywangd left a comment
This involves quite heavy machinery to get the batching to work. It does look correct to me. But as you commented inline, I wonder how it compares to reusing MasterService's batch queue and creating tasks with a SubscribableListener<RepositoryData>. We can resubmit the tasks when their repositoryData is not ready or is stale. BlobRepository#getRepositoryData already has deduplication which can be leveraged. We would still need much of the PR for processing multiple items. But maybe it's overall a bit simpler and less cognitive load?
server/src/main/java/org/elasticsearch/snapshots/SnapshotDeletionStartBatcher.java (four resolved review threads, outdated)
```java
for (var entry : abortedAndNeedingFinalization) {
    snapshotEnder.endSnapshot(entry, metadata, repositoryData);
}

if (entryToStart != null) {
    deletionStarter.startDeletion(projectId, repositoryName, entryToStart, repositoryData, maxDataNodeCompatibleIndexVersion);
}
```
This change might be OK. Though in the existing code, starting the deletion and finalizing snapshots are mutually exclusive, and deletion is preferred. It relies on the state transition after each operation to go through all operations. It might be OK to add them explicitly since tryEnterRepoLoop should keep them linearized, though I am still not certain about this code for clones.
Maybe we should keep the existing logic, or at least run startDeletion before endSnapshot.
I think these two things have to be mutually exclusive - if there is a running snapshot that we've just put into a finalizing state then any new deletion entry will be in state WAITING so entryToStart will be null. I added an assertion to this effect.
Ok, thanks, the relationship is quite subtle here. Writing it down here to remind myself: an entry can only be added to abortedAndNeedingFinalization if it is completed. A completed entry has isWritingToRepository() == true, which must make a new deletion entry (if there is one) WAITING.
👍 are you ok with the assertion I added or do you want some additional comments to record this in the code too?
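The invariant worked out in this thread can be stated as a tiny function, purely for illustration. The names below (`DeletionState`, `stateForNewDeletion`) are hypothetical; the real logic lives in the snapshot deletion state machine.

```java
// Sketch of the invariant discussed above: if any snapshot entry is still
// writing to the repository (i.e. finalizing), a newly added deletion entry
// must start out WAITING rather than STARTED, so entryToStart stays null.
enum DeletionState { WAITING, STARTED }

class DeletionInvariant {
    static DeletionState stateForNewDeletion(boolean anyEntryWritingToRepository) {
        // a STARTED deletion would race with in-flight repository writes,
        // so it must wait until finalization completes
        return anyEntryWritingToRepository ? DeletionState.WAITING : DeletionState.STARTED;
    }
}
```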
```java
    }
}, "delete snapshot [" + repository + "]" + Arrays.toString(snapshotNames), listener::onFailure);

private void notifyAbortedByDeletion(Snapshot snapshot) {
    if (endingSnapshots.add(snapshot)) {
```
This now runs after publication instead of as part of master task execution. I think it should be fine since these are snapshots directly removed from the cluster state, so it's not possible for them to compete with any other operations. Just writing it down to clear my thoughts.
Yes, this one took a bit of thinking. The way it worked previously was very ugly: we adjusted this externally-visible state here and then tried to revert it if the cluster state update failed, but that was very much an antipattern vs waiting for the cluster state update to be committed before reacting.
```java
for (var item : batch) {
    assert item.itemCompletionHandler != null;
    item.itemCompletionHandler.onCompletion(
        item.startedDeletionUuid != null ? item.startedDeletionUuid : deletionUuid,
```
Can we get rid of deletionUuid and just rely on startedDeletionUuid? We do need an assignment for each item but it seems conceptually simpler?
Mmm not very easily - at the point we're deciding whether each item can subscribe to the existing STARTED deletion or must wait for a new one we don't know what the new one's UUID will be, and then later on we would have to do some slightly strange checks to decide whether each item needs to subscribe to the new deletion or whether it's already bound for some other completion path.
I was thinking that we "just" replace all batchCompletionHandler.deletionUuid assignments with a method that internally assigns it to each item's startedDeletionUuid if it is null. But we can leave it as is if you prefer.
I think it'd be confusing to have a non-null task.deletionUuid if the task wasn't actually waiting for that deletion (e.g. its listener was DO_NOTHING or COMPLETE_LISTENER_IMMEDIATELY).
I worry that this would end up retaining an awful lot of slightly different (stale and ultimately unused) copies of …
```java
    return;
}

repository.getRepositoryData(snapshotExecutor, new ActionListener<>() {
```
If I understand this PR correctly, this expensive async getRepositoryData call is the whole reason why we chose not to use the master queue to handle the batching, right?
Because we need the RepositoryData to compute the cluster state when executing the batch, and we can't block on the getRepositoryData response within the executor because that could (catastrophically) stall cluster state updates. And based on your response here: #141998 (comment), we also don't want to pass it for "each task in a master batch" because it would consume more memory than we need.
Right - we need to be sure we're resolving the patterns in the request against the actual snapshots in the repository, and although usually the master will hold a copy of the latest RepositoryData in-memory, it may be that we have to read it from the repository and that's not something for which we should block the master thread. Best to spin and try again rather than holding up the queue like that.
Previously this was done with SnapshotsService#executeConsistentStateUpdate but this utility only supports a plain ClusterStateUpdateTask.
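The "don't block the master thread" pattern described in this thread can be sketched as follows. This is a hedged illustration only: `AsyncRepositoryDataFetcher` and `fetchThenSubmit` are made-up names, and the real code uses Elasticsearch's ActionListener plumbing rather than CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch: load the (potentially slow, blob-store-backed) repository data on a
// snapshot-pool thread, and only once it is available enqueue the cluster
// state update - rather than blocking the master thread on the read.
class AsyncRepositoryDataFetcher {
    static <T> void fetchThenSubmit(
        Executor snapshotExecutor,          // stand-in for the snapshot thread pool
        Supplier<T> loadRepositoryData,     // may read from the repository, so must be off the master thread
        Consumer<T> submitClusterStateUpdate
    ) {
        CompletableFuture
            .supplyAsync(loadRepositoryData, snapshotExecutor) // runs off the caller's thread
            .thenAccept(submitClusterStateUpdate);             // submit the update when the data is ready
    }
}
```

If the data turns out to be stale by the time the update executes, the real implementation spins and retries rather than holding up the queue.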
```java
final SnapshotDeletionsItem[] batch;
final RepositoryMetadata repositoryMetadata;
final RepositoryData repositoryData;
// TODO could we discard the repositoryData eagerly if repositoryMetadata changes to release that memory sooner?
```
So detect this case after the batch was submitted to the queue but before we execute it? Do you have an idea of how we would practically do that? Via some kind of listener on the batch?
On successful completion of the second cluster state update in BlobStoreRepository#writeIndexGen (the one that says set safe repository generation) we'd want to notify the enqueued batch of the new RepositoryData, without pulling the batch out of the queue. This is the only place where we write a new RepositoryData, and it's happening as part of a cluster state update (e.g. it's on the master thread) so we can be sure that we're not currently processing the batch at the time.
However, there are some subtleties around the failure handling. If the master stops being the master then we do not guarantee to clear out its queue immediately, so a different node could be elected master, update the RepositoryData, and then fail over back to the original master. Since a different node updated the RepositoryData, the original master's queue entries wouldn't have been notified about the change. Likewise if either the writeAtomic(OperationPurpose.SNAPSHOT_METADATA, blobContainer(), indexBlob, ... or the following set safe repository generation update fail, then we can't know the actual state of the repository without re-reading it.
Since a different node updated the RepositoryData the original master's queue entries wouldn't have been notified about the change. Likewise if either the writeAtomic(OperationPurpose.SNAPSHOT_METADATA, blobContainer(), indexBlob, ... or the following set safe repository generation update fail then we can't know the actual state of the repository without re-reading it.
This is an issue if we require the notification process to guarantee that every batch executes with the most recent repositoryData, right? If we treat notifications as "best effort" and invalidate queued batches (e.g., setting their repositoryData field to null) when a change occurs, to eagerly discard them, but keep the current check logic, would this be fine? But I guess the performance gains might not be as significant.
Yes that's right, these things are surmountable, the point is that there's more complexity to it than you might first think.
```java
 * removes the last entry from {@link SnapshotsInProgress}.
 */
void startDeletion(String[] snapshots, boolean waitForCompletion, TimeValue timeout, ActionListener<Void> listener) {
    logger.trace("startDeletion[{}]", Arrays.toString(snapshots));
```
Unrelated to this specific PR, but are there established conventions that we follow around logging in the ES repo, i.e. the amount of internal logs we emit and the level, specifically in the distributed path? Or is it more of a "best judgment" situation (e.g. don't emit noisy logs on hot paths)?
We do have a guide for logging best practice that you may want to consult:
https://github.com/elastic/elasticsearch/blob/main/CONTRIBUTING.md#logging
```java
        return ItemCompletionHandler.COMPLETE_LISTENER_IMMEDIATELY;
    }
}
// else add all the item's snapshots to the new deletion, even though there may be duplicates, because the running
```
We may add the snapshotIds to an existing WAITING deletion entry instead of creating a new deletion?
That's right - I take it you just mean the comment needs to be more precise, no actual code changes needed, right? Comment improved in 8151d5d.
```java
final var updatedDeletionsInProgress = deletionsInProgress.withReplacedEntry(
    entry.withAddedSnapshots(snapshotIdsRequiringCleanup)
);
```
withAddedSnapshots(...) returns this when nothing is added, but withReplacedEntry(...) always returns a new object even when the newEntry is the same object as the old entry. Therefore updatedDeletionsInProgress is always updated, which seems unnecessary.
Yes this was my mistaken understanding of snapshotIdsRequiringCleanup too - I made withReplacedEntry do the no-op thing correctly (and added some more assertions therein), see 0e09e6d.
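The identity-preserving update discussed here is a common pattern for immutable cluster-state pieces: mutators return `this` when nothing changed, so callers can use `==` to detect a no-op and skip publishing a new state. The sketch below uses made-up names (`Deletions`, `withReplacedEntry` over plain strings) purely to show the shape, not the real ES types.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: an immutable holder whose "replace" operation preserves object
// identity on a no-op, so `updated == original` reliably detects no change.
final class Deletions {
    final List<String> entries;

    Deletions(List<String> entries) {
        this.entries = List.copyOf(entries);
    }

    Deletions withReplacedEntry(String oldEntry, String newEntry) {
        if (oldEntry.equals(newEntry)) {
            return this; // no-op: keep identity so callers can short-circuit
        }
        List<String> copy = new ArrayList<>(entries);
        copy.set(copy.indexOf(oldEntry), newEntry);
        return new Deletions(copy);
    }
}
```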
```java
if (updatedSnapshots == snapshotsInProgress && updatedDeletionsInProgress == deletionsInProgress) {
    // NB copied over from the unbatched implementation, may now already be handled in resolveItemAndAddSnapshotIds?
    assert false : "should be already handled"; // TODO remove this branch if tests are reliably passing
    return initialState;
```
Is this true? We don't check the WAITING state in resolveItemAndAddSnapshotIds. It seems possible that a first delete request creates a WAITING entry and a second delete request targeting the same snapshots gets added to the first deletion entry but is a no-op?
Aha ok yes I was misunderstanding the meaning of snapshotIdsRequiringCleanup - this includes snapshots that are already marked for deletion, it isn't only the new deletion entries. I added a test case covering this code fragment more thoroughly and expanded some comments in 0e09e6d.
```java
if (deleteUUID == null) {
    listener.onResponse(null);
```
Can we assert false in this branch? It seems to me that we should always have a non-null UUID for this callback.
No, this can still be null if the snapshots hadn't really started and were just immediately removed from the cluster state. I added a comment in 6bcaead.
ywangd left a comment
LGTM
The tests are quite massive and impressive. Took me a while to read through them. TIL.
```java
// the listener plumbing is kinda delicate here - done like this to ensure that we verify that failBatch calls runQueueProcessor
final var deletionMissingFuture = new SubscribableListener<Void>();
final var deletionFailFuture = new SubscribableListener<Void>();
deletionFailFuture.addListener(ActionTestUtils.assertNoSuccessListener(ignoredException -> {
    repository.repositoryDataListener = SubscribableListener.newSucceeded(RepositoryData.EMPTY);
    batcher.startDeletion(
        new String[] { randomSnapshotName() },
        true,
        MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
        deletionMissingFuture
    );
```
Would be nice if we could assert that runQueueProcessor is not triggered after startDeletion, to double-confirm it is called by failBatch. Doesn't seem easy though.
Yes I didn't find this test very satisfactory. I looked at it again with a fresh mind and rewrote it in 792d1a5 to fail at the master-publication stage instead, which has the same effect (we call failBatch either way).
```java
for (int i = 0; i < iterations; i++) {
    startDeletion(snapshotName);
```
It seems unnecessary to log all names even when they are identical. I guess it matters only for tests since we don't really expect such usages in production?
Yeah it didn't really seem worth tracking duplicates and worrying about memory usage for that; today because it's unbatched we would log each of these in a separate log line because there's not even a concept of "duplicate". Also in production I think it'd be interesting if we were sometimes seeing lots of duplicates here. Something to come back to if it turns out to be an issue but I expect it won't.
This test fails if it only has a single snapshot thread and that thread gets blocked on the repository before the second snapshot has been enqueued: since #141998 landed, enqueuing batches of deletes requires a snapshot thread. This commit fixes the problem by adding a second snapshot thread. Closes #143387