Skip to content

Commit 9f07fde

Browse files
committed
Add version checks
1 parent c619f75 commit 9f07fde

7 files changed

Lines changed: 195 additions & 47 deletions

File tree

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,11 @@
5050
import java.util.Optional;
5151

5252
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
53+
import static org.hamcrest.Matchers.anEmptyMap;
5354
import static org.hamcrest.Matchers.equalTo;
5455
import static org.hamcrest.Matchers.greaterThan;
5556
import static org.hamcrest.Matchers.is;
57+
import static org.hamcrest.Matchers.not;
5658

5759
public class ShardSnapshotsServiceIT extends ESIntegTestCase {
5860
@Override
@@ -184,6 +186,7 @@ public void testOnlyFetchesSnapshotFromEnabledRepositories() throws Exception {
184186
assertThat(nonEnabledRepos.contains(shardSnapshotInfo.getRepository()), is(equalTo(false)));
185187

186188
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));
189+
assertThat(shardSnapshotData.getLuceneCommitUserData(), is(not(anEmptyMap())));
187190

188191
assertThat(shardSnapshotInfo.getShardId(), is(equalTo(shardId)));
189192
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), is(equalTo(snapshotName)));

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public abstract class Engine implements Closeable {
9595
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
9696
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
9797
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
98+
public static final String ES_VERSION = "es_version";
9899
public static final String SEARCH_SOURCE = "search"; // TODO: Make source of search enum?
99100
public static final String CAN_MATCH_SEARCH_SOURCE = "can_match";
100101
protected static final String DOC_STATS_SOURCE = "doc_stats";

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.lucene.util.InfoStream;
4444
import org.elasticsearch.Assertions;
4545
import org.elasticsearch.ExceptionsHelper;
46+
import org.elasticsearch.Version;
4647
import org.elasticsearch.action.index.IndexRequest;
4748
import org.elasticsearch.common.lucene.LoggerInfoStream;
4849
import org.elasticsearch.common.lucene.Lucene;
@@ -2375,6 +2376,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
23752376
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
23762377
}
23772378
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
2379+
commitData.put(ES_VERSION, Version.CURRENT.toString());
23782380
logger.trace("committing writer with commit data [{}]", commitData);
23792381
return commitData.entrySet().iterator();
23802382
});

server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshot.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.indices.recovery.plan;
1010

11+
import org.elasticsearch.Version;
1112
import org.elasticsearch.core.Nullable;
1213
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
1314
import org.elasticsearch.index.store.Store;
@@ -21,17 +22,23 @@
2122
import java.util.function.Function;
2223
import java.util.stream.Collectors;
2324

