@@ -607,12 +607,10 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
607607 public void relocated (final Consumer <ReplicationTracker .PrimaryContext > consumer )
608608 throws IllegalIndexShardStateException , InterruptedException {
609609 assert shardRouting .primary () : "only primaries can be marked as relocated: " + shardRouting ;
610- refreshListeners .disallowAdd ();
610+ final Releasable forceRefreshes = refreshListeners .forceRefreshes ();
611611 try {
612- if (refreshListeners .refreshNeeded ()) {
613- refresh ("relocated" );
614- }
615612 indexShardOperationPermits .blockOperations (30 , TimeUnit .MINUTES , () -> {
613+ forceRefreshes .close ();
616614 // no shard operation permits are being held here, move state from started to relocated
617615 assert indexShardOperationPermits .getActiveOperationsCount () == 0 :
618616 "in-flight operations in progress while moving shard state to relocated" ;
@@ -644,7 +642,7 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
644642 failShard ("timed out waiting for relocation hand-off to complete" , null );
645643 throw new IndexShardClosedException (shardId (), "timed out waiting for relocation hand-off to complete" );
646644 } finally {
647- refreshListeners . allowAdd ();
645+ forceRefreshes . close ();
648646 }
649647 }
650648
@@ -2345,15 +2343,22 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
23452343 verifyNotClosed ();
23462344 assert shardRouting .primary () : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting ;
23472345
2348- refreshListeners .disallowAdd ();
2346+ asyncBlockOperations (onPermitAcquired , timeout .duration (), timeout .timeUnit ());
2347+ }
2348+
2349+ private void asyncBlockOperations (ActionListener <Releasable > onPermitAcquired , long timeout , TimeUnit timeUnit ) {
2350+ final Releasable forceRefreshes = refreshListeners .forceRefreshes ();
2351+ final ActionListener <Releasable > wrappedListener = ActionListener .wrap (r -> {
2352+ forceRefreshes .close ();
2353+ onPermitAcquired .onResponse (r );
2354+ }, e -> {
2355+ forceRefreshes .close ();
2356+ onPermitAcquired .onFailure (e );
2357+ });
23492358 try {
2350- if (refreshListeners .refreshNeeded ()) {
2351- refresh ("acquire all primary operations permits" );
2352- }
2353- indexShardOperationPermits .asyncBlockOperations (
2354- ensuringAllowRefreshListeners (onPermitAcquired ), timeout .duration (), timeout .timeUnit ());
2359+ indexShardOperationPermits .asyncBlockOperations (wrappedListener , timeout , timeUnit );
23552360 } catch (Exception e ) {
2356- refreshListeners . allowAdd ();
2361+ forceRefreshes . close ();
23572362 throw e ;
23582363 }
23592364 }
@@ -2365,7 +2370,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
23652370 assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null );
23662371 assert operationPrimaryTerm <= pendingPrimaryTerm ;
23672372 final CountDownLatch termUpdated = new CountDownLatch (1 );
2368- final ActionListener < Releasable > actionListener = new ActionListener <Releasable >() {
2373+ asyncBlockOperations ( new ActionListener <Releasable >() {
23692374 @ Override
23702375 public void onFailure (final Exception e ) {
23712376 try {
@@ -2411,41 +2416,9 @@ public void onResponse(final Releasable releasable) {
24112416 }
24122417 }
24132418 }
2414- };
2415- refreshListeners .disallowAdd ();
2416- try {
2417- if (refreshListeners .refreshNeeded ()) {
2418- refresh ("bump primary term" );
2419- }
2420- indexShardOperationPermits .asyncBlockOperations (ensuringAllowRefreshListeners (actionListener ), 30 , TimeUnit .MINUTES );
2421- pendingPrimaryTerm = newPrimaryTerm ;
2422- termUpdated .countDown ();
2423- } catch (Exception e ) {
2424- refreshListeners .allowAdd ();
2425- throw e ;
2426- }
2427- }
2428-
2429- /**
2430- * Wraps an ActionListener so that {@link RefreshListeners#allowAdd()} is always called on {@link #refreshListeners}
2431- * after the listeners actions have executed.
2432- * @param actionListener ActionListener to wrap
2433- * @return Wrapped ActionListener
2434- */
2435- private ActionListener <Releasable > ensuringAllowRefreshListeners (ActionListener <Releasable > actionListener ) {
2436- return ActionListener .wrap (r -> {
2437- try {
2438- actionListener .onResponse (r );
2439- } finally {
2440- refreshListeners .allowAdd ();
2441- }
2442- }, e -> {
2443- try {
2444- actionListener .onFailure (e );
2445- } finally {
2446- refreshListeners .allowAdd ();
2447- }
2448- });
2419+ }, 30 , TimeUnit .MINUTES );
2420+ pendingPrimaryTerm = newPrimaryTerm ;
2421+ termUpdated .countDown ();
24492422 }
24502423
24512424 /**
@@ -2490,21 +2463,10 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
24902463 final long maxSeqNoOfUpdatesOrDeletes ,
24912464 final ActionListener <Releasable > onPermitAcquired ,
24922465 final TimeValue timeout ) {
2493- refreshListeners .disallowAdd ();
2494- try {
2495- if (refreshListeners .refreshNeeded ()) {
2496- refresh ("acquire all replica operations permits" );
2497- }
2498- innerAcquireReplicaOperationPermit (opPrimaryTerm , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes ,
2499- onPermitAcquired , true ,
2500- listener -> indexShardOperationPermits .asyncBlockOperations (
2501- ensuringAllowRefreshListeners (listener ), timeout .duration (), timeout .timeUnit ()
2502- )
2503- );
2504- } catch (Exception e ) {
2505- refreshListeners .allowAdd ();
2506- throw e ;
2507- }
2466+ innerAcquireReplicaOperationPermit (opPrimaryTerm , globalCheckpoint , maxSeqNoOfUpdatesOrDeletes ,
2467+ onPermitAcquired , true ,
2468+ listener -> asyncBlockOperations (listener , timeout .duration (), timeout .timeUnit ())
2469+ );
25082470 }
25092471
25102472 private void innerAcquireReplicaOperationPermit (final long opPrimaryTerm ,
0 commit comments