@@ -277,6 +277,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
277277 indexShardOperationsPermit = new IndexShardOperationsPermit (shardId , logger , threadPool );
278278 searcherWrapper = indexSearcherWrapper ;
279279 primaryTerm = indexSettings .getIndexMetaData ().primaryTerm (shardId .id ());
280+ pendingPrimaryTerm .set (primaryTerm );
280281 refreshListeners = buildRefreshListeners ();
281282 persistMetadata (shardRouting , null );
282283 }
@@ -1859,7 +1860,7 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
18591860 indexShardOperationsPermit .acquire (onPermitAcquired , executorOnDelay , false );
18601861 }
18611862
1862- private final AtomicLong pendingPrimaryTerm = new AtomicLong ();
1863+ private final AtomicLong pendingPrimaryTerm = new AtomicLong (); // is always greater or equal to primaryTerm
18631864
18641865 /**
18651866 * Acquire a replica operation permit whenever the shard is ready for indexing (see
@@ -1876,43 +1877,45 @@ public void acquireReplicaOperationPermit(
18761877 final long operationPrimaryTerm , final ActionListener <Releasable > onPermitAcquired , final String executorOnDelay ) {
18771878 verifyNotClosed ();
18781879 verifyReplicationTarget ();
1879- if (operationPrimaryTerm > primaryTerm
1880- && pendingPrimaryTerm .accumulateAndGet (operationPrimaryTerm , Math ::max ) == operationPrimaryTerm ) {
1880+ if (operationPrimaryTerm > primaryTerm &&
1881+ pendingPrimaryTerm .getAndAccumulate (operationPrimaryTerm , Math ::max ) < operationPrimaryTerm ) {
1882+ // this means that our operation increased the pendingPrimaryTerm
1883+ // now we are responsible for increasing primaryTerm
18811884 try {
18821885 indexShardOperationsPermit .blockOperations (30 , TimeUnit .MINUTES , () -> {
1886+ // re-check as someone with operationPrimaryTerm higher than ours could have come along and increased the primaryTerm
18831887 if (operationPrimaryTerm > primaryTerm ) {
18841888 primaryTerm = operationPrimaryTerm ;
1889+ // this preserves pendingPrimaryTerm >= primaryTerm, as pendingPrimaryTerm >= operationPrimaryTerm
18851890 }
18861891 });
18871892 } catch (final InterruptedException | TimeoutException e ) {
18881893 onPermitAcquired .onFailure (e );
18891894 }
18901895 }
18911896
1892- final long currentPrimaryTerm = primaryTerm ;
1893- if (operationPrimaryTerm == currentPrimaryTerm ) {
1894- indexShardOperationsPermit .acquire (
1895- new ActionListener <Releasable >() {
1896- @ Override
1897- public void onResponse (final Releasable releasable ) {
1898- if (operationPrimaryTerm < primaryTerm ) {
1899- releasable .close ();
1900- onOperationPrimaryTermIsTooOld (shardId , operationPrimaryTerm , primaryTerm , onPermitAcquired );
1901- } else if (operationPrimaryTerm == primaryTerm ) {
1902- onPermitAcquired .onResponse (releasable );
1903- }
1904- }
1897+ indexShardOperationsPermit .acquire (
1898+ () -> operationPrimaryTerm <= primaryTerm , // permit is released and acquisition is delayed if this does not hold
1899+ new ActionListener <Releasable >() {
1900+ @ Override
1901+ public void onResponse (final Releasable releasable ) {
1902+ assert operationPrimaryTerm <= primaryTerm ; // otherwise we would have released the permit and delayed
1903+ if (operationPrimaryTerm < primaryTerm ) {
1904+ releasable .close ();
1905+ onOperationPrimaryTermIsTooOld (shardId , operationPrimaryTerm , primaryTerm , onPermitAcquired );
1906+ } else {
1907+ assert operationPrimaryTerm == primaryTerm ;
1908+ onPermitAcquired .onResponse (releasable );
1909+ }
1910+ }
19051911
1906- @ Override
1907- public void onFailure (final Exception e ) {
1908- onPermitAcquired .onFailure (e );
1909- }
1910- },
1911- executorOnDelay ,
1912- true );
1913- } else {
1914- onOperationPrimaryTermIsTooOld (shardId , operationPrimaryTerm , currentPrimaryTerm , onPermitAcquired );
1915- }
1912+ @ Override
1913+ public void onFailure (final Exception e ) {
1914+ onPermitAcquired .onFailure (e );
1915+ }
1916+ },
1917+ executorOnDelay ,
1918+ true );
19161919 }
19171920
19181921 private static void onOperationPrimaryTermIsTooOld (
0 commit comments