Introduce clean transition on primary promotion#24925
Introduce clean transition on primary promotion#24925jasontedor merged 14 commits intoelastic:masterfrom
Conversation
4f7ba68 to
8d0b64a
Compare
This commit introduces a clean transition from the old primary term to the new primary term when a replica is promoted primary. To accomplish this, we delay all operations before incrementing the primary term. The delay is guaranteed to be in place before we increment the term, and then all operations that are delayed are executed after the delay is removed which asynchronously happens on another thread. This thread does not progress until in-flight operations that were executing are completed, and after these operations drain, the delayed operations re-acquire permits and are executed.
8d0b64a to
3b4a5c0
Compare
bleskes
left a comment
There was a problem hiding this comment.
Thanks @jasontedor . Basics flow looks good. I left some suggestions.
| private static final int TOTAL_PERMITS = Integer.MAX_VALUE; | ||
| // fair semaphore to ensure that blockOperations() does not starve under thread contention | ||
| final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); | ||
| final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE, true); // fair to ensure a blocking thread is not starved |
There was a problem hiding this comment.
why switch from the TOTAL_PERMITS field?
There was a problem hiding this comment.
I can't even up come with a plausible theory how that happened, that was completely unintentional.
There was a problem hiding this comment.
Oh, I know what happened! I was playing around with the idea of using a shared/exclusive lock so removed the semaphore and then added it back manually (so with this change) after I decided against it (because it doesn't give an easy way to see how many threads hold the shared lock which we use in testing).
| */ | ||
| public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws | ||
| InterruptedException, TimeoutException, E { | ||
| <E extends Exception> void syncBlockOperations( |
There was a problem hiding this comment.
nit - I like the blockOperations naming... syncX is just one letter away from asyncX..
| synchronized (this) { | ||
| queuedActions = delayedOperations; | ||
| delayedOperations = null; | ||
| delayed = false; |
There was a problem hiding this comment.
assert again that it was true?
| } | ||
| } | ||
|
|
||
| private <E extends Exception> void doBlockOperations( |
There was a problem hiding this comment.
can we doc / make it clearer in the name that this methods releases the delayed operations?
There was a problem hiding this comment.
better yet, how about the following? This way it's clear how a delayOperations call is matched by the undelay logic.
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
index 7fc56a1..f1c8b0d 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
@@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
import org.elasticsearch.threadpool.ThreadPool;
@@ -95,7 +96,11 @@ final class IndexShardOperationPermits implements Closeable {
throw new IndexShardClosedException(shardId);
}
delayOperations();
- doBlockOperations(timeout, timeUnit, onBlocked);
+ try {
+ doBlockOperations(timeout, timeUnit, onBlocked);
+ } finally {
+ releasedDelayedOperations();
+ }
}
/**
@@ -110,12 +115,21 @@ final class IndexShardOperationPermits implements Closeable {
*/
<E extends Exception> void asyncBlockOperations(final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked) {
delayOperations();
- threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
- try {
- doBlockOperations(timeout, timeUnit, onBlocked);
- } catch (final Exception e) {
+ threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
+ @Override
+ public void onFailure(Exception e) {
throw new RuntimeException(e);
}
+
+ @Override
+ protected void doRun() throws Exception {
+ doBlockOperations(timeout, timeUnit, onBlocked);
+ }
+
+ @Override
+ public void onAfter() {
+ releasedDelayedOperations();
+ }
});
}
@@ -150,25 +164,30 @@ final class IndexShardOperationPermits implements Closeable {
throw new TimeoutException("timed out during blockOperations");
}
} finally {
- final List<ActionListener<Releasable>> queuedActions;
- synchronized (this) {
- queuedActions = delayedOperations;
- delayedOperations = null;
- delayed = false;
- }
- if (queuedActions != null) {
- // Try acquiring permits on fresh thread (for two reasons):
- // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
- // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
- // ThreadedActionListener if the queue of the thread pool on which it submits is full.
- // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
- // handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
- threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
- for (ActionListener<Releasable> queuedAction : queuedActions) {
- acquire(queuedAction, null, false);
- }
- });
- }
+ releasedDelayedOperations();
+ }
+ }
+
+ private void releasedDelayedOperations() {
+ final List<ActionListener<Releasable>> queuedActions;
+ synchronized (this) {
+ assert delayed;
+ queuedActions = delayedOperations;
+ delayedOperations = null;
+ delayed = false;
+ }
+ if (queuedActions != null) {
+ // Try acquiring permits on fresh thread (for two reasons):
+ // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
+ // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
+ // ThreadedActionListener if the queue of the thread pool on which it submits is full.
+ // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
+ // handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
+ threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
+ for (ActionListener<Releasable> queuedAction : queuedActions) {
+ acquire(queuedAction, null, false);
+ }
+ });
}
}
| try { | ||
| doBlockOperations(timeout, timeUnit, onBlocked); | ||
| } catch (final Exception e) { | ||
| throw new RuntimeException(e); |
There was a problem hiding this comment.
I wonder if we should fail the shard if something goes wrong here... maybe we should make the CheckedRunnable it's own AbstractRunnable so we can call onFailure and let the caller decide what to do?
| if (delayed) { | ||
| throw new IllegalStateException("operations are already delayed"); | ||
| } else { | ||
| delayed = true; |
There was a problem hiding this comment.
this can pre-create the delayedOperations list. Stronger yet, maybe we should use the existence of the list as a marker that ops should be delayed?
There was a problem hiding this comment.
I strongly prefer the explicit boolean flag rather than implicit dependency on the list being null versus not null.
There was a problem hiding this comment.
Fair enough. We can also maybe just have a something we can drain and let it be a final field.
| releasable = tryAcquire(); | ||
| if (releasable == null) { | ||
| // blockOperations is executing, this operation will be retried by blockOperations once it finishes | ||
| assert delayed; |
There was a problem hiding this comment.
can we used delay to control the flow here? I think it be easier to read that to make tryAcquire check on delay. WDYT?
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
index 7fc56a1..ebab08d 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java
@@ -190,10 +190,7 @@ final class IndexShardOperationPermits implements Closeable {
final Releasable releasable;
try {
synchronized (this) {
- releasable = tryAcquire();
- if (releasable == null) {
- assert delayed;
- // operations are delayed, this operation will be retried by doBlockOperations once the delay is remoked
+ if (delayed) {
if (delayedOperations == null) {
delayedOperations = new ArrayList<>();
}
@@ -205,6 +202,13 @@ final class IndexShardOperationPermits implements Closeable {
} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
}
+ releasable = null;
+ } else {
+ releasable = tryAcquire();
+ assert releasable != null;
+ }
+ if (releasable == null) {
+ assert delayed;
return;
}
}
@@ -212,7 +216,10 @@ final class IndexShardOperationPermits implements Closeable {
onAcquired.onFailure(e);
return;
}
- onAcquired.onResponse(releasable);
+ if (releasable != null) {
+ // if it's null operations are delayed, this operation will be retried by doBlockOperations once the delay is remoked
+ onAcquired.onResponse(releasable);
+ }
}
@Nullable private Releasable tryAcquire() throws InterruptedException {
There was a problem hiding this comment.
I think we can take it a little more, see my push.
|
Back to you @bleskes. |
* master: Add a second refresh to concurrent relocation test Add a dummy_index to upgrade tests to ensure we recover fine with replicas (elastic#24937) Rework bwc snapshot projects to build up to two bwc versions (elastic#24870) Move the IndexDeletionPolicy to be engine internal (elastic#24930) [Tests] Harden InternalExtendedStatsTests (elastic#24934) TCorrecting api name (elastic#24924) Add search method to high level REST client (elastic#24796) Add fromXContent method to ClearScrollResponse (elastic#24909) ClearScrollRequest to implement ToXContentObject (elastic#24907) SearchScrollRequest to implement ToXContentObject (elastic#24906) Fix bug in weight computation for query cache
* master: Fix typo in comment in ReplicationOperation.java Prevent Index & Delete request primaryTerm getter/setter, setShardId setter Drop name from TokenizerFactory (elastic#24869) Correctly set doc_count when MovAvg "predicts" values on existing buckets (elastic#24892) Handle primary failure handling replica response Add missing word to terms-query.asciidoc (elastic#24960) Correct some spelling in match-phrase-prefix docs (elastic#24956) testConcurrentWriteViewsAndSnapshot shouldn't flush concurrently [TEST] Fix FieldSortIT failures Add doc_count to ParsedMatrixStats (elastic#24952) Add document count to Matrix Stats aggregation response (elastic#24776) Fix script field sort returning Double.MAX_VALUE for all documents (elastic#24942)
bleskes
left a comment
There was a problem hiding this comment.
LGTM. Left some minor suggestions.
| } | ||
| doBlockOperations(timeout, timeUnit, onBlocked); | ||
| } finally { | ||
| releasedDelayedOperations(); |
There was a problem hiding this comment.
nit - an extra d? releasedDelayed..
| final TimeUnit timeUnit, | ||
| final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E { | ||
| if (Assertions.ENABLED) { | ||
| synchronized (this) { |
There was a problem hiding this comment.
can you add a comment this is for visibility?
| } | ||
| return; | ||
| } else { | ||
| releasable = tryAcquire(); |
There was a problem hiding this comment.
should we just make this be called acquire and make it never return null (but blow up if it can't acquire (as it is not non-delayed)?
There was a problem hiding this comment.
Okay, I will do that in a follow-up immediately after this PR.
This commit introduces a clean transition from the old primary term to the new primary term when a replica is promoted primary. To accomplish this, we delay all operations before incrementing the primary term. The delay is guaranteed to be in place before we increment the term, and then all operations that are delayed are executed after the delay is removed which asynchronously happens on another thread. This thread does not progress until in-flight operations that were executing are completed, and after these operations drain, the delayed operations re-acquire permits and are executed.
Relates #10708