4141import java .util .Set ;
4242import java .util .concurrent .CountDownLatch ;
4343import java .util .concurrent .atomic .AtomicBoolean ;
44- import java .util .concurrent .atomic .AtomicReference ;
44+ import java .util .concurrent .atomic .AtomicInteger ;
4545import java .util .function .Consumer ;
4646import java .util .function .LongSupplier ;
4747import java .util .stream .Collectors ;
@@ -256,21 +256,26 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
256256
257257 logger .info ("starting snapshot retention deletion for [{}] snapshots" , count );
258258 long startTime = nowNanoSupplier .getAsLong ();
259- int deleted = 0 ;
260- int failed = 0 ;
259+ final AtomicInteger deleted = new AtomicInteger ( 0 ) ;
260+ final AtomicInteger failed = new AtomicInteger ( 0 ) ;
261261 for (Map .Entry <String , List <SnapshotInfo >> entry : snapshotsToDelete .entrySet ()) {
262262 String repo = entry .getKey ();
263263 List <SnapshotInfo > snapshots = entry .getValue ();
264264 for (SnapshotInfo info : snapshots ) {
265- Optional <SnapshotHistoryItem > result = deleteSnapshot (getPolicyId (info ), repo , info , slmStats );
266- if (result .isPresent ()) {
267- if (result .get ().isSuccess ()) {
268- deleted ++;
265+ deleteSnapshot (getPolicyId (info ), repo , info , slmStats , historyItem -> {
266+ // This would be nicer if we could use ifPresentOrElse
267+ historyItem .ifPresent (item -> {
268+ if (item .isSuccess ()) {
269+ deleted .incrementAndGet ();
270+ } else {
271+ failed .incrementAndGet ();
272+ }
273+ historyStore .putAsync (item );
274+ });
275+ if (historyItem .isEmpty ()) {
276+ failed .incrementAndGet ();
269277 }
270- historyStore .putAsync (result .get ());
271- } else {
272- failed ++;
273- }
278+ });
274279 // Check whether we have exceeded the maximum time allowed to spend deleting
275280 // snapshots, if we have, short-circuit the rest of the deletions
276281 TimeValue elapsedDeletionTime = TimeValue .timeValueNanos (nowNanoSupplier .getAsLong () - startTime );
@@ -292,24 +297,29 @@ void deleteSnapshots(Map<String, List<SnapshotInfo>> snapshotsToDelete,
292297
293298 /**
294299 * Delete the given snapshot from the repository in blocking manner
295- * @param repo The repository the snapshot is in
296- * @param snapshot The snapshot metadata
297- * @return If present, a SnapshotHistoryItem containing the results of the deletion. Empty if no response or interrupted.
300+ *
301+ * @param repo The repository the snapshot is in
302+ * @param snapshot The snapshot metadata
303+ * @param onCompletion A callback taking info on the history of the snapshot. If present, a SnapshotHistoryItem containing the results
304+ * of the deletion. Empty if interrupted or failed to serialize exception.
298305 */
299- Optional <SnapshotHistoryItem > deleteSnapshot (String slmPolicy , String repo , SnapshotInfo snapshot , SnapshotLifecycleStats slmStats ) {
306+ void deleteSnapshot (String slmPolicy , String repo , SnapshotInfo snapshot , SnapshotLifecycleStats slmStats ,
307+ Consumer <Optional <SnapshotHistoryItem >> onCompletion ) {
300308 logger .info ("[{}] snapshot retention deleting snapshot [{}]" , repo , snapshot .snapshotId ());
301309 CountDownLatch latch = new CountDownLatch (1 );
302- AtomicReference <SnapshotHistoryItem > result = new AtomicReference <>();
303310 client .admin ().cluster ().prepareDeleteSnapshot (repo , snapshot .snapshotId ().getName ())
304311 .execute (new LatchedActionListener <>(new ActionListener <>() {
305312 @ Override
306313 public void onResponse (AcknowledgedResponse acknowledgedResponse ) {
307314 if (acknowledgedResponse .isAcknowledged ()) {
308315 logger .debug ("[{}] snapshot [{}] deleted successfully" , repo , snapshot .snapshotId ());
309- result . set (SnapshotHistoryItem .deletionSuccessRecord (Instant .now ().toEpochMilli (),
310- snapshot .snapshotId ().getName (), slmPolicy , repo ));
316+ onCompletion . accept ( Optional . of (SnapshotHistoryItem .deletionSuccessRecord (Instant .now ().toEpochMilli (),
317+ snapshot .snapshotId ().getName (), slmPolicy , repo ))) ;
311318 } else {
312319 logger .warn ("[{}] snapshot [{}] delete issued but the request was not acknowledged" , repo , snapshot .snapshotId ());
320+ onCompletion .accept (Optional .of (SnapshotHistoryItem .deletionPossibleSuccessRecord (Instant .now ().toEpochMilli (),
321+ snapshot .snapshotId ().getName (), slmPolicy , repo ,
322+ "deletion request issued successfully, no acknowledgement received" )));
313323 }
314324 slmStats .snapshotDeleted (slmPolicy );
315325 }
@@ -318,16 +328,19 @@ public void onResponse(AcknowledgedResponse acknowledgedResponse) {
318328 public void onFailure (Exception e ) {
319329 logger .warn (new ParameterizedMessage ("[{}] failed to delete snapshot [{}] for retention" ,
320330 repo , snapshot .snapshotId ()), e );
331+ slmStats .snapshotDeleteFailure (slmPolicy );
332+ SnapshotHistoryItem result ;
321333 try {
322- result . set ( SnapshotHistoryItem .deletionFailureRecord (Instant .now ().toEpochMilli (),
323- snapshot .snapshotId ().getName (), slmPolicy , repo , e )) ;
334+ result = SnapshotHistoryItem .deletionFailureRecord (Instant .now ().toEpochMilli (),
335+ snapshot .snapshotId ().getName (), slmPolicy , repo , e );
324336 } catch (IOException ex ) {
325337 // This shouldn't happen unless there's an issue with serializing the original exception
326338 logger .error (new ParameterizedMessage (
327339 "failed to record snapshot creation failure for snapshot lifecycle policy [{}]" ,
328340 slmPolicy ), e );
341+ result = null ;
329342 }
330- slmStats . snapshotDeleteFailure ( slmPolicy );
343+ onCompletion . accept ( Optional . ofNullable ( result ) );
331344 }
332345 }, latch ));
333346 try {
@@ -337,9 +350,9 @@ public void onFailure(Exception e) {
337350 } catch (InterruptedException e ) {
338351 logger .error (new ParameterizedMessage ("[{}] deletion of snapshot [{}] interrupted" ,
339352 repo , snapshot .snapshotId ()), e );
353+ onCompletion .accept (Optional .empty ());
340354 slmStats .snapshotDeleteFailure (slmPolicy );
341355 }
342- return Optional .ofNullable (result .get ());
343356 }
344357
345358 void updateStateWithStats (SnapshotLifecycleStats newStats ) {
0 commit comments