Add support for peer recoveries using snapshots after primary failovers #77420
fcofdez merged 31 commits into elastic:master
Conversation
Pinging @elastic/es-distributed (Team:Distributed)
This commit adds support for peer recoveries using snapshots after a primary failover if the snapshot shares the same logical contents but the physical files are different. It uses the seq no information stored in the snapshot to compare against the current shard source node seq nos and decide whether or not it can use the snapshot to recover the shard. Since the underlying index files are different from the source index files, error handling is different than when the files are shared. In this case, if there's an error while snapshot files are recovered, we have to cancel the on-going downloads, wait until all in-flight operations complete, remove the recovered files and start from scratch using a fallback recovery plan that uses the files from the source node. Relates elastic#73496
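As a rough illustration of the flow the description lays out, here is a minimal sketch. All names (`SnapshotRecoverySketch`, `canRecoverFromSnapshot`, `recover`) are hypothetical stand-ins for the real classes in `org.elasticsearch.indices.recovery`, not the actual API:

```java
// Illustrative sketch only: simplified stand-ins for the real recovery classes.
public class SnapshotRecoverySketch {

    // A snapshot can seed the recovery when it is logically equivalent to the
    // source shard (same seq-no history) even though the physical files differ,
    // provided the versions involved are compatible.
    static boolean canRecoverFromSnapshot(boolean logicallyEquivalent,
                                          boolean physicalFilesDiffer,
                                          boolean versionCompatible) {
        return logicallyEquivalent && physicalFilesDiffer && versionCompatible;
    }

    // On a failure while downloading snapshot files we cannot simply resume,
    // because the snapshot files are not the source files: cancel on-going
    // downloads, wait for in-flight operations, delete partially recovered
    // files, then run the fallback plan that copies from the source node.
    static String recover(boolean snapshotDownloadFails) {
        if (!snapshotDownloadFails) {
            return "recovered-from-snapshot";
        }
        return "recovered-from-source-fallback";
    }

    public static void main(String[] args) {
        System.out.println(canRecoverFromSnapshot(true, true, true)); // true
        System.out.println(recover(true)); // recovered-from-source-fallback
    }
}
```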
Force-pushed from 3a05364 to fdd32d4
    Store.RecoveryDiff recoveryDiff = latestSnapshot.getMetadataSnapshot().recoveryDiff(sourceMetadata);

    // Primary failed over after the snapshot was taken
    if (recoveryDiff.different.isEmpty() == false || recoveryDiff.missing.isEmpty() == false) {
I wonder if we should do some version checks here, since we're not relying on file contents being equal as we do in the regular path.
Good point. AFAICS we do not have that info currently. I wonder if we should add Version.CURRENT to commit user data when committing. But it does not get us the full way, we will need a way to represent that in the snapshot too. Let us touch base on that tomorrow.
henningandersen left a comment
I reviewed the production part of the PR and have a few initial comments.
    recoverSnapshotFiles(shardRecoveryPlan, recoverSnapshotFilesStep.delegateResponse((delegate, e) -> {
        if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false &&
            e instanceof CancellableThreads.ExecutionCancelledException == false) {
            recoveryTarget.deleteRecoveredFiles(new ActionListener<>() {
I wonder if we need this explicit call to the target? I think we could just clean this up when the target receives the first chunk of the "normal" file based recovery? The boolean flag shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() could be sent in the receiveFileInfo call.
    ShardSnapshot latestSnapshot = latestSnapshotOpt.get();

    if (latestSnapshot.isLogicallyEquivalent(shardStateIdentifier)) {
I wonder if we should only allow this case when sourceTargetDiff.identical.isEmpty()? The primary case we are after here is relocations, which will be to a node that does not have the shard at all.
It is unlikely to make a big difference due to retention leases, which ensure that for normal restart cases we do not do any file based recovery at all. But if we find identical files between source and target, it is a trade-off whether it is worth recovering from snapshot. In the interest of this change being a "purely positive" change, I would prefer that we skip recovery from snapshot in that case. Let me know your thoughts on this.
That makes sense and I think that it simplifies reasoning about this edge case too. I'll change it 👍
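The adjusted condition discussed above could be sketched as follows; `identicalFilesOnTarget` is a hypothetical stand-in for `sourceTargetDiff.identical.size()`, and the method name is illustrative, not the real one:

```java
public class SnapshotUseDecision {
    // Only recover from snapshot when the target shares no identical files with
    // the source; if identical files exist, a plain file-based recovery can
    // reuse them, keeping this feature a "purely positive" change.
    static boolean shouldUseSnapshot(boolean logicallyEquivalent, int identicalFilesOnTarget) {
        return logicallyEquivalent && identicalFilesOnTarget == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldUseSnapshot(true, 0)); // true: fresh target, e.g. a relocation
        System.out.println(shouldUseSnapshot(true, 3)); // false: prefer reusing local files
    }
}
```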
    recoveryTarget.deleteRecoveredFiles(new ActionListener<>() {
        @Override
        public void onResponse(Void unused) {
            recoverFilesFromSourceAndSnapshot(shardRecoveryPlan.getFallbackPlan(), store, stopWatch, listener);
It is a bit worrying that we abandon the rest of the original recoverFilesFromSourceAndSnapshot, i.e., the steps that have registered for whenComplete are never completed. I think it works now, but if we ever expect those to be fired to free some resource it could become a subtle leak.
I think I would prefer to just send the file infos again and then continue on the original steps, seems less confusing. Let me know what you think.
henningandersen left a comment
Thanks Francisco, this direction looks good. I left a number of smaller comments for the production code, did not get through tests yet.
    }

    @Nullable
    public Version getVersion() {
Perhaps rename this to getCommitVersion() to be sure not to confuse it with some of the other versions involved in snapshots.
We could also move this to Store.MetadataSnapshot, looks like we might as well construct that in the constructor with the lucene commit data. It seems like a slightly more logical place for this method to live? We can also look at that in a follow-up if you prefer.
    // Primary failed over after the snapshot was taken
    if (latestSnapshot.isLogicallyEquivalent(shardStateIdentifier) &&
        latestSnapshot.hasDifferentPhysicalFiles(sourceMetadata) &&
        snapshotVersion != null && snapshotVersion.onOrBefore(Version.CURRENT) &&
Let us add a comment here that we check against source version and why it is OK (the allocation decider).
Also, I think we can do the following instead:
    - snapshotVersion != null && snapshotVersion.onOrBefore(Version.CURRENT) &&
    + (snapshotVersion == null || snapshotVersion.onOrBefore(Version.CURRENT)) &&
and add a comment that no snapshotCommitVersion means that it was taken before 7.16? That would make this new path work from 7.16 with snapshots taken on 7.15 or earlier.
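The suggested relaxation can be sketched in isolation; the tiny `Version` record below is a hypothetical stand-in for `org.elasticsearch.Version`, with integer ids chosen only for illustration:

```java
public class SnapshotVersionGate {
    // Hypothetical minimal Version; the real one is org.elasticsearch.Version.
    record Version(int id) {
        boolean onOrBefore(Version other) { return id <= other.id; }
    }

    static final Version CURRENT = new Version(7_16_00);

    // A null commit version means the snapshot predates 7.16, which is still
    // safe to use; otherwise the snapshot must not come from a newer node.
    static boolean snapshotVersionOk(Version snapshotVersion) {
        return snapshotVersion == null || snapshotVersion.onOrBefore(CURRENT);
    }

    public static void main(String[] args) {
        System.out.println(snapshotVersionOk(null));                 // true: pre-7.16 snapshot
        System.out.println(snapshotVersionOk(new Version(7_15_00))); // true
        System.out.println(snapshotVersionOk(new Version(8_00_00))); // false: snapshot from a newer node
    }
}
```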
    // Primary failed over after the snapshot was taken
    if (latestSnapshot.isLogicallyEquivalent(shardStateIdentifier) &&
        latestSnapshot.hasDifferentPhysicalFiles(sourceMetadata) &&
        snapshotVersion != null && snapshotVersion.onOrBefore(Version.CURRENT) &&
I wonder if we should also assert on the Lucene version being ok? For the snapshotVersion == null case we could consider checking that the Lucene version is before the expected version.
        commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
    }
    commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
    commitData.put(ES_VERSION, Version.CURRENT.toString());
I think we need to add this in a few other places like Store.trimUnsafeCommits and StoreRecovery.addIndices?
Can we perhaps add an assertion about the previous version being before or on current version?
And maybe even add an assertion about this being filled in if lucene version is new enough? We can at least keep that in master though we may have to remove it in 7.x depending on whether a lucene version upgrade happens for the release where this lands.
I did not see the two assertions added, perhaps there were issues doing so? If so, please let me know.
Sorry I missed that comment.
> Can we perhaps add an assertion about the previous version being before or on current version?
Do you mean asserting that the commit user data that doesn't contain the ES_VERSION value is in a version < 7.16? @henningandersen
My intention with that specific comment was that if there is a version, it must be onOrBefore current version. I.e., the version can only go forward never backwards. This will be verified a little bit once we do some rolling upgrade tests between 7.x and 8.0 (i.e., after merge to both). And obviously as we add releases this check will be more valuable. Just want it to be expressed in an assertion that it is monotonically increasing.
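The monotonicity property Henning describes could be expressed roughly like this; `monotonic` and the integer version ids are illustrative stand-ins, not the real assertion:

```java
public class CommitVersionMonotonicity {
    // The ES version recorded in Lucene commit user data may only move forward
    // across commits; previousId == null models commits written before the
    // version field existed (pre-7.16).
    static boolean monotonic(Integer previousId, int nextId) {
        return previousId == null || previousId <= nextId;
    }

    public static void main(String[] args) {
        System.out.println(monotonic(null, 7_16_00));     // true: first stamped commit
        System.out.println(monotonic(7_15_02, 7_16_00));  // true: upgrade moves forward
        System.out.println(monotonic(7_16_00, 7_15_02));  // false: would indicate a downgrade bug
    }
}
```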
I think we also need to update the version in RemoveCorruptedShardCommand.addNewHistoryCommit and Store.associateIndexWithNewTranslog. About the latter, I wonder if we should just let Store.updateCommitData set the ES version, seems fair to centralize that and would ensure new users of that also update the version.
server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java (outdated conversation, resolved)
    this.shardSnapshotInfo = shardSnapshotInfo;
    this.snapshotFiles = snapshotFiles.stream()
        .collect(Collectors.toMap(snapshotFile -> snapshotFile.metadata().name(), Function.identity()));
    this.metadataSnapshot = convertToMetadataSnapshot(snapshotFiles);
We might as well pass in the luceneCommitUserData to convertToMetadataSnapshot now that we have it? Makes the metadata snapshot more complete.
        .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity()));

    InMemoryDirectory directory = new InMemoryDirectory(snapshotFiles);
    SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(directory);
In the (albeit edge) case that this runs on an older version than the replica and the snapshot is taken on the newer version, this no longer allows file-comparison based recovery from snapshot. I think that is OK, but also that it deserves a comment here on this and why it is unlikely to be an issue.
Maybe we can extract that logic into a method and return an empty map if we fail to load the commit info?
I think we would need to be able to distinguish the case where we can read the lucene commit but there is no version from the case where we cannot read the lucene commit, at least if we follow my proposal here
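The three-way distinction described here (readable commit with a version, readable commit without one, unreadable commit) could be modeled with a small result type; everything below, including the `es_version` key string and the null-means-unreadable convention, is a hypothetical sketch:

```java
import java.util.Map;

public class CommitVersionLookup {
    // Distinguishes "commit readable but carries no ES version" (pre-7.16
    // snapshot, still usable) from "commit unreadable" (e.g. written by a
    // newer Lucene, not usable for file comparison).
    sealed interface Result permits Version, NoVersion, Unreadable {}
    record Version(String value) implements Result {}
    record NoVersion() implements Result {}
    record Unreadable() implements Result {}

    // null stands in for "we failed to read the Lucene commit at all".
    static Result readCommitVersion(Map<String, String> userDataOrNull) {
        if (userDataOrNull == null) {
            return new Unreadable();
        }
        String v = userDataOrNull.get("es_version");
        return v == null ? new NoVersion() : new Version(v);
    }

    public static void main(String[] args) {
        System.out.println(readCommitVersion(null) instanceof Unreadable);    // true
        System.out.println(readCommitVersion(Map.of()) instanceof NoVersion); // true
        System.out.println(readCommitVersion(Map.of("es_version", "7.16.0")));
    }
}
```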
    return clusterService.state().nodes().getMinNodeVersion().onOrAfter(SNAPSHOT_RECOVERIES_SUPPORTED_VERSION);
}

private static final class InMemoryDirectory extends BaseDirectory {
Maybe rename to StoreFileMetadataDirectory?
    @Override
    public void close() {
        // no-op
I wonder if this should set isOpen=false? But I also wonder if we should not just extend Directory rather than BaseDirectory, the locking in BaseDirectory seems unnecessary here? None of this is really important though.
I think I've covered all the review comments and this should be ready for another review round.
henningandersen left a comment
I added a few more comments, still need to run through the tests, but will not get to that today.
    public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
    public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
    // Field name that stores the Elasticsearch version in Lucene commit user data, representing
    // the version that was used to write those Lucene segments
    - // the version that was used to write those Lucene segments
    + // the version that was used to write the commit (and thus a max version for the underlying segments).
    writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
    final Map<String, String> userData = startingIndexCommit.getUserData();
    writer.setLiveCommitData(() -> {
        if (userData.containsKey(ES_VERSION)) {
I think this is questionable. If we trim the commit, we store it by the new ES version, which could be on a newer lucene version as well. With this, I think we risk seeing an ES version for an older release but in a commit that is a new lucene version. I think we should avoid that and simply always put in the current ES version. I also think trying to optimize this by not rebuilding the hashmap here is not worth it, this is not performance sensitive code.
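The fix Henning suggests, unconditionally stamping the current ES version when rewriting a commit, could look roughly like this. The key name `es_version` and the helper are illustrative; the real constant is `Engine.ES_VERSION`:

```java
import java.util.HashMap;
import java.util.Map;

public class TrimCommitUserData {
    static final String ES_VERSION = "es_version"; // illustrative stand-in key

    // When trimming/rewriting a commit, always stamp the current ES version
    // rather than preserving an older one, so the ES version recorded can
    // never be older than the Lucene version that actually wrote the commit.
    static Map<String, String> withCurrentVersion(Map<String, String> userData, String currentVersion) {
        Map<String, String> copy = new HashMap<>(userData); // rebuilding the map is fine here
        copy.put(ES_VERSION, currentVersion);               // unconditional overwrite
        return copy;
    }

    public static void main(String[] args) {
        Map<String, String> old = Map.of(ES_VERSION, "7.15.0", "history_uuid", "abc");
        System.out.println(withCurrentVersion(old, "7.16.0").get(ES_VERSION)); // 7.16.0
    }
}
```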
    fallbackPlan.getTranslogOps(),
    true,
    recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
        recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList()))
Is there an indentation issue here?
    // if the snapshotVersion == null that means that the snapshot was taken in a version <= 7.15,
    // therefore we can safely use that snapshot.
    if (commitVersion == null) {
        return Version.CURRENT.luceneVersion.onOrAfter(snapshot.getCommitLuceneVersion());
Can we add an assert about this too?
    - return Version.CURRENT.luceneVersion.onOrAfter(snapshot.getCommitLuceneVersion());
    + assert Version.V_7_16_0.luceneVersion.onOrAfter(snapshot.getCommitLuceneVersion());
    + return Version.CURRENT.luceneVersion.onOrAfter(snapshot.getCommitLuceneVersion());
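The intent of the suggested assert can be shown in isolation; the integer Lucene version ids below are hypothetical placeholders and `compatibleWithoutCommitVersion` is an illustrative helper, not the real method:

```java
public class LuceneVersionGate {
    // Hypothetical integer stand-in for the Lucene version shipped with 7.16.
    static final int V_7_16_LUCENE = 8_10_00;

    // When the snapshot commit carries no ES version it must predate 7.16, so
    // its Lucene version cannot be newer than the one 7.16 shipped with; a
    // violation indicates corrupted or impossible metadata.
    static boolean compatibleWithoutCommitVersion(int currentLucene, int snapshotCommitLucene) {
        if (snapshotCommitLucene > V_7_16_LUCENE) {
            throw new AssertionError("pre-7.16 snapshot written by a Lucene newer than 7.16's");
        }
        return currentLucene >= snapshotCommitLucene;
    }

    public static void main(String[] args) {
        System.out.println(compatibleWithoutCommitVersion(8_10_00, 8_09_00)); // true
        System.out.println(compatibleWithoutCommitVersion(8_08_00, 8_09_00)); // false
    }
}
```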
    );
}

private boolean isSnapshotVersionCompatible(ShardSnapshot snapshot) {
Can we add a comment that this runs on the primary, but using its version is fine, since NodeVersionAllocationDecider ensures we only recover to a node that has a newer or same version.
    // (i.e. the snapshot was taken in a node with version > than this node version)
    // reading the segment commit information could likely fail and we won't be able
    // to recover from a snapshot. This should be a rare edge-case since for most cases
    // the allocation deciders won't allow allocating replicas in nodes with older versions.
I am not sure I follow this. The allocation deciders will not allow allocating replicas to an older node. But I think this is about the primary: do we risk having the primary on an older version than the snapshot? (And yes we do, and I agree that it is rare.) So perhaps it should just read "allocating primaries" and not "allocating replicas" in this line?
    // Flush to ensure that index_commit_seq_nos(replica) == index_commit_seq_nos(primary),
    // since the primary flushes the index before taking the snapshot.
    flush(indexName);
I am not sure I follow this comment since at this time there is no replica?
    );
    final Settings.Builder indexSettings = Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Did you intend to add a replica in this test for the seqNoRecovery case? Running the test failed for me when looking up the replica shard further down.
    public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
    public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_16_0;
    // TODO: update after backport
    public static final Version SEQ_NO_SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.CURRENT;
nit: I would normally put in V_8_0_0 until backport here, but it is not important.
    // commit the new history id
    userData.put(Engine.HISTORY_UUID_KEY, historyUUID);
    userData.put(Engine.ES_VERSION, Version.CURRENT.toString());
I wonder if we should check that the version does not go backwards explicitly here. I wonder if someone might be able to apply the command from an earlier version in edge cases.
    List<IndexCommit> commits = DirectoryReader.listCommits(shard.store().directory());
    IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
    assertThat(safeCommit, is(notNullValue()));
Can we use IndexShard.acquireSafeCommit() instead? Need to release it then, but seems more straightforward.
    } finally {
        updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), null);
    }

    private Store.MetadataSnapshot getMetadataSnapshot(String nodeName, String indexName, int globalCheckpoint) throws IOException {
It is not entirely clear to me why we need the global checkpoint passed in here rather than just take it from the shard.
@elasticmachine update branch
Thanks for the review Henning!
…ailovers (#79137) This commit adds support for peer recoveries using snapshots after a primary failover if the snapshot shares the same logical contents but the physical files are different. It uses the seq no information stored in the snapshot to compare against the current shard source node seq nos and decide whether or not it can use the snapshot to recover the shard. Since the underlying index files are different to the source index files, error handling is different than when the files are shared. In this case, if there's an error while snapshots files are recovered, we have to cancel the on-going downloads, wait until all in-flight operations complete, remove the recovered files and start from scratch using a fallback recovery plan that uses the files from the source node. Relates #73496 Backport of #77420
…otIsUsedEvenIfFilesAreDifferent` (#114821) Don't test any 7.x snapshots, keep using any 8,x compatible snapshot and Lucene version. Originally added in 8.0 (#77420) for testing peer recoveries using snapshots. Co-authored-by: Yang Wang <ywangd@gmail.com> Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>