Skip to content

Commit fde53c9

Browse files
committed
Support only 1 repository
1 parent 2c58137 commit fde53c9

4 files changed

Lines changed: 132 additions & 198 deletions

File tree

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsServiceIT.java

Lines changed: 35 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.elasticsearch.repositories.RepositoriesService;
3131
import org.elasticsearch.repositories.Repository;
3232
import org.elasticsearch.repositories.RepositoryData;
33-
import org.elasticsearch.repositories.ShardSnapshotInfo;
3433
import org.elasticsearch.repositories.fs.FsRepository;
3534
import org.elasticsearch.snapshots.SnapshotException;
3635
import org.elasticsearch.snapshots.SnapshotId;
@@ -39,17 +38,15 @@
3938

4039
import java.io.FileNotFoundException;
4140
import java.io.IOException;
42-
import java.util.ArrayList;
41+
import java.nio.file.Path;
4342
import java.util.Collection;
4443
import java.util.Collections;
45-
import java.util.List;
4644
import java.util.Map;
45+
import java.util.Optional;
4746

4847
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
49-
import static org.hamcrest.Matchers.empty;
5048
import static org.hamcrest.Matchers.equalTo;
5149
import static org.hamcrest.Matchers.greaterThan;
52-
import static org.hamcrest.Matchers.is;
5350

5451
public class ShardSnapshotsServiceIT extends ESIntegTestCase {
5552
@Override
@@ -128,15 +125,6 @@ public BlobStoreIndexShardSnapshots getBlobStoreIndexShardSnapshots(IndexId inde
128125
}
129126
}
130127

131-
public void testReturnsEmptyListWhenThereAreNotAvailableRepositories() throws Exception {
132-
String indexName = "test";
133-
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
134-
ShardId shardId = getShardIdForIndex(indexName);
135-
136-
List<ShardSnapshot> shardSnapshotData = getShardSnapshotShard(shardId);
137-
assertThat(shardSnapshotData, is(empty()));
138-
}
139-
140128
public void testFetchFromSingleRepository() throws Exception {
141129
String indexName = "test";
142130
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
@@ -147,30 +135,20 @@ public void testFetchFromSingleRepository() throws Exception {
147135
}
148136

149137
String snapshotName = "snap";
150-
final int numberOfRepos = randomIntBetween(1, 4);
151-
List<String> repositories = new ArrayList<>(numberOfRepos);
152-
for (int i = 0; i < numberOfRepos; i++) {
153-
String repositoryName = "repo-" + i;
154-
createRepository(repositoryName, "fs");
155-
repositories.add(repositoryName);
156-
createSnapshot(repositoryName, snapshotName, indexName);
157-
}
158-
159-
String repositoryToFetch = randomFrom(repositories);
160-
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();
138+
String repositoryName = "repo";
139+
createRepository(repositoryName, "fs", randomRepoPath());
140+
createSnapshot(repositoryName, snapshotName, indexName);
161141

162-
PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
163-
shardSnapshotsService.fetchAvailableSnapshots(repositoryToFetch, shardId, future);
164-
List<ShardSnapshot> shardSnapshots = future.get();
142+
Optional<ShardSnapshot> shardSnapshotOpt = getShardSnapshotShard(repositoryName, shardId).get();
165143

166-
assertThat(shardSnapshots.size(), equalTo(1));
167-
ShardSnapshot shardSnapshot = shardSnapshots.get(0);
168-
assertThat(shardSnapshot.getRepository(), equalTo(repositoryToFetch));
144+
assertThat(shardSnapshotOpt.isPresent(), equalTo(true));
145+
ShardSnapshot shardSnapshot = shardSnapshotOpt.get();
146+
assertThat(shardSnapshot.getRepository(), equalTo(repositoryName));
169147
assertThat(shardSnapshot.getShardSnapshotInfo().getShardId(), equalTo(shardId));
170148
assertThat(shardSnapshot.getMetadataSnapshot().size(), greaterThan(0));
171149
}
172150

173-
public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Exception {
151+
public void testFailingRepositoriesAtAnyStageReturnAnError() {
174152
final String indexName = "test";
175153
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build());
176154
ShardId shardId = getShardIdForIndex(indexName);
@@ -180,46 +158,22 @@ public void testFailingReposAreTreatedAsNonExistingShardSnapshots() throws Excep
180158
}
181159

182160
String snapshotName = "snap";
161+
String repositoryName = "failing-repo";
162+
Path repoPath = randomRepoPath();
163+
createRepository(repositoryName, FailingRepoPlugin.TYPE, repoPath);
164+
createSnapshot(repositoryName, snapshotName, indexName);
165+
166+
// Update repository settings to fail fetching the repository information at any stage
167+
String repoFailureType = randomFrom(FailingRepo.FAIL_GET_REPOSITORY_DATA_SETTING_KEY,
168+
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY,
169+
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY
170+
);
171+
assertAcked(client().admin().cluster().preparePutRepository(repositoryName)
172+
.setType(FailingRepoPlugin.TYPE)
173+
.setVerify(false)
174+
.setSettings(Settings.builder().put("location", repoPath).put(repoFailureType, true).build()));
183175

