2323import org .apache .logging .log4j .message .ParameterizedMessage ;
2424import org .apache .logging .log4j .util .Supplier ;
2525import org .apache .lucene .index .IndexCommit ;
26+ import org .apache .lucene .util .SetOnce ;
2627import org .elasticsearch .ExceptionsHelper ;
2728import org .elasticsearch .cluster .ClusterChangedEvent ;
2829import org .elasticsearch .cluster .ClusterState ;
@@ -147,7 +148,6 @@ protected void doStop() {
147148 } finally {
148149 shutdownLock .unlock ();
149150 }
150-
151151 }
152152
153153 @ Override
@@ -282,17 +282,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
282282 snapshotStatus .abort ();
283283 break ;
284284 case FINALIZE :
285- logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, letting it finish" , entry .snapshot (), shard .key );
285+ logger .debug ("[{}] trying to cancel snapshot on shard [{}] that is finalizing, " +
286+ "letting it finish" , entry .snapshot (), shard .key );
286287 break ;
287288 case DONE :
288- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master" , entry . snapshot (), shard . key );
289- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
290- new ShardSnapshotStatus ( localNodeId , State . SUCCESS ) , masterNode );
289+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that is already done, " +
290+ "updating status on the master" , entry .snapshot (), shard .key );
291+ notifySuccessfulSnapshotShard ( entry . snapshot (), shard . key , localNodeId , masterNode );
291292 break ;
292293 case FAILURE :
293- logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master" , entry . snapshot (), shard . key );
294- updateIndexShardSnapshotStatus ( entry .snapshot (), shard .key ,
295- new ShardSnapshotStatus ( localNodeId , State . FAILED , snapshotStatus .failure () ), masterNode );
294+ logger .debug ("[{}] trying to cancel snapshot on the shard [{}] that has already failed, " +
295+ "updating status on the master" , entry .snapshot (), shard .key );
296+ notifyFailedSnapshotShard ( entry . snapshot (), shard . key , localNodeId , snapshotStatus .failure (), masterNode );
296297 break ;
297298 default :
298299 throw new IllegalStateException ("Unknown snapshot shard stage " + snapshotStatus .stage ());
@@ -321,34 +322,47 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
321322 if (newSnapshots .isEmpty () == false ) {
322323 Executor executor = threadPool .executor (ThreadPool .Names .SNAPSHOT );
323324 for (final Map .Entry <Snapshot , Map <ShardId , IndexShardSnapshotStatus >> entry : newSnapshots .entrySet ()) {
324- Map <String , IndexId > indicesMap = snapshotIndices .get (entry .getKey ());
325+ final Snapshot snapshot = entry .getKey ();
326+ final Map <String , IndexId > indicesMap = snapshotIndices .get (snapshot );
325327 assert indicesMap != null ;
328+
326329 for (final Map .Entry <ShardId , IndexShardSnapshotStatus > shardEntry : entry .getValue ().entrySet ()) {
327330 final ShardId shardId = shardEntry .getKey ();
328- try {
329- final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
330- final IndexId indexId = indicesMap .get (shardId .getIndexName ());
331- assert indexId != null ;
332- executor .execute (new AbstractRunnable () {
333- @ Override
334- public void doRun () {
335- snapshot (indexShard , entry .getKey (), indexId , shardEntry .getValue ());
336- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
337- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
338- }
331+ final IndexShard indexShard = indicesService .indexServiceSafe (shardId .getIndex ()).getShardOrNull (shardId .id ());
332+ final IndexId indexId = indicesMap .get (shardId .getIndexName ());
333+ assert indexId != null ;
334+ executor .execute (new AbstractRunnable () {
339335
340- @ Override
341- public void onFailure (Exception e ) {
342- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to create snapshot" , shardId , entry .getKey ()), e );
343- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
344- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
345- }
336+ final SetOnce <Exception > failure = new SetOnce <>();
346337
347- });
348- } catch (Exception e ) {
349- updateIndexShardSnapshotStatus (entry .getKey (), shardId ,
350- new ShardSnapshotStatus (localNodeId , State .FAILED , ExceptionsHelper .detailedMessage (e )), masterNode );
351- }
338+ @ Override
339+ public void doRun () {
340+ snapshot (indexShard , snapshot , indexId , shardEntry .getValue ());
341+ }
342+
343+ @ Override
344+ public void onFailure (Exception e ) {
345+ logger .warn ((Supplier <?>) () ->
346+ new ParameterizedMessage ("[{}][{}] failed to snapshot shard" , shardId , snapshot ), e );
347+ failure .set (e );
348+ }
349+
350+ @ Override
351+ public void onRejection (Exception e ) {
352+ failure .set (e );
353+ }
354+
355+ @ Override
356+ public void onAfter () {
357+ final Exception exception = failure .get ();
358+ if (exception != null ) {
359+ final String failure = ExceptionsHelper .detailedMessage (exception );
360+ notifyFailedSnapshotShard (snapshot , shardId , localNodeId , failure , masterNode );
361+ } else {
362+ notifySuccessfulSnapshotShard (snapshot , shardId , localNodeId , masterNode );
363+ }
364+ }
365+ });
352366 }
353367 }
354368 }
@@ -361,37 +375,39 @@ public void onFailure(Exception e) {
361375 * @param snapshotStatus snapshot status
362376 */
363377 private void snapshot (final IndexShard indexShard , final Snapshot snapshot , final IndexId indexId , final IndexShardSnapshotStatus snapshotStatus ) {
364- Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
365- ShardId shardId = indexShard .shardId ();
366- if (!indexShard .routingEntry ().primary ()) {
378+ final ShardId shardId = indexShard .shardId ();
379+ if (indexShard .routingEntry ().primary () == false ) {
367380 throw new IndexShardSnapshotFailedException (shardId , "snapshot should be performed only on primary" );
368381 }
369382 if (indexShard .routingEntry ().relocating ()) {
370383 // do not snapshot when in the process of relocation of primaries so we won't get conflicts
371384 throw new IndexShardSnapshotFailedException (shardId , "cannot snapshot while relocating" );
372385 }
373- if (indexShard .state () == IndexShardState .CREATED || indexShard .state () == IndexShardState .RECOVERING ) {
386+
387+ final IndexShardState indexShardState = indexShard .state ();
388+ if (indexShardState == IndexShardState .CREATED || indexShardState == IndexShardState .RECOVERING ) {
374389 // shard has just been created, or still recovering
375390 throw new IndexShardSnapshotFailedException (shardId , "shard didn't fully recover yet" );
376391 }
377392
393+ final Repository repository = snapshotsService .getRepositoriesService ().repository (snapshot .getRepository ());
378394 try {
379395 // we flush first to make sure we get the latest writes snapshotted
380396 IndexCommit snapshotIndexCommit = indexShard .acquireIndexCommit (true );
381397 try {
382398 repository .snapshotShard (indexShard , snapshot .getSnapshotId (), indexId , snapshotIndexCommit , snapshotStatus );
383399 if (logger .isDebugEnabled ()) {
384- StringBuilder sb = new StringBuilder ();
385- sb .append (" index : version [" ).append (snapshotStatus .indexVersion ()).append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ()).append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
400+ StringBuilder details = new StringBuilder ();
401+ details .append (" index : version [" ).append (snapshotStatus .indexVersion ());
402+ details .append ("], number_of_files [" ).append (snapshotStatus .numberOfFiles ());
403+ details .append ("] with total_size [" ).append (new ByteSizeValue (snapshotStatus .totalSize ())).append ("]\n " );
386404 logger .debug ("snapshot ({}) completed to {}, took [{}]\n {}" , snapshot , repository ,
387- TimeValue .timeValueMillis (snapshotStatus .time ()), sb );
405+ TimeValue .timeValueMillis (snapshotStatus .time ()), details );
388406 }
389407 } finally {
390408 indexShard .releaseIndexCommit (snapshotIndexCommit );
391409 }
392- } catch (SnapshotFailedEngineException e ) {
393- throw e ;
394- } catch (IndexShardSnapshotFailedException e ) {
410+ } catch (SnapshotFailedEngineException | IndexShardSnapshotFailedException e ) {
395411 throw e ;
396412 } catch (Exception e ) {
397413 throw new IndexShardSnapshotFailedException (shardId , "Failed to snapshot" , e );
@@ -406,6 +422,7 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
406422 if (snapshotsInProgress == null ) {
407423 return ;
408424 }
425+
409426 final String localNodeId = event .state ().nodes ().getLocalNodeId ();
410427 final DiscoveryNode masterNode = event .state ().nodes ().getMasterNode ();
411428 for (SnapshotsInProgress .Entry snapshot : snapshotsInProgress .entries ()) {
@@ -421,15 +438,16 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
421438 // Master knows about the shard and thinks it has not completed
422439 if (localShardStatus .stage () == Stage .DONE ) {
423440 // but we think the shard is done - we need to make new master know that the shard is done
424- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master" , snapshot .snapshot (), shardId );
425- updateIndexShardSnapshotStatus (snapshot .snapshot (), shardId ,
426- new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
441+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, " +
442+ "updating status on the master" , snapshot .snapshot (), shardId );
443+ notifySuccessfulSnapshotShard (snapshot .snapshot (), shardId , localNodeId , masterNode );
444+
427445 } else if (localShard .getValue ().stage () == Stage .FAILURE ) {
428446 // but we think the shard failed - we need to make new master know that the shard failed
429- logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master" , snapshot . snapshot (), shardId );
430- updateIndexShardSnapshotStatus ( snapshot .snapshot (), shardId ,
431- new ShardSnapshotStatus ( localNodeId , State . FAILED , localShardStatus .failure ()), masterNode );
432-
447+ logger .debug ("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, " +
448+ "updating status on master" , snapshot .snapshot (), shardId );
449+ final String failure = localShardStatus .failure ();
450+ notifyFailedSnapshotShard ( snapshot . snapshot (), shardId , localNodeId , failure , masterNode );
433451 }
434452 }
435453 }
@@ -449,7 +467,6 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
449467 }
450468 }
451469
452-
453470 /**
454471 * Internal request that is used to send changes in snapshot status to master
455472 */
@@ -512,15 +529,33 @@ public boolean isProcessed() {
512529 }
513530 }
514531
515- /**
516- * Updates the shard status
517- */
518- public void updateIndexShardSnapshotStatus (Snapshot snapshot , ShardId shardId , ShardSnapshotStatus status , DiscoveryNode master ) {
519- UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
532+ /** Notify the master node that the given shard has been successfully snapshotted **/
533+ void notifySuccessfulSnapshotShard (final Snapshot snapshot ,
534+ final ShardId shardId ,
535+ final String localNodeId ,
536+ final DiscoveryNode masterNode ) {
537+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .SUCCESS ), masterNode );
538+ }
539+
540+ /** Notify the master node that the given shard failed to be snapshotted **/
541+ void notifyFailedSnapshotShard (final Snapshot snapshot ,
542+ final ShardId shardId ,
543+ final String localNodeId ,
544+ final String failure ,
545+ final DiscoveryNode masterNode ) {
546+ sendSnapshotShardUpdate (snapshot , shardId , new ShardSnapshotStatus (localNodeId , State .FAILED , failure ), masterNode );
547+ }
548+
549+ /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */
550+ void sendSnapshotShardUpdate (final Snapshot snapshot ,
551+ final ShardId shardId ,
552+ final ShardSnapshotStatus status ,
553+ final DiscoveryNode masterNode ) {
520554 try {
521- transportService .sendRequest (master , UPDATE_SNAPSHOT_ACTION_NAME , request , EmptyTransportResponseHandler .INSTANCE_SAME );
555+ final UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest (snapshot , shardId , status );
556+ transportService .sendRequest (masterNode , UPDATE_SNAPSHOT_ACTION_NAME , request , EmptyTransportResponseHandler .INSTANCE_SAME );
522557 } catch (Exception e ) {
523- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , request . snapshot (), request . status () ), e );
558+ logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("[{}] [{}] failed to update snapshot state" , snapshot , status ), e );
524559 }
525560 }
526561
0 commit comments