CCR: Following primary should process operations once #34288
dnhatn merged 13 commits into elastic:master from
Conversation
Today we rewrite the operations from the leader with the term of the following primary because the follower should own its history. The problem is that a newly promoted primary may re-assign its term to operations that were already replicated to replicas by the previous primary. If this happens, some operations with the same seq_no may end up with different terms, which would break the future optimistic locking based on the combination of seq_no and term. This change ensures that the primary of a follower only processes an operation if that operation was not processed before.
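The "process once" rule above can be sketched in isolation. This is a minimal illustration, not the actual FollowingEngine code (which tracks processed operations via the engine's checkpoint machinery); the class and method names here are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the following primary remembers which seq_no it has
// already processed, and under which term, so a re-delivered operation is
// skipped instead of being re-stamped with a newer term.
class ProcessOnceSketch {
    // seq_no -> primary term the operation was originally processed under
    private final Map<Long, Long> processed = new HashMap<>();

    /** Returns true if the operation was applied, false if it was skipped. */
    boolean process(long seqNo, long term) {
        if (processed.containsKey(seqNo)) {
            // Already processed (possibly by a previous primary): skip, so the
            // same seq_no is never assigned two different terms.
            return false;
        }
        processed.put(seqNo, term);
        return true;
    }

    /** Term originally recorded for a processed seq_no. */
    long termOf(long seqNo) {
        return processed.get(seqNo);
    }
}
```

A resend of seq#0 under a newer term is skipped, so `termOf(0)` keeps reporting the term it was first processed with.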
Pinging @elastic/es-distributed
bleskes
left a comment
Looking awesome. I left some nits and suggestions.
}
}
}
assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
for (final Translog.Operation operation : request.getOperations()) {
    final Engine.Result result = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
    if (result.getResultType() != Engine.Result.Type.SUCCESS) {
        assert false : "failure should never happen on replicas; op=[" + operation + "] error=[" + result.getFailure() + "]";
doc level failure (normal failures are OK from an algorithmic perspective).
    listener.onFailure(e);
} else {
    assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp;
    fillResponse.run();
fillResponse can throw an already-closed exception. We should make sure we deal with exceptions here correctly.
Maybe wrap the listener using ActionListener#wrap, which does the right things and will simplify the code here too.
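The pattern being suggested can be shown with a standalone sketch. This is not Elasticsearch's ActionListener (whose `wrap` takes a checked consumer); it is a minimal stand-in that illustrates the point: an exception thrown while building the response (e.g. an already-closed exception from fillResponse) is routed to the failure handler instead of escaping.

```java
import java.util.function.Consumer;

// Minimal stand-in for a response/failure listener, for illustration only.
interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);
}

class Listeners {
    // Mirrors the ActionListener#wrap idea: any exception thrown by the
    // response handler is forwarded to the failure handler.
    static <T> Listener<T> wrap(Consumer<T> onResponse, Consumer<Exception> onFailure) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    // e.g. an already-closed exception thrown while filling the response
                    onFailure.accept(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
}
```

With this shape, the caller no longer needs an explicit try/catch around the response-building code.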
x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
@Override
public NoOpResult noOp(NoOp noOp) {
    // TODO: Make sure we process NoOp once.
why can't we do this now in this PR in the same way?
This is because NoOps don't have an _id and they are processed without the _id lock. I am not sure if we need to introduce a fake _id (for locking purposes) for NoOps. Thus, I prefer to do it in a separate PR so we can see it more clearly.
    }
}
for (IndexShard replica : follower.getReplicas()) {
    try (Translog.Snapshot rSnapshot = replica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) {
Same comment — can we check the content of the ops?
...in/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1)));

shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test");
assertThat(listener.actionGet(TimeValue.timeValueSeconds(5)).getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
Can we make this just get()? I'm not so comfortable with 5s (it's short), but also we typically let the suite time out so we can get a thread dump (although I suspect it won't be that helpful here, it might be).
long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint());
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger);
primaryResult.respond(listener);
assertThat(listener.actionGet(TimeValue.timeValueSeconds(5)).getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
for (Engine.Operation op : operations) {
    Engine.Operation.Origin nonPrimary = randomValueOtherThan(Engine.Operation.Origin.PRIMARY,
        () -> randomFrom(Engine.Operation.Origin.values()));
    Engine.Result result = applyOperation(followingEngine, op, nonPrimary);
Any chance we can also check that this wasn't indexed to Lucene? Maybe doc counts?
@bleskes Thanks so much for allocating some time on this. I have addressed your suggestions. Could you please have another look?
final BulkShardOperationsResponse response = finalResponseIfSuccessful;
final ActionListener<BulkShardOperationsResponse> wrappedListener = ActionListener.wrap(response -> {
    final SeqNoStats seqNoStats = primary.seqNoStats();
    // return a fresh global checkpoint after the operations have been replicated for the shard follow task
...rc/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java
Thanks @bleskes.
Today we rewrite the operations from the leader with the term of the following primary because the follower should own its history. The problem is that a newly promoted primary may re-assign its term to operations that were already replicated to replicas by the previous primary. If this happens, some operations with the same seq_no may end up with different terms, which would break the future optimistic locking based on the combination of seq_no and term. This change ensures that the primary of a follower only processes an operation if that operation was not processed before. The skipped operations are guaranteed to be delivered to replicas via either primary-replica resync or peer-recovery. However, the primary must not acknowledge until the global checkpoint is at least the highest seq_no of all skipped ops (i.e., they have all been processed on every replica). Relates #31751 Relates #31113
Sadly, we might hit a deadlock if the FollowTask has more fetchers than writers. Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has two fetchers with fetch-size=1 and one writer with write-size=1.
The problem is that the FollowTask has only one writer, and that writer is waiting for seq#0, which won't be delivered until the writer completes. One solution that I see is to delay a write request if there is a gap between the last write request and the next one (the fetched operations are already sorted by seq_no).
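The delay-on-gap idea can be sketched as a small buffer that only releases operations once they are contiguous with the last written seq_no. This is an illustrative sketch, not code from the actual ShardFollowNodeTask; all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: fetched operations are parked until the gap below them
// is filled, so a write request is never issued out of order.
class ContiguousWriteBuffer {
    private final TreeMap<Long, String> pending = new TreeMap<>(); // seq_no -> op payload
    private long lastWrittenSeqNo = -1;                            // nothing written yet

    /**
     * Buffer a fetched op, then drain every op contiguous with the last write.
     * Returns the batch to hand to the writer (empty if a gap remains).
     */
    List<String> offer(long seqNo, String op) {
        pending.put(seqNo, op);
        List<String> batch = new ArrayList<>();
        while (!pending.isEmpty() && pending.firstKey() == lastWrittenSeqNo + 1) {
            batch.add(pending.pollFirstEntry().getValue());
            lastWrittenSeqNo++;
        }
        return batch;
    }
}
```

In the deadlock scenario above, the write request for seq#1 would be held back until seq#0 arrives, and then both are written in one contiguous batch.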
CI has found one case: https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+periodic/162/console.
This is a follow-up for elastic#34288 (comment). Relates elastic#34288
Since #34288, we might hit a deadlock if the FollowTask has more fetchers than writers. This can happen in the following scenario. Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has two fetchers and one writer.

1. The FollowTask issues two concurrent fetch requests, {from_seq_no: 0, num_ops: 1} and {from_seq_no: 1, num_ops: 1}, to read seq#0 and seq#1 respectively.
2. The second request, which fetches seq#1, completes first and triggers a write request containing only seq#1.
3. The primary of the follower fails after it has replicated seq#1 to its replicas.
4. Since the old primary did not respond, the FollowTask issues another write request containing seq#1 (resending the previous write request).
5. The new primary already has seq#1; thus it won't replicate seq#1 to the replicas but will wait for the global checkpoint to advance to at least seq#1.

The problem is that the FollowTask has only one writer, and that writer is waiting for seq#0, which won't be delivered until the writer completes. This PR proposes to replicate existing operations with the old primary term (instead of the current term) on the follower. In particular, when the following primary detects that it has already processed an operation, it looks up the term of the existing operation with the same seq_no in the Lucene index, then rewrites that operation with the old term before replicating it to the following replicas. This approach is wait-free but requires soft-deletes on the follower. Relates #34288
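The wait-free alternative can be reduced to a small sketch: a fresh operation is stamped with the current primary term, while a re-delivered operation is replicated under whatever term was originally recorded for its seq_no. In the real PR that old term is looked up in the Lucene index (hence the soft-deletes requirement); here a plain map stands in for that lookup, and all names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: reuse the previously recorded term for an already
// processed seq_no instead of re-stamping it with the current primary term.
class TermRewriteSketch {
    // stand-in for looking up the term of an existing op in the Lucene index
    private final Map<Long, Long> indexedTerms = new HashMap<>();
    private long primaryTerm;

    TermRewriteSketch(long primaryTerm) {
        this.primaryTerm = primaryTerm;
    }

    /** Simulates a primary promotion bumping the term. */
    void promote(long newTerm) {
        this.primaryTerm = newTerm;
    }

    /** Returns the term the operation should be replicated with. */
    long termForReplication(long seqNo) {
        Long existing = indexedTerms.get(seqNo);
        if (existing != null) {
            return existing;            // already processed: keep the old term
        }
        indexedTerms.put(seqNo, primaryTerm);
        return primaryTerm;             // fresh op: stamp with the current term
    }
}
```

Because the resent operation is replicated immediately under its old term, the primary no longer has to wait for the global checkpoint to advance, which removes the deadlock described above.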