Block older operations on primary term transition #24779
jasontedor merged 8 commits into elastic:master
Today a replica learns of a new primary term via a cluster state update and there is not a clean transition between the older primary term and the newer primary term. This commit modifies this situation so that:
- a replica shard learns of a new primary term via replication operations executed under the mandate of the new primary
- when a replica shard learns of a new primary term, it blocks operations on older terms from reaching the engine, with a clear transition point between the operations on the older term and the operations on the newer term

This work paves the way for a primary/replica sync on primary promotion. Future work will also ensure a clean transition point on a promoted primary, and prepare a replica shard for a sync with the promoted primary.
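The transition described above can be illustrated with a minimal sketch. It is not the code in this PR: `ReplicaTermGate` and its methods are hypothetical names, and a read/write lock stands in for the shard's operation permits. An operation carrying a newer term waits for in-flight operations to drain, advances the term, and from then on operations on older terms are rejected before they reach the engine.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: ReplicaTermGate is not an Elasticsearch class.
class ReplicaTermGate {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Written only under the write lock; volatile so the fast-path check sees updates.
    private volatile long primaryTerm;

    ReplicaTermGate(long initialTerm) {
        this.primaryTerm = initialTerm;
    }

    long currentTerm() {
        return primaryTerm;
    }

    void execute(long operationTerm, Runnable operation) {
        if (operationTerm > primaryTerm) {
            // Taking the write lock waits for in-flight operations (read-lock
            // holders) to finish, which gives a clean transition point.
            lock.writeLock().lock();
            try {
                // Re-check: another operation may already have advanced the term.
                if (operationTerm > primaryTerm) {
                    primaryTerm = operationTerm;
                }
            } finally {
                lock.writeLock().unlock();
            }
        }
        lock.readLock().lock();
        try {
            if (operationTerm < primaryTerm) {
                throw new IllegalStateException(
                    "operation term [" + operationTerm + "] is too old (current [" + primaryTerm + "])");
            }
            operation.run(); // stands in for applying the operation to the engine
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

In this sketch an operation must not call `execute` with a newer term from inside another operation, because a read lock cannot be upgraded to a write lock.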
    throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
        shardId, opPrimaryTerm, primaryTerm));
    if (operationPrimaryTerm > primaryTerm
            && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
If there are many incoming operations with a higher term, each one of them will go into this branch and invoke blockOperations (until one completes). This can create additional contention when the first blockOperations completes and subsequent operations unnecessarily call blockOperations. I've adapted your code in 7ff4a7c so that only the first operation with a higher term calls blockOperations.
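One way to let only the first operation with a higher term win is sketched below. This is an assumption about how such a check could look, not a description of what 7ff4a7c does; `FirstRaiser` and `isFirstToRaise` are hypothetical names. It relies on `AtomicLong.getAndAccumulate`, which returns the value before the update, so the comparison is true only for the caller that actually raised the pending term.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: not the change made in 7ff4a7c.
class FirstRaiser {
    private final AtomicLong pendingPrimaryTerm = new AtomicLong();

    /**
     * Returns true only for the caller whose term strictly raised the pending term.
     * A second operation carrying the same term sees the already-raised value and
     * gets false, so it would not submit another block.
     */
    boolean isFirstToRaise(long operationPrimaryTerm) {
        long previous = pendingPrimaryTerm.getAndAccumulate(operationPrimaryTerm, Math::max);
        return previous < operationPrimaryTerm;
    }
}
```

By contrast, `accumulateAndGet(term, Math::max) == term` is true for every caller that carries the current maximum, which is the repeated blocking the comment above describes.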
This solution has a problem. Suppose primaryTerm == 0 and an operation comes in with operationPrimaryTerm == 1, then another comes in with operationPrimaryTerm == 2, and then a third comes in with operationPrimaryTerm == 1. The third operation may be processed before primaryTerm is incremented to 1 (or 2). This can happen if the first operation passed the check but hasn't submitted its block yet, and the second operation incremented pendingPrimaryTerm but hasn't submitted its block either; the third operation then just passes through without waiting.
And now I see that we guard against it with if (operationPrimaryTerm == currentPrimaryTerm) later on, so the third operation will fail, but with the wrong message (we will say it's too old and give a currentPrimaryTerm of 0 while the operation's term is 1). I think this is all just too complex and isn't worth it given how rare primary promotions are.
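The interleaving above can be replayed deterministically. The sketch below copies only the condition from the diff; `TermRaceReplay`, `wouldSubmitBlock`, and `replay` are hypothetical names, and primaryTerm is held at 0 to model the window in which no submitted block has run yet.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical replay of the three operations from the comment above.
class TermRaceReplay {
    /** Mirrors the condition in the diff: true when this operation would submit a block. */
    static boolean wouldSubmitBlock(long operationPrimaryTerm, long primaryTerm, AtomicLong pendingPrimaryTerm) {
        return operationPrimaryTerm > primaryTerm
            && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm;
    }

    static boolean[] replay() {
        long primaryTerm = 0; // no block has executed yet, so the term is unchanged
        AtomicLong pendingPrimaryTerm = new AtomicLong(0);
        return new boolean[] {
            wouldSubmitBlock(1, primaryTerm, pendingPrimaryTerm), // first op, term 1
            wouldSubmitBlock(2, primaryTerm, pendingPrimaryTerm), // second op, term 2
            wouldSubmitBlock(1, primaryTerm, pendingPrimaryTerm)  // third op, term 1
        };
    }
}
```

The first two operations take the blocking branch, while the third skips it even though primaryTerm is still 0, which is the case the later equality check has to catch.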
bleskes left a comment
Looks great. I think the main discussion point is the concurrency control. I will reach out to discuss in another channel.
@@ -180,7 +178,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima

    /**
     * Synchronous replica operation on nodes with replica copies. This is done under the lock form
nit: this is done while having (under?) a permit
            && pendingPrimaryTerm.accumulateAndGet(operationPrimaryTerm, Math::max) == operationPrimaryTerm) {
        try {
            indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
                if (operationPrimaryTerm > primaryTerm) {
Can you add a comment as to how it's possible that the term will not be higher (i.e. a race condition between checking pendingPrimaryTerm and submitting the blockOperations)?
    public void onResponse(final Releasable releasable) {
        assert operationPrimaryTerm <= primaryTerm
            : "operation primary term [" + operationPrimaryTerm + "] should be at most [" + primaryTerm + "]";
        if (operationPrimaryTerm < primaryTerm) {
add a comment please on how this can happen...
    }
    try {
        if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
            assert semaphore.availablePermits() == 0;
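The snippet above acquires every permit of a semaphore in order to block operations. A minimal, self-contained sketch of that technique follows; `OperationPermits`, `tryAcquireOne`, and `releaseOne` are hypothetical names, and the boolean return value and the `Integer.MAX_VALUE` permit count are assumptions rather than the signatures used in the PR.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of blocking all operations by draining a semaphore.
class OperationPermits {
    static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);

    /** A regular operation takes one permit; false means operations are currently blocked. */
    boolean tryAcquireOne() {
        return semaphore.tryAcquire();
    }

    void releaseOne() {
        semaphore.release();
    }

    /**
     * Waits until no operation holds a permit, runs the action while all new
     * operations are blocked, then releases every permit again.
     * Returns false if the permits could not be acquired within the timeout.
     */
    boolean blockOperations(long timeout, TimeUnit timeUnit, Runnable onBlocked) {
        try {
            if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
                try {
                    onBlocked.run();
                } finally {
                    semaphore.release(TOTAL_PERMITS);
                }
                return true;
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Because the blocking call needs all permits at once, it succeeds only when no operation is in flight, which is what makes it usable as a transition point.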
    ensureYellow();

    // this forces the primary term to propagate to the replicas
    client().index(new IndexRequest("test", "type", "1").source("{ \"f\": \"1\"}", XContentType.JSON)).get();
how do we make sure we change it/only do it sometimes once we can?
        allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.activeShards(), nodes);
    final Set<String> initializingIds =
        allocationIdsForShardsOnNodesThatUnderstandSeqNos(indexShardRoutingTable.getAllInitializingShards(), nodes);
    shard.updatePrimaryTerm(clusterState.metaData().index(shard.shardId().getIndex()).primaryTerm(shard.shardId().id()));
I'm wondering if we should move this if clause to before we update the routing entry. @ywelsch this class is your baby, any thoughts?
Let's discuss this separately and proceed in a follow-up if needed.
            ThreadPool.Names.INDEX);
    };

    final Thread first = new Thread(function.apply(randomBoolean()));
can we lock down the expected end term based on these booleans and assert for that?
    @Override
    public void onResponse(Releasable releasable) {
        counter.incrementAndGet();
        latch.countDown();
can we add a check on the term here?
bleskes left a comment
LGTM. Thanks @jasontedor
With #24779 in place, we can now guarantee that a single translog generation file will never have a sequence number conflict that needs to be resolved by looking at primary terms. These conflicts can occur when a replica contains an operation which isn't part of the history of a newly promoted primary. That primary can then assign a different operation to the same slot and replicate it to the replica.

PS. Knowing that each generation file is conflict free will simplify repairing these conflicts when we read from the translog.

PPS. This PR also fixes some bugs in the piping of primary terms in the bulk shard action. These bugs are a result of the legacy of IndexRequest/DeleteRequest being a ReplicationRequest. We need to change that as a follow up.

Relates to #10708
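To make the kind of conflict concrete, here is a small sketch of two operations that occupy the same sequence number slot under different primary terms. `SeqNoConflictResolver` and `Op` are hypothetical names, and the rule that the operation with the higher primary term wins is an assumption made for illustration, not something taken from the translog code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: resolves same-slot conflicts by preferring the higher term.
class SeqNoConflictResolver {
    static final class Op {
        final long seqNo;
        final long primaryTerm;
        final String source;

        Op(long seqNo, long primaryTerm, String source) {
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
            this.source = source;
        }
    }

    /** Keeps one operation per sequence number: the one with the highest primary term. */
    static Map<Long, Op> resolve(List<Op> ops) {
        Map<Long, Op> winners = new LinkedHashMap<>();
        for (Op op : ops) {
            winners.merge(op.seqNo, op,
                (existing, incoming) -> incoming.primaryTerm > existing.primaryTerm ? incoming : existing);
        }
        return winners;
    }
}
```

If every generation file is conflict free, a resolution step like this would only be needed when combining operations across generations.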
Relates #10708