@@ -236,9 +236,39 @@ protected TransportRequestOptions transportOptions(Settings settings) {
236236 return TransportRequestOptions .EMPTY ;
237237 }
238238
239+ private String concreteIndex (final ClusterState state , final ReplicationRequest request ) {
240+ return resolveIndex () ? indexNameExpressionResolver .concreteSingleIndex (state , request ).getName () : request .index ();
241+ }
242+
243+ private ClusterBlockException blockExceptions (final ClusterState state , final String indexName ) {
244+ ClusterBlockLevel globalBlockLevel = globalBlockLevel ();
245+ if (globalBlockLevel != null ) {
246+ ClusterBlockException blockException = state .blocks ().globalBlockedException (globalBlockLevel );
247+ if (blockException != null ) {
248+ return blockException ;
249+ }
250+ }
251+ ClusterBlockLevel indexBlockLevel = indexBlockLevel ();
252+ if (indexBlockLevel != null ) {
253+ ClusterBlockException blockException = state .blocks ().indexBlockedException (indexBlockLevel , indexName );
254+ if (blockException != null ) {
255+ return blockException ;
256+ }
257+ }
258+ return null ;
259+ }
260+
239261 protected boolean retryPrimaryException (final Throwable e ) {
240262 return e .getClass () == ReplicationOperation .RetryOnPrimaryException .class
241- || TransportActions .isShardNotAvailableException (e );
263+ || TransportActions .isShardNotAvailableException (e )
264+ || isRetryableClusterBlockException (e );
265+ }
266+
267+ boolean isRetryableClusterBlockException (final Throwable e ) {
268+ if (e instanceof ClusterBlockException ) {
269+ return ((ClusterBlockException ) e ).retryable ();
270+ }
271+ return false ;
242272 }
243273
244274 protected class OperationTransportHandler implements TransportRequestHandler <Request > {
@@ -321,6 +351,15 @@ protected void doRun() throws Exception {
321351 @ Override
322352 public void onResponse (PrimaryShardReference primaryShardReference ) {
323353 try {
354+ final ClusterState clusterState = clusterService .state ();
355+ final IndexMetaData indexMetaData = clusterState .metaData ().getIndexSafe (primaryShardReference .routingEntry ().index ());
356+
357+ final ClusterBlockException blockException = blockExceptions (clusterState , indexMetaData .getIndex ().getName ());
358+ if (blockException != null ) {
359+ logger .trace ("cluster is blocked, action failed on primary" , blockException );
360+ throw blockException ;
361+ }
362+
324363 if (primaryShardReference .isRelocated ()) {
325364 primaryShardReference .close (); // release shard operation lock as soon as possible
326365 setPhase (replicationTask , "primary_delegation" );
@@ -334,7 +373,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
334373 response .readFrom (in );
335374 return response ;
336375 };
337- DiscoveryNode relocatingNode = clusterService . state () .nodes ().get (primary .relocatingNodeId ());
376+ DiscoveryNode relocatingNode = clusterState .nodes ().get (primary .relocatingNodeId ());
338377 transportService .sendRequest (relocatingNode , transportPrimaryAction ,
339378 new ConcreteShardRequest <>(request , primary .allocationId ().getRelocationId (), primaryTerm ),
340379 transportOptions ,
@@ -713,35 +752,42 @@ public void onFailure(Exception e) {
713752 protected void doRun () {
714753 setPhase (task , "routing" );
715754 final ClusterState state = observer .setAndGetObservedState ();
716- if (handleBlockExceptions (state )) {
717- return ;
718- }
719-
720- // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
721- final String concreteIndex = concreteIndex (state );
722- final IndexMetaData indexMetaData = state .metaData ().index (concreteIndex );
723- if (indexMetaData == null ) {
724- retry (new IndexNotFoundException (concreteIndex ));
725- return ;
726- }
727- if (indexMetaData .getState () == IndexMetaData .State .CLOSE ) {
728- throw new IndexClosedException (indexMetaData .getIndex ());
729- }
755+ final String concreteIndex = concreteIndex (state , request );
756+ final ClusterBlockException blockException = blockExceptions (state , concreteIndex );
757+ if (blockException != null ) {
758+ if (blockException .retryable ()) {
759+ logger .trace ("cluster is blocked, scheduling a retry" , blockException );
760+ retry (blockException );
761+ } else {
762+ finishAsFailed (blockException );
763+ }
764+ } else {
765+ // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
766+ final IndexMetaData indexMetaData = state .metaData ().index (concreteIndex );
767+ if (indexMetaData == null ) {
768+ retry (new IndexNotFoundException (concreteIndex ));
769+ return ;
770+ }
771+ if (indexMetaData .getState () == IndexMetaData .State .CLOSE ) {
772+ throw new IndexClosedException (indexMetaData .getIndex ());
773+ }
730774
731- // resolve all derived request fields, so we can route and apply it
732- resolveRequest (indexMetaData , request );
733- assert request .shardId () != null : "request shardId must be set in resolveRequest" ;
734- assert request .waitForActiveShards () != ActiveShardCount .DEFAULT : "request waitForActiveShards must be set in resolveRequest" ;
775+ // resolve all derived request fields, so we can route and apply it
776+ resolveRequest (indexMetaData , request );
777+ assert request .shardId () != null : "request shardId must be set in resolveRequest" ;
778+ assert request .waitForActiveShards () != ActiveShardCount .DEFAULT :
779+ "request waitForActiveShards must be set in resolveRequest" ;
735780
736- final ShardRouting primary = primary (state );
737- if (retryIfUnavailable (state , primary )) {
738- return ;
739- }
740- final DiscoveryNode node = state .nodes ().get (primary .currentNodeId ());
741- if (primary .currentNodeId ().equals (state .nodes ().getLocalNodeId ())) {
742- performLocalAction (state , primary , node , indexMetaData );
743- } else {
744- performRemoteAction (state , primary , node );
781+ final ShardRouting primary = primary (state );
782+ if (retryIfUnavailable (state , primary )) {
783+ return ;
784+ }
785+ final DiscoveryNode node = state .nodes ().get (primary .currentNodeId ());
786+ if (primary .currentNodeId ().equals (state .nodes ().getLocalNodeId ())) {
787+ performLocalAction (state , primary , node , indexMetaData );
788+ } else {
789+ performRemoteAction (state , primary , node );
790+ }
745791 }
746792 }
747793
@@ -793,44 +839,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
793839 return false ;
794840 }
795841
796- private String concreteIndex (ClusterState state ) {
797- return resolveIndex () ? indexNameExpressionResolver .concreteSingleIndex (state , request ).getName () : request .index ();
798- }
799-
800842 private ShardRouting primary (ClusterState state ) {
801843 IndexShardRoutingTable indexShard = state .getRoutingTable ().shardRoutingTable (request .shardId ());
802844 return indexShard .primaryShard ();
803845 }
804846
805- private boolean handleBlockExceptions (ClusterState state ) {
806- ClusterBlockLevel globalBlockLevel = globalBlockLevel ();
807- if (globalBlockLevel != null ) {
808- ClusterBlockException blockException = state .blocks ().globalBlockedException (globalBlockLevel );
809- if (blockException != null ) {
810- handleBlockException (blockException );
811- return true ;
812- }
813- }
814- ClusterBlockLevel indexBlockLevel = indexBlockLevel ();
815- if (indexBlockLevel != null ) {
816- ClusterBlockException blockException = state .blocks ().indexBlockedException (indexBlockLevel , concreteIndex (state ));
817- if (blockException != null ) {
818- handleBlockException (blockException );
819- return true ;
820- }
821- }
822- return false ;
823- }
824-
825- private void handleBlockException (ClusterBlockException blockException ) {
826- if (blockException .retryable ()) {
827- logger .trace ("cluster is blocked, scheduling a retry" , blockException );
828- retry (blockException );
829- } else {
830- finishAsFailed (blockException );
831- }
832- }
833-
834847 private void performAction (final DiscoveryNode node , final String action , final boolean isPrimaryAction ,
835848 final TransportRequest requestToPerform ) {
836849 transportService .sendRequest (node , action , requestToPerform , transportOptions , new TransportResponseHandler <Response >() {
0 commit comments