25+
import static org.elasticsearch.index.engine.Engine.ES_VERSION;
26+
2427
public class ShardSnapshot {
2528
private final ShardSnapshotInfo shardSnapshotInfo;
2629
// Segment file name -> file info
2730
private final Map<String, BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles;
2831
private final Store.MetadataSnapshot metadataSnapshot;
32+
private final Map<String, String> luceneCommitUserData;
2933

30-
ShardSnapshot(ShardSnapshotInfo shardSnapshotInfo, List<BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles) {
34+
ShardSnapshot(ShardSnapshotInfo shardSnapshotInfo,
35+
List<BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles,
36+
Map<String, String> luceneCommitUserData) {
3137
this.shardSnapshotInfo = shardSnapshotInfo;
3238
this.snapshotFiles = snapshotFiles.stream()
3339
.collect(Collectors.toMap(snapshotFile -> snapshotFile.metadata().name(), Function.identity()));
3440
this.metadataSnapshot = convertToMetadataSnapshot(snapshotFiles);
41+
this.luceneCommitUserData = luceneCommitUserData;
3542
}
3643

3744
public String getShardStateIdentifier() {
@@ -42,6 +49,11 @@ public boolean isLogicallyEquivalent(@Nullable String shardStateIdentifier) {
4249
return shardStateIdentifier != null && shardStateIdentifier.equals(shardSnapshotInfo.getShardStateIdentifier());
4350
}
4451

52+
public boolean hasDifferentPhysicalFiles(Store.MetadataSnapshot sourceSnapshot) {
53+
Store.RecoveryDiff recoveryDiff = metadataSnapshot.recoveryDiff(sourceSnapshot);
54+
return recoveryDiff.different.isEmpty() == false || recoveryDiff.missing.isEmpty() == false;
55+
}
56+
4557
public String getRepository() {
4658
return shardSnapshotInfo.getRepository();
4759
}
@@ -62,12 +74,26 @@ public ShardSnapshotInfo getShardSnapshotInfo() {
6274
return shardSnapshotInfo;
6375
}
6476

65-
public List<BlobStoreIndexShardSnapshot.FileInfo> getSnapshotFiles(List<StoreFileMetadata> segmentFiles) {
77+
public Map<String, String> getLuceneCommitUserData() {
78+
return luceneCommitUserData;
79+
}
80+
81+
@Nullable
82+
public Version getVersion() {
83+
String version = luceneCommitUserData.get(ES_VERSION);
84+
return version == null ? null : Version.fromString(version);
85+
}
86+
87+
public List<BlobStoreIndexShardSnapshot.FileInfo> getSnapshotFilesMatching(List<StoreFileMetadata> segmentFiles) {
6688
return segmentFiles.stream()
6789
.map(storeFileMetadata -> snapshotFiles.get(storeFileMetadata.name()))
6890
.collect(Collectors.toList());
6991
}
7092

93+
public List<BlobStoreIndexShardSnapshot.FileInfo> getSnapshotFiles() {
94+
return List.copyOf(snapshotFiles.values());
95+
}
96+
7197
static Store.MetadataSnapshot convertToMetadataSnapshot(List<BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles) {
7298
return new Store.MetadataSnapshot(
7399
snapshotFiles.stream()

server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.apache.lucene.index.SegmentInfos;
15+
import org.apache.lucene.store.BaseDirectory;
16+
import org.apache.lucene.store.IOContext;
17+
import org.apache.lucene.store.IndexInput;
18+
import org.apache.lucene.store.IndexOutput;
19+
import org.apache.lucene.store.SingleInstanceLockFactory;
20+
import org.apache.lucene.util.BytesRef;
1421
import org.elasticsearch.action.ActionListener;
1522
import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotAction;
1623
import org.elasticsearch.action.admin.cluster.snapshots.get.shard.GetShardSnapshotRequest;
@@ -22,17 +29,25 @@
2229
import org.elasticsearch.cluster.service.ClusterService;
2330
import org.elasticsearch.common.blobstore.BlobContainer;
2431
import org.elasticsearch.common.inject.Inject;
32+
import org.elasticsearch.common.lucene.Lucene;
33+
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
2534
import org.elasticsearch.index.shard.ShardId;
2635
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
36+
import org.elasticsearch.index.store.StoreFileMetadata;
2737
import org.elasticsearch.repositories.RepositoriesService;
2838
import org.elasticsearch.repositories.Repository;
2939
import org.elasticsearch.repositories.ShardSnapshotInfo;
3040
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
3141
import org.elasticsearch.snapshots.Snapshot;
3242
import org.elasticsearch.threadpool.ThreadPool;
3343

44+
import java.io.IOException;
45+
import java.util.Collection;
3446
import java.util.List;
47+
import java.util.Map;
3548
import java.util.Optional;
49+
import java.util.Set;
50+
import java.util.function.Function;
3651
import java.util.stream.Collectors;
3752

3853
import static org.elasticsearch.indices.recovery.RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_VERSION;
@@ -108,7 +123,16 @@ private Optional<ShardSnapshot> fetchSnapshotFiles(GetShardSnapshotResponse shar
108123
BlobStoreIndexShardSnapshot blobStoreIndexShardSnapshot =
109124
blobStoreRepository.loadShardSnapshot(blobContainer, snapshot.getSnapshotId());
110125

111-
return Optional.of(new ShardSnapshot(latestShardSnapshot, blobStoreIndexShardSnapshot.indexFiles()));
126+
Map<String, StoreFileMetadata> snapshotFiles = blobStoreIndexShardSnapshot.indexFiles()
127+
.stream()
128+
.map(BlobStoreIndexShardSnapshot.FileInfo::metadata)
129+
.collect(Collectors.toMap(StoreFileMetadata::name, Function.identity()));
130+
131+
InMemoryDirectory directory = new InMemoryDirectory(snapshotFiles);
132+
SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(directory);
133+
Map<String, String> userData = segmentCommitInfos.userData;
134+
135+
return Optional.of(new ShardSnapshot(latestShardSnapshot, blobStoreIndexShardSnapshot.indexFiles(), userData));
112136
} catch (Exception e) {
113137
logger.warn(new ParameterizedMessage("Unable to fetch shard snapshot files for {}", latestShardSnapshot), e);
114138
return Optional.empty();
@@ -118,4 +142,84 @@ private Optional<ShardSnapshot> fetchSnapshotFiles(GetShardSnapshotResponse shar
118142
protected boolean masterSupportsFetchingLatestSnapshots() {
119143
return clusterService.state().nodes().getMinNodeVersion().onOrAfter(SNAPSHOT_RECOVERIES_SUPPORTED_VERSION);
120144
}
145+
146+
static final class InMemoryDirectory extends BaseDirectory {
147+
private final Map<String, StoreFileMetadata> files;
148+
149+
InMemoryDirectory(Map<String, StoreFileMetadata> files) {
150+
super(new SingleInstanceLockFactory());
151+
this.files = files;
152+
}
153+
154+
@Override
155+
public String[] listAll() {
156+
return files.keySet().toArray(new String[0]);
157+
}
158+
159+
@Override
160+
public IndexInput openInput(String name, IOContext context) throws IOException {
161+
StoreFileMetadata storeFileMetadata = getStoreFileMetadata(name);
162+
if (storeFileMetadata.hashEqualsContents() == false) {
163+
throw new IOException("Unable to open " + name);
164+
}
165+
166+
final BytesRef data = storeFileMetadata.hash();
167+
return new ByteArrayIndexInput(name, data.bytes, data.offset, data.length);
168+
}
169+
170+
@Override
171+
public void deleteFile(String name) {
172+
throw new UnsupportedOperationException("this directory is read-only");
173+
}
174+
175+
@Override
176+
public long fileLength(String name) throws IOException {
177+
final StoreFileMetadata storeFileMetadata = getStoreFileMetadata(name);
178+
return storeFileMetadata.length();
179+
}
180+
181+
182+
@Override
183+
public IndexOutput createOutput(String name, IOContext context) {
184+
throw new UnsupportedOperationException("this directory is read-only");
185+
}
186+
187+
@Override
188+
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
189+
throw new UnsupportedOperationException("this directory is read-only");
190+
}
191+
192+
@Override
193+
public void sync(Collection<String> names) {
194+
throw new UnsupportedOperationException("this directory is read-only");
195+
}
196+
197+
@Override
198+
public void syncMetaData() {
199+
throw new UnsupportedOperationException("this directory is read-only");
200+
}
201+
202+
@Override
203+
public void rename(String source, String dest) {
204+
throw new UnsupportedOperationException("this directory is read-only");
205+
}
206+
207+
@Override
208+
public void close() {
209+
// no-op
210+
}
211+
212+
@Override
213+
public Set<String> getPendingDeletions() {
214+
throw new UnsupportedOperationException("this directory is read-only");
215+
}
216+
217+
private StoreFileMetadata getStoreFileMetadata(String name) throws IOException {
218+
final StoreFileMetadata storeFileMetadata = files.get(name);
219+
if (storeFileMetadata == null) {
220+
throw new IOException("Unable to find " + name);
221+
}
222+
return storeFileMetadata;
223+
}
224+
}
121225
}

server/src/main/java/org/elasticsearch/indices/recovery/plan/SnapshotsRecoveryPlannerService.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -80,31 +80,30 @@ private ShardRecoveryPlan computeRecoveryPlanWithSnapshots(@Nullable String shar
8080

8181
ShardSnapshot latestSnapshot = latestSnapshotOpt.get();
8282

83-
if (latestSnapshot.isLogicallyEquivalent(shardStateIdentifier)) {
84-
Store.RecoveryDiff recoveryDiff = latestSnapshot.getMetadataSnapshot().recoveryDiff(sourceMetadata);
85-
86-
// Primary failed over after the snapshot was taken
87-
if (recoveryDiff.different.isEmpty() == false || recoveryDiff.missing.isEmpty() == false) {
88-
// Use the current primary as a fallback if the download fails half-way
89-
ShardRecoveryPlan fallbackPlan =
90-
getRecoveryPlanUsingSourceNode(sourceMetadata, sourceTargetDiff, filesMissingInTarget, startingSeqNo, translogOps);
91-
92-
Store.RecoveryDiff snapshotDiff = latestSnapshot.getMetadataSnapshot().recoveryDiff(targetMetadata);
93-
ShardRecoveryPlan.SnapshotFilesToRecover snapshotFilesToRecover = new ShardRecoveryPlan.SnapshotFilesToRecover(
94-
latestSnapshot.getIndexId(),
95-
latestSnapshot.getRepository(),
96-
latestSnapshot.getSnapshotFiles(concatLists(snapshotDiff.different, snapshotDiff.missing))
97-
);
98-
99-
return new ShardRecoveryPlan(snapshotFilesToRecover,
100-
Collections.emptyList(),
101-
snapshotDiff.identical,
102-
startingSeqNo,
103-
translogOps,
104-
latestSnapshot.getMetadataSnapshot(),
105-
fallbackPlan
106-
);
107-
}
83+
Version snapshotVersion = latestSnapshot.getVersion();
84+
// Primary failed over after the snapshot was taken
85+
if (latestSnapshot.isLogicallyEquivalent(shardStateIdentifier) &&
86+
latestSnapshot.hasDifferentPhysicalFiles(sourceMetadata) &&
87+
snapshotVersion != null && snapshotVersion.onOrBefore(Version.CURRENT) &&
88+
sourceTargetDiff.identical.isEmpty()) {
89+
// Use the current primary as a fallback if the download fails half-way
90+
ShardRecoveryPlan fallbackPlan =
91+
getRecoveryPlanUsingSourceNode(sourceMetadata, sourceTargetDiff, filesMissingInTarget, startingSeqNo, translogOps);
92+
93+
ShardRecoveryPlan.SnapshotFilesToRecover snapshotFilesToRecover = new ShardRecoveryPlan.SnapshotFilesToRecover(
94+
latestSnapshot.getIndexId(),
95+
latestSnapshot.getRepository(),
96+
latestSnapshot.getSnapshotFiles()
97+
);
98+
99+
return new ShardRecoveryPlan(snapshotFilesToRecover,
100+
Collections.emptyList(),
101+
Collections.emptyList(),
102+
startingSeqNo,
103+
translogOps,
104+
latestSnapshot.getMetadataSnapshot(),
105+
fallbackPlan
106+
);
108107
}
109108

110109
Store.MetadataSnapshot filesToRecoverFromSourceSnapshot = toMetadataSnapshot(filesMissingInTarget);
@@ -115,7 +114,7 @@ private ShardRecoveryPlan computeRecoveryPlanWithSnapshots(@Nullable String shar
115114
} else {
116115
snapshotFilesToRecover = new ShardRecoveryPlan.SnapshotFilesToRecover(latestSnapshot.getIndexId(),
117116
latestSnapshot.getRepository(),
118-
latestSnapshot.getSnapshotFiles(snapshotDiff.identical));
117+
latestSnapshot.getSnapshotFilesMatching(snapshotDiff.identical));
119118
}
120119

121120
return new ShardRecoveryPlan(snapshotFilesToRecover,

0 commit comments

Comments
 (0)