CCR: Replicate existing ops with old term on follower #34412
dnhatn merged 15 commits into elastic:master
Conversation
Since #34288, we might hit a deadlock if the FollowTask has more fetchers
than writers. This can happen in the following scenario:
Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has
two fetchers and one writer.
1. The FollowTask issues two concurrent fetch requests: {from_seq_no: 0,
num_ops:1} and {from_seq_no: 1, num_ops:1} to read seq#0 and seq#1
respectively.
2. The second request, which fetches seq#1, completes first and triggers
a write request containing only seq#1.
3. The primary of a follower fails after it has replicated seq#1 to
replicas.
4. Since the old primary did not respond, the FollowTask issues another
write request containing seq#1 (a resend of the previous write request).
5. The new primary has seq#1 already; thus it won't replicate seq#1 to
replicas but will wait for the global checkpoint to advance to at least
seq#1.
The problem is that the FollowTask has only one writer, and that writer
is waiting for seq#0, which won't be delivered until the writer
completes.
This PR proposes to delay write requests when there is a gap in the
write-buffer. With this change, if a writer is waiting for seq_no N,
then all the operations below N have been delivered or are scheduled for
delivery by other writers.
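Purely as illustration of the proposed gap check (not the actual ShardFollowNodeTask code), a minimal sketch: ops are only handed to a writer while they form a contiguous run above the highest seq_no already handed off. The names `buffer`, `lastHandedOffSeqNo`, `pollContiguousOps`, and `Op` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class WriteBufferSketch {
    // ops fetched from the leader, ordered by seq_no
    private final PriorityQueue<Op> buffer = new PriorityQueue<>(Comparator.comparingLong(Op::seqNo));
    private long lastHandedOffSeqNo = -1; // highest seq_no already handed to a writer

    /** Hands out ops only while they form a contiguous run; a gap delays the writer. */
    synchronized List<Op> pollContiguousOps(int maxOps) {
        final List<Op> ops = new ArrayList<>();
        while (ops.size() < maxOps && buffer.isEmpty() == false
                && buffer.peek().seqNo() == lastHandedOffSeqNo + 1) {
            lastHandedOffSeqNo = buffer.peek().seqNo();
            ops.add(buffer.poll());
        }
        return ops; // empty when seq#(lastHandedOffSeqNo + 1) has not been fetched yet
    }

    interface Op {
        long seqNo();
    }
}
```

In the deadlock scenario above, the lone writer would never be handed seq#1 alone: the poll returns empty until seq#0 arrives, so the writer stays free to deliver seq#0 first.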
Pinging @elastic/es-distributed
Another approach is to let the following primary wait for the advancement of the global checkpoint only if its local checkpoint is at least the waiting-for global checkpoint. Otherwise, it returns the unapplied operations to the FollowTask without waiting. In the latter case, the FollowTask puts the unapplied operations back into the buffer, then delivers the head of the buffer (i.e., the operations before the waiting-for GCP), which is the current behavior.
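A rough sketch of that alternative, with hypothetical names throughout (this is not the real transport code): the primary only registers a global-checkpoint listener when it has locally applied everything up to the requested seq_no; otherwise it hands the unapplied ops straight back.

```java
// Hypothetical stand-ins for the real shard/transport plumbing; illustrative only.
final class WaitOrReturnSketch {
    long localCheckpoint;

    void handleWriteRequest(long maxSeqNoOfRequest, Runnable respondApplied, Runnable respondWithUnapplied) {
        if (localCheckpoint >= maxSeqNoOfRequest) {
            // safe to wait: all ops up to the requested seq_no are applied locally,
            // so the global checkpoint can advance without another write request
            addGlobalCheckpointListener(maxSeqNoOfRequest, respondApplied);
        } else {
            // don't block the writer: hand the unapplied ops back so the FollowTask
            // re-buffers them and delivers the head of the buffer first
            respondWithUnapplied.run();
        }
    }

    void addGlobalCheckpointListener(long waitingForGcp, Runnable onAdvance) {
        // stub: the real engine would notify once the GCP reaches waitingForGcp
    }
}
```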
 * Looks up the primary term for a given seq_no in the provided directory reader. The caller must ensure that an operation with the
 * given {@code seqNo} exists in the provided {@code directoryReader}; otherwise this method will throw {@link IllegalStateException}.
 */
public static long lookupPrimaryTerm(final DirectoryReader directoryReader, final long seqNo) throws IOException {
@jimczi Could you please have a look at this Lucene code.
since this wraps all docs live I am not sure it should live here in this class?
@bleskes This is ready. Could you please give it a shot?
int docId;
while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
    // make sure to skip the non-root nested documents
    if (primaryTermDV.advanceExact(docId - leaf.docBase) && primaryTermDV.longValue() > 0) {
The docIdSetIterator returns the leaf doc id, so you can use it directly to advance the primaryTermDV?
final Query query = LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
// iterate backwards since the existing operation is likely in the most recent segments.
for (int i = reader.leaves().size() - 1; i >= 0; i--) {
is this optimization really relevant here? I wonder if we can't just do an ordinary search and then look up the leaf reader based on the top hits?
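For illustration, a sketch of what that simpler variant could look like. The literal field values "_seq_no" and "_primary_term" stand in for SeqNoFieldMapper.NAME and SeqNoFieldMapper.PRIMARY_TERM_NAME so the snippet is self-contained; `ReaderUtil.subIndex` maps the global doc id back to its leaf.

```java
import java.io.IOException;
import java.util.OptionalLong;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TopDocs;

// A sketch only, not the PR's implementation.
final class TopHitsLookupSketch {
    static OptionalLong lookupPrimaryTerm(DirectoryReader reader, long seqNo) throws IOException {
        final IndexSearcher searcher = new IndexSearcher(reader);
        final TopDocs topDocs = searcher.search(LongPoint.newExactQuery("_seq_no", seqNo), 1);
        if (topDocs.scoreDocs.length == 0) {
            return OptionalLong.empty();
        }
        final int globalDocId = topDocs.scoreDocs[0].doc;
        final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(globalDocId, reader.leaves()));
        final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues("_primary_term");
        // note: a non-root nested doc could be the top hit; it carries no primary term,
        // which is why the PR's loop filters on longValue() > 0
        if (primaryTermDV != null && primaryTermDV.advanceExact(globalDocId - leaf.docBase)) {
            return OptionalLong.of(primaryTermDV.longValue());
        }
        return OptionalLong.empty();
    }
}
```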
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
    return OptionalLong.empty();
} else {
    final long term = VersionsAndSeqNoResolver.lookupPrimaryTerm(searcher.getDirectoryReader(), seqNo);
I think we can just put lookupPrimaryTerm in this class.
@s1monw I have addressed your comments. Could you please have another look?
@bleskes should also look at this.
bleskes left a comment
Looks great. I left some comments. The important one is about the testing.
if (failure.getExistingPrimaryTerm().isPresent()) {
    appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
} else {
    assert targetOp.seqNo() <= primary.getGlobalCheckpoint() : targetOp.seqNo() + " > " + primary.getGlobalCheckpoint();
I think this should also throw an exception so replication stops and we'll know about it (keeping the assertion is fine for testing too).
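A sketch of promoting that assertion to a hard failure, reusing the names from the snippet above; the exact exception type is an assumption, not the one the PR ended up with.

```java
// Sketch only: an existing op above the global checkpoint must have a resolvable primary
// term; failing loudly stops replication instead of silently diverging.
static void failIfTermUnresolvable(Translog.Operation targetOp, long globalCheckpoint) {
    if (targetOp.seqNo() > globalCheckpoint) {
        assert false : targetOp.seqNo() + " > " + globalCheckpoint; // keep for tests
        throw new IllegalStateException("can't resolve existing primary term for op ["
            + targetOp.seqNo() + "] above the global checkpoint [" + globalCheckpoint + "]");
    }
}
```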
.../java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java
            return OptionalLong.of(primaryTermDV.longValue());
        }
    }
    assert false : "seq_no[" + seqNo + "] does not have primary_term";
can we show how many docs we found? This assumes 0, but it might be 2.
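Assembling the review fragments above into one minimal sketch of the lookup, folding in the point about leaf-relative doc ids and a matched-doc count in the failure message. The string constants stand in for SeqNoFieldMapper.NAME and SeqNoFieldMapper.PRIMARY_TERM_NAME; this is a sketch, not the merged code.

```java
import java.io.IOException;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;

final class PrimaryTermLookupSketch {
    static long lookupPrimaryTerm(final DirectoryReader reader, final long seqNo) throws IOException {
        final IndexSearcher searcher = new IndexSearcher(reader);
        searcher.setQueryCache(null); // no point in caching a one-off lookup
        final Query query = LongPoint.newExactQuery("_seq_no", seqNo);
        final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
        int matchingDocs = 0;
        // iterate backwards since the existing operation is likely in the most recent segments
        for (int i = reader.leaves().size() - 1; i >= 0; i--) {
            final LeafReaderContext leaf = reader.leaves().get(i);
            final Scorer scorer = weight.scorer(leaf);
            if (scorer == null) {
                continue;
            }
            final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues("_primary_term");
            final DocIdSetIterator docIdSetIterator = scorer.iterator();
            int docId;
            while ((docId = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                matchingDocs++;
                // the per-leaf scorer returns segment-relative doc ids, so advance directly;
                // non-root nested docs carry no primary term and fail the > 0 check
                if (primaryTermDV != null && primaryTermDV.advanceExact(docId) && primaryTermDV.longValue() > 0) {
                    return primaryTermDV.longValue();
                }
            }
        }
        throw new IllegalStateException("seq_no[" + seqNo + "] does not have a primary_term, matched ["
            + matchingDocs + "] docs");
    }
}
```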
assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class));
AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo())));
this seems to mean we never deliver ops below the global checkpoint (and flush, etc.). Can we extend the test to cover that too?
@bleskes I've addressed your comments. Would you please take another look?
This PR adds an assertion that the existing operation is equal to the operation being processed, except for the primary term. Relates elastic#34412
Since #34288, we might hit a deadlock if the FollowTask has more fetchers than writers, as described in the scenario above. This PR proposes to replicate existing operations with the old primary term (instead of the current term) on the follower. In particular, when the following primary detects that it has already processed an operation, it looks up the term of the existing operation with the same seq_no in the Lucene index, then rewrites the operation with that old term before replicating it to the following replicas. This approach is wait-free but requires soft-deletes on the follower. Relates #34288
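A condensed sketch of the primary-side handling described here (illustrative, not the exact merged code; `rewriteOperationWithPrimaryTerm` is the helper from the review snippet above, and AlreadyProcessedFollowingEngineException is the CCR exception added by this PR):

```java
// Sketch only. Needs: java.util.*, org.elasticsearch.index.engine.Engine,
// org.elasticsearch.index.shard.IndexShard, org.elasticsearch.index.translog.Translog.
static List<Translog.Operation> applyOnFollowingPrimary(IndexShard primary,
                                                        List<Translog.Operation> sourceOps) throws IOException {
    final List<Translog.Operation> appliedOperations = new ArrayList<>();
    for (Translog.Operation sourceOp : sourceOps) {
        final Engine.Result result = primary.applyTranslogOperation(sourceOp, Engine.Operation.Origin.PRIMARY);
        if (result.getResultType() == Engine.Result.Type.SUCCESS) {
            appliedOperations.add(sourceOp);
        } else if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) {
            final AlreadyProcessedFollowingEngineException failure =
                (AlreadyProcessedFollowingEngineException) result.getFailure();
            if (failure.getExistingPrimaryTerm().isPresent()) {
                // already processed but still above the global checkpoint: replicate the op
                // with the term of the existing copy so replicas index the same operation
                appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
            }
            // at or below the global checkpoint: every in-sync copy already has the op; skip it
        }
    }
    return appliedOperations; // replicated as-is to the following replicas
}
```

Because a skipped or rewritten operation never blocks on the global checkpoint, the lone-writer deadlock from the scenario above cannot occur; the lookup of the old term relies on soft-deletes keeping the existing copy in the Lucene index.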
Since elastic#34412 and elastic#34474, a follower must have soft-deletes enabled to work correctly. This change requires soft-deletes on the follower. Relates elastic#34412 Relates elastic#34474