184-
int numberOfFailingRepos = randomIntBetween(1, 3);
185-
List<String> failingRepos = new ArrayList<>();
186-
for (int i = 0; i < numberOfFailingRepos; i++) {
187-
String repositoryName = "failing-repo-" + i;
188-
createRepository(repositoryName, FailingRepoPlugin.TYPE);
189-
createSnapshot(repositoryName, snapshotName, indexName);
190-
failingRepos.add(repositoryName);
191-
}
192-
193-
int numberOfWorkingRepositories = randomIntBetween(0, 4);
194-
List<String> workingRepos = new ArrayList<>();
195-
for (int i = 0; i < numberOfWorkingRepositories; i++) {
196-
String repositoryName = "repo-" + i;
197-
createRepository(repositoryName, "fs");
198-
workingRepos.add(repositoryName);
199-
createSnapshot(repositoryName, snapshotName, indexName);
200-
}
201-
202-
for (String failingRepo : failingRepos) {
203-
// Update repository settings to fail fetching the repository information at any stage
204-
String repoFailureType =
205-
randomFrom(FailingRepo.FAIL_GET_REPOSITORY_DATA_SETTING_KEY,
206-
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY,
207-
FailingRepo.FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY
208-
);
209-
createRepository(failingRepo, FailingRepoPlugin.TYPE, Settings.builder().put(repoFailureType, true).build());
210-
}
211-
212-
List<ShardSnapshot> shardSnapshotDataForShard = getShardSnapshotShard(shardId);
213-
214-
assertThat(shardSnapshotDataForShard.size(), is(equalTo(numberOfWorkingRepositories)));
215-
for (ShardSnapshot shardSnapshotData : shardSnapshotDataForShard) {
216-
assertThat(workingRepos.contains(shardSnapshotData.getRepository()), is(equalTo(true)));
217-
assertThat(shardSnapshotData.getMetadataSnapshot().size(), is(greaterThan(0)));
218-
219-
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotData.getShardSnapshotInfo();
220-
assertThat(shardSnapshotInfo.getShardId(), equalTo(shardId));
221-
assertThat(shardSnapshotInfo.getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
222-
}
176+
expectThrows(Exception.class, () -> getShardSnapshotShard(repositoryName, shardId).actionGet());
223177
}
224178

225179
public void testFetchFromNonExistingRepositoryReturnsAnError() {
@@ -230,27 +184,25 @@ public void testFetchFromNonExistingRepositoryReturnsAnError() {
230184
String repositoryToFetch = "unknown";
231185
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();
232186

233-
PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
234-
shardSnapshotsService.fetchAvailableSnapshots(repositoryToFetch, shardId, future);
187+
PlainActionFuture<Optional<ShardSnapshot>> future = PlainActionFuture.newFuture();
188+
shardSnapshotsService.fetchLatestSnapshot(repositoryToFetch, shardId, future);
235189
expectThrows(Exception.class, future::actionGet);
236190
}
237191

238192
public void testInputValidations() {
239193
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();
240194
expectThrows(IllegalArgumentException.class, () ->
241-
shardSnapshotsService.fetchAvailableSnapshots(randomFrom("", null), null, null));
195+
shardSnapshotsService.fetchLatestSnapshot(randomFrom("", null), null, null));
242196
expectThrows(IllegalArgumentException.class, () ->
243-
shardSnapshotsService.fetchAvailableSnapshots("repo", null, null));
244-
expectThrows(IllegalArgumentException.class, () ->
245-
shardSnapshotsService.fetchAvailableSnapshotsInAllRepositories(null, null));
197+
shardSnapshotsService.fetchLatestSnapshot("repo", null, null));
246198
}
247199

