Skip to content

Commit 14e4b6e

Browse files
committed
Switch to a callback-based approach
1 parent 4f22069 commit 14e4b6e

3 files changed

Lines changed: 54 additions & 37 deletions

File tree

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/slm/history/SnapshotHistoryItem.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ public static SnapshotHistoryItem deletionSuccessRecord(long timestamp, String s
116116
return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, null);
117117
}
118118

119+
/**
 * Creates a history record for a delete operation that is flagged as successful but also carries a free-form
 * explanation. It builds the same kind of item as {@code deletionSuccessRecord} (delete operation, success flag
 * {@code true}), except that the final constructor argument is populated with {@code details} instead of {@code null}.
 * The retention task uses it when a delete request was issued but the response was not acknowledged.
 *
 * @param timestamp    time of the event; callers in the retention task pass epoch milliseconds
 * @param snapshotName name of the snapshot the delete was issued for
 * @param policyId     id of the lifecycle policy the snapshot belongs to
 * @param repository   repository holding the snapshot
 * @param details      human-readable note stored on the record, e.g. why the success is only "possible"
 * @return a new history item for the delete operation, marked successful and carrying {@code details}
 */
public static SnapshotHistoryItem deletionPossibleSuccessRecord(long timestamp, String snapshotName, String policyId, String repository, String details) {
120+
return new SnapshotHistoryItem(timestamp, policyId, repository, snapshotName, DELETE_OPERATION, true, null, details);
121+
}
122+
119123
public static SnapshotHistoryItem deletionFailureRecord(long timestamp, String snapshotName, String policyId, String repository,
120124
Exception exception) throws IOException {
121125
String exceptionString = exceptionToString(exception);

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import java.util.Set;
4242
import java.util.concurrent.CountDownLatch;
4343
import java.util.concurrent.atomic.AtomicBoolean;
44-
import java.util.concurrent.atomic.AtomicReference;
44+
import java.util.concurrent.atomic.AtomicInteger;
4545
import java.util.function.Consumer;
4646
import java.util.function.LongSupplier;
4747
import java.util.stream.Collectors;
@@ -256,21 +256,26 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
256256

257257
logger.info("starting snapshot retention deletion for [{}] snapshots", count);
258258
long startTime = nowNanoSupplier.getAsLong();
259-
int deleted = 0;
260-
int failed = 0;
259+
final AtomicInteger deleted = new AtomicInteger(0);
260+
final AtomicInteger failed = new AtomicInteger(0);
261261
for (Map.Entry<String, List<SnapshotInfo>> entry : snapshotsToDelete.entrySet()) {
262262
String repo = entry.getKey();
263263
List<SnapshotInfo> snapshots = entry.getValue();
264264
for (SnapshotInfo info : snapshots) {
265-
Optional<SnapshotHistoryItem> result = deleteSnapshot(getPolicyId(info), repo, info, slmStats);
266-
if (result.isPresent()) {
267-
if (result.get().isSuccess()) {
268-
deleted++;
265+
deleteSnapshot(getPolicyId(info), repo, info, slmStats, historyItem -> {
266+
// This would be nicer if we could use ifPresentOrElse
267+
historyItem.ifPresent(item -> {
268+
if (item.isSuccess()) {
269+
deleted.incrementAndGet();
270+
} else {
271+
failed.incrementAndGet();
272+
}
273+
historyStore.putAsync(item);
274+
});
275+
if (historyItem.isEmpty()) {
276+
failed.incrementAndGet();
269277
}
270-
historyStore.putAsync(result.get());
271-
} else {
272-
failed++;
273-
}
278+
});
274279
// Check whether we have exceeded the maximum time allowed to spend deleting
275280
// snapshots, if we have, short-circuit the rest of the deletions
276281
TimeValue elapsedDeletionTime = TimeValue.timeValueNanos(nowNanoSupplier.getAsLong() - startTime);
@@ -292,24 +297,29 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
292297

293298
/**
294299
* Delete the given snapshot from the repository in a blocking manner
295-
* @param repo The repository the snapshot is in
296-
* @param snapshot The snapshot metadata
297-
* @return If present, a SnapshotHistoryItem containing the results of the deletion. Empty if no response or interrupted.
300+
*
301+
* @param repo The repository the snapshot is in
302+
* @param snapshot The snapshot metadata
303+
* @param onCompletion A callback invoked with the outcome of the deletion. If present, the Optional holds a SnapshotHistoryItem
304+
* containing the results of the deletion. Empty if the deletion was interrupted or the failure exception could not be serialized.
298305
*/
299-
Optional<SnapshotHistoryItem> deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats) {
306+
void deleteSnapshot(String slmPolicy, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats,
307+
Consumer<Optional<SnapshotHistoryItem>> onCompletion) {
300308
logger.info("[{}] snapshot retention deleting snapshot [{}]", repo, snapshot.snapshotId());
301309
CountDownLatch latch = new CountDownLatch(1);
302-
AtomicReference<SnapshotHistoryItem> result = new AtomicReference<>();
303310
client.admin().cluster().prepareDeleteSnapshot(repo, snapshot.snapshotId().getName())
304311
.execute(new LatchedActionListener<>(new ActionListener<>() {
305312
@Override
306313
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
307314
if (acknowledgedResponse.isAcknowledged()) {
308315
logger.debug("[{}] snapshot [{}] deleted successfully", repo, snapshot.snapshotId());
309-
result.set(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
310-
snapshot.snapshotId().getName(), slmPolicy, repo));
316+
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
317+
snapshot.snapshotId().getName(), slmPolicy, repo)));
311318
} else {
312319
logger.warn("[{}] snapshot [{}] delete issued but the request was not acknowledged", repo, snapshot.snapshotId());
320+
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionPossibleSuccessRecord(Instant.now().toEpochMilli(),
321+
snapshot.snapshotId().getName(), slmPolicy, repo,
322+
"deletion request issued successfully, no acknowledgement received")));
313323
}
314324
slmStats.snapshotDeleted(slmPolicy);
315325
}
@@ -318,16 +328,19 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
318328
public void onFailure(Exception e) {
319329
logger.warn(new ParameterizedMessage("[{}] failed to delete snapshot [{}] for retention",
320330
repo, snapshot.snapshotId()), e);
331+
slmStats.snapshotDeleteFailure(slmPolicy);
332+
SnapshotHistoryItem result;
321333
try {
322-
result.set(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
323-
snapshot.snapshotId().getName(), slmPolicy, repo, e));
334+
result = SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
335+
snapshot.snapshotId().getName(), slmPolicy, repo, e);
324336
} catch (IOException ex) {
325337
// This shouldn't happen unless there's an issue with serializing the original exception
326338
logger.error(new ParameterizedMessage(
327339
"failed to record snapshot creation failure for snapshot lifecycle policy [{}]",
328340
slmPolicy), e);
341+
result = null;
329342
}
330-
slmStats.snapshotDeleteFailure(slmPolicy);
343+
onCompletion.accept(Optional.ofNullable(result));
331344
}
332345
}, latch));
333346
try {
@@ -337,9 +350,9 @@ public void onFailure(Exception e) {
337350
} catch (InterruptedException e) {
338351
logger.error(new ParameterizedMessage("[{}] deletion of snapshot [{}] interrupted",
339352
repo, snapshot.snapshotId()), e);
353+
onCompletion.accept(Optional.empty());
340354
slmStats.snapshotDeleteFailure(slmPolicy);
341355
}
342-
return Optional.ofNullable(result.get());
343356
}
344357

