Skip to content

Commit 7ff4a7c

Browse files
committed
opterm
1 parent bf5ab75 commit 7ff4a7c

2 files changed

Lines changed: 41 additions & 29 deletions

File tree

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ public IndexShard(ShardRouting shardRouting, IndexSettings indexSettings, ShardP
277277
indexShardOperationsPermit = new IndexShardOperationsPermit(shardId, logger, threadPool);
278278
searcherWrapper = indexSearcherWrapper;
279279
primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
280+
pendingPrimaryTerm.set(primaryTerm);
280281
refreshListeners = buildRefreshListeners();
281282
persistMetadata(shardRouting, null);
282283
}
@@ -1859,7 +1860,7 @@ public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcq
18591860
indexShardOperationsPermit.acquire(onPermitAcquired, executorOnDelay, false);
18601861
}
18611862

1862-
private final AtomicLong pendingPrimaryTerm = new AtomicLong();
1863+
private final AtomicLong pendingPrimaryTerm = new AtomicLong(); // is always greater or equal to primaryTerm
18631864

18641865
/**
18651866
* Acquire a replica operation permit whenever the shard is ready for indexing (see
@@ -1876,43 +1877,45 @@ public void acquireReplicaOperationPermit(
18761877
final long operationPrimaryTerm, final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay) {
18771878
verifyNotClosed();
18781879
verifyReplicationTarget();
1879-
if (operationPrimaryTerm > primaryTerm
1880-
&& pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
1880+
if (operationPrimaryTerm > primaryTerm &&
1881+
pendingPrimaryTerm.getAndAccumulate(operationPrimaryTerm, Math::max) < operationPrimaryTerm) {
1882+
// this means that our operation increased the pendingPrimaryTerm
1883+
// now we are responsible for increasing primaryTerm
18811884
try {
18821885
indexShardOperationsPermit.blockOperations(30, TimeUnit.MINUTES, () -> {
1886+
// re-check as someone with operationPrimaryTerm higher than ours could have come along and increased the primaryTerm
18831887
if (operationPrimaryTerm > primaryTerm) {
18841888
primaryTerm = operationPrimaryTerm;
1889+
// this preserves pendingPrimaryTerm >= primaryTerm, as pendingPrimaryTerm >= operationPrimaryTerm
18851890
}
18861891
});
18871892
} catch (final InterruptedException | TimeoutException e) {
18881893
onPermitAcquired.onFailure(e);
18891894
}
18901895
}
18911896

1892-
final long currentPrimaryTerm = primaryTerm;
1893-
if (operationPrimaryTerm == currentPrimaryTerm) {
1894-
indexShardOperationsPermit.acquire(
1895-
new ActionListener<Releasable>() {
1896-
@Override
1897-
public void onResponse(final Releasable releasable) {
1898-
if (operationPrimaryTerm < primaryTerm) {
1899-
releasable.close();
1900-
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, primaryTerm, onPermitAcquired);
1901-
} else if (operationPrimaryTerm == primaryTerm) {
1902-
onPermitAcquired.onResponse(releasable);
1903-
}
1904-
}
1897+
indexShardOperationsPermit.acquire(
1898+
() -> operationPrimaryTerm <= primaryTerm, // permit is released and acquisition is delayed if this does not hold
1899+
new ActionListener<Releasable>() {
1900+
@Override
1901+
public void onResponse(final Releasable releasable) {
1902+
assert operationPrimaryTerm <= primaryTerm; // otherwise we would have released the permit and delayed
1903+
if (operationPrimaryTerm < primaryTerm) {
1904+
releasable.close();
1905+
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, primaryTerm, onPermitAcquired);
1906+
} else {
1907+
assert operationPrimaryTerm == primaryTerm;
1908+
onPermitAcquired.onResponse(releasable);
1909+
}
1910+
}
19051911

1906-
@Override
1907-
public void onFailure(final Exception e) {
1908-
onPermitAcquired.onFailure(e);
1909-
}
1910-
},
1911-
executorOnDelay,
1912-
true);
1913-
} else {
1914-
onOperationPrimaryTermIsTooOld(shardId, operationPrimaryTerm, currentPrimaryTerm, onPermitAcquired);
1915-
}
1912+
@Override
1913+
public void onFailure(final Exception e) {
1914+
onPermitAcquired.onFailure(e);
1915+
}
1916+
},
1917+
executorOnDelay,
1918+
true);
19161919
}
19171920

19181921
private static void onOperationPrimaryTermIsTooOld(

core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationsPermit.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,26 +106,35 @@ public void blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked)
106106
}
107107
}
108108

109+
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
110+
acquire(() -> true, onAcquired, executorOnDelay, forceExecution);
111+
}
112+
109113
/**
110114
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
111115
* {@link ActionListener} will be called on the calling thread. During calls of {@link #blockOperations(long, TimeUnit, Runnable)},
112116
* permit acquisition can be delayed. The provided ActionListener will then be called using the provided executor once operations are no
113117
* longer blocked.
114118
*
119+
* @param extraCondition predicate that must hold if action should not be delayed. Called when permit is acquired.
115120
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
116121
* @param executorOnDelay executor to use for delayed call
117122
* @param forceExecution whether the runnable should force its execution in case it gets rejected
118123
*/
119-
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
124+
public void acquire(Supplier<Boolean> extraCondition, ActionListener<Releasable> onAcquired, String executorOnDelay,
125+
boolean forceExecution) {
120126
if (closed) {
121127
onAcquired.onFailure(new IndexShardClosedException(shardId));
122128
return;
123129
}
124-
Releasable releasable;
130+
final Releasable releasable;
125131
try {
126132
synchronized (this) {
127133
releasable = tryAcquire();
128-
if (releasable == null) {
134+
if (releasable == null || extraCondition.get() == false) {
135+
if (releasable != null) {
136+
releasable.close();
137+
}
129138
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
130139
if (delayedOperations == null) {
131140
delayedOperations = new ArrayList<>();

0 commit comments

Comments
 (0)