248-
private List<ShardSnapshot> getShardSnapshotShard(ShardId shardId) throws Exception {
200+
private PlainActionFuture<Optional<ShardSnapshot>> getShardSnapshotShard(String repository, ShardId shardId) {
249201
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();
250202

251-
PlainActionFuture<List<ShardSnapshot>> future = PlainActionFuture.newFuture();
252-
shardSnapshotsService.fetchAvailableSnapshotsInAllRepositories(shardId, future);
253-
return future.get();
203+
PlainActionFuture<Optional<ShardSnapshot>> future = PlainActionFuture.newFuture();
204+
shardSnapshotsService.fetchLatestSnapshot(repository, shardId, future);
205+
return future;
254206
}
255207

256208
private ShardSnapshotsService getShardSnapshotsService() {
@@ -264,15 +216,11 @@ private ShardId getShardIdForIndex(String indexName) {
264216
return state.routingTable().index(indexName).shard(0).shardId();
265217
}
266218

267-
private void createRepository(String repositoryName, String type) {
268-
createRepository(repositoryName, type, Settings.EMPTY);
269-
}
270-
271-
private void createRepository(String repositoryName, String type, Settings settings) {
219+
private void createRepository(String repositoryName, String type, Path location) {
272220
assertAcked(client().admin().cluster().preparePutRepository(repositoryName)
273221
.setType(type)
274222
.setVerify(false)
275-
.setSettings(Settings.builder().put(settings).put("location", randomRepoPath())));
223+
.setSettings(Settings.builder().put("location", location)));
276224
}
277225

278226
private void createSnapshot(String repoName, String snapshotName, String index) {

server/src/main/java/org/elasticsearch/indices/recovery/plan/ShardSnapshotsService.java

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@
2727
import org.elasticsearch.snapshots.Snapshot;
2828
import org.elasticsearch.threadpool.ThreadPool;
2929

30-
import java.util.ArrayList;
31-
import java.util.Collection;
3230
import java.util.Collections;
3331
import java.util.List;
32+
import java.util.Optional;
3433

3534
public class ShardSnapshotsService {
3635
private final Logger logger = LogManager.getLogger(ShardSnapshotsService.class);
@@ -45,49 +44,39 @@ public ShardSnapshotsService(Client client, RepositoriesService repositoriesServ
4544
this.threadPool = threadPool;
4645
}
4746

48-
public void fetchAvailableSnapshotsInAllRepositories(ShardId shardId, ActionListener<List<ShardSnapshot>> listener) {
49-
if (shardId == null) {
50-
throw new IllegalArgumentException("SharId was null but a value was expected");
51-
}
52-
final GetShardSnapshotRequest request = GetShardSnapshotRequest.latestSnapshotInAllRepositories(shardId);
53-
sendRequest(request, listener);
54-
}
55-
56-
public void fetchAvailableSnapshots(String repository, ShardId shardId, ActionListener<List<ShardSnapshot>> listener) {
47+
public void fetchLatestSnapshot(String repository, ShardId shardId, ActionListener<Optional<ShardSnapshot>> listener) {
5748
if (Strings.isNullOrEmpty(repository)) {
5849
throw new IllegalArgumentException("A repository should be specified");
5950
}
6051
if (shardId == null) {
6152
throw new IllegalArgumentException("SharId was null but a value was expected");
6253
}
54+
6355
GetShardSnapshotRequest request =
6456
GetShardSnapshotRequest.latestSnapshotInRepositories(shardId, Collections.singletonList(repository));
65-
sendRequest(request, listener);
66-
}
6757

68-
private void sendRequest(GetShardSnapshotRequest request, ActionListener<List<ShardSnapshot>> listener) {
6958
client.execute(GetShardSnapshotAction.INSTANCE,
7059
request,
71-
new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, listener.map(this::fetchSnapshotFiles), false)
60+
new ThreadedActionListener<>(logger,
61+
threadPool,
62+
ThreadPool.Names.GENERIC,
63+
listener.map(response -> fetchSnapshotFiles(repository, response)),
64+
false
65+
)
7266
);
7367
}
7468

75-
private List<ShardSnapshot> fetchSnapshotFiles(GetShardSnapshotResponse shardSnapshotResponse) {
69+
private Optional<ShardSnapshot> fetchSnapshotFiles(String repository, GetShardSnapshotResponse shardSnapshotResponse) {
7670
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
7771

78-
if (shardSnapshotResponse.getRepositoryShardSnapshots().isEmpty()) {
79-
return Collections.emptyList();
72+
Optional<ShardSnapshotInfo> shardSnapshotInfoOpt = shardSnapshotResponse.getIndexShardSnapshotInfoForRepository(repository);
73+
if (shardSnapshotInfoOpt.isEmpty()) {
74+
return Optional.empty();
8075
}
8176

82-
Collection<ShardSnapshotInfo> shardSnapshots = shardSnapshotResponse.getRepositoryShardSnapshots().values();
83-
List<ShardSnapshot> shardSnapshotData = new ArrayList<>(shardSnapshots.size());
84-
for (ShardSnapshotInfo shardSnapshot : shardSnapshots) {
85-
final List<BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles = getSnapshotFileList(shardSnapshot);
86-
if (snapshotFiles.isEmpty() == false) {
87-
shardSnapshotData.add(new ShardSnapshot(shardSnapshot, snapshotFiles));
88-
}
89-
}
90-
return shardSnapshotData;
77+
ShardSnapshotInfo shardSnapshotInfo = shardSnapshotInfoOpt.get();
78+
List<BlobStoreIndexShardSnapshot.FileInfo> snapshotFiles = getSnapshotFileList(shardSnapshotInfo);
79+
return Optional.of(new ShardSnapshot(shardSnapshotInfo, snapshotFiles));
9180
}
9281

9382
private List<BlobStoreIndexShardSnapshot.FileInfo> getSnapshotFileList(ShardSnapshotInfo shardSnapshotInfo) {
@@ -102,7 +91,7 @@ private List<BlobStoreIndexShardSnapshot.FileInfo> getSnapshotFileList(ShardSnap
10291
return blobStoreIndexShardSnapshot.indexFiles();
10392
} catch (Exception e) {
10493
logger.warn(new ParameterizedMessage("Unable to fetch shard snapshot files for {}", shardSnapshotInfo), e);
105-
return Collections.emptyList();
94+
throw e;
10695
}
10796
}
10897
}

0 commit comments

Comments
 (0)