345358
void updateStateWithStats(SnapshotLifecycleStats newStats) {

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -190,21 +190,20 @@ private void retentionTaskTest(final boolean deletionSuccess) throws Exception {
190190
logger.info("--> retrieving snapshots [{}]", snaps);
191191
return Collections.singletonMap(repoId, snaps);
192192
},
193-
(deletionPolicyId, repo, snapInfo, slmStats) -> {
193+
(deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> {
194194
logger.info("--> deleting {} from repo {}", snapInfo, repo);
195195
deleted.add(snapInfo);
196196
deletionLatch.countDown();
197197
if (deletionSuccess) {
198-
return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
199-
snapInfo.snapshotId().getName(), policy.getId(), repo));
198+
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
199+
snapInfo.snapshotId().getName(), policy.getId(), repo)));
200200
} else {
201201
try {
202-
return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
203-
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")));
202+
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
203+
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))));
204204
} catch (IOException e) {
205205
logger.error(e);
206206
fail("failed to serialize an exception to json, this should never happen");
207-
return Optional.empty(); // impossible to hit this but necessary to make the compiler happy
208207
}
209208
}
210209
},
@@ -289,7 +288,7 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception
289288
logger.info("--> retrieving snapshots [{}]", snaps);
290289
return Collections.singletonMap(repoId, snaps);
291290
},
292-
(deletionPolicyId, repo, snapInfo, slmStats) -> {
291+
(deletionPolicyId, repo, snapInfo, slmStats, onCompletion) -> {
293292
logger.info("--> deleting {}", snapInfo.snapshotId());
294293
// Don't pause until snapshot 2
295294
if (snapInfo.snapshotId().equals(snap2.snapshotId())) {
@@ -299,16 +298,15 @@ private void timeBoundedDeletion(final boolean deletionSuccess) throws Exception
299298
deleted.add(snapInfo.snapshotId());
300299
deletionLatch.countDown();
301300
if (deletionSuccess) {
302-
return Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
303-
snapInfo.snapshotId().getName(), policy.getId(), repo));
301+
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionSuccessRecord(Instant.now().toEpochMilli(),
302+
snapInfo.snapshotId().getName(), policy.getId(), repo)));
304303
} else {
305304
try {
306-
return Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
307-
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed")));
305+
onCompletion.accept(Optional.of(SnapshotHistoryItem.deletionFailureRecord(Instant.now().toEpochMilli(),
306+
snapInfo.snapshotId().getName(), policy.getId(), repo, new RuntimeException("deletion_failed"))));
308307
} catch (IOException e) {
309308
logger.error(e);
310309
fail("failed to serialize an exception to json, this should never happen");
311-
return Optional.empty(); // impossible to hit this but necessary to make the compiler happy
312310
}
313311
}
314312
},
@@ -377,13 +375,15 @@ void getAllSuccessfulSnapshots(Collection<String> repositories,
377375
}
378376

379377
@Override
380-
Optional<SnapshotHistoryItem> deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats) {
381-
return deleteRunner.apply(policyId, repo, snapshot, slmStats);
378+
void deleteSnapshot(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats,
379+
Consumer<Optional<SnapshotHistoryItem>> onCompletion) {
380+
deleteRunner.apply(policyId, repo, snapshot, slmStats, onCompletion);
382381
}
383382
}
384383

385384
@FunctionalInterface
386385
interface DeleteSnapshotMock {
387-
Optional<SnapshotHistoryItem> apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats);
386+
void apply(String policyId, String repo, SnapshotInfo snapshot, SnapshotLifecycleStats slmStats,
387+
Consumer<Optional<SnapshotHistoryItem>> onCompletion);
388388
}
389389
}

0 commit comments

Comments
 (0)