KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1 #3743
apurvam wants to merge 40 commits into
cc @ijuma @hachikuji |
There was a problem hiding this comment.
Yes. I will fix the log4j.properties as we near the end of the review process.
|
Here are my todos for the patch, to help reviewers. |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the patch. Left a few comments. Note that I left out comments related to expiration of batches when their ultimate outcome is unknown since we discussed this in detail offline.
There was a problem hiding this comment.
Instead of saying we can't guarantee idempotence, maybe we can just say that idempotence is only supported when max.in.flight.requests.per.connection is at most 5. The current wording makes it sound like a correctness concern, but it's not really.
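To illustrate the constraint being discussed, here is a minimal sketch of a validation check using plain `java.util.Properties`. The config keys `enable.idempotence` and `max.in.flight.requests.per.connection` are the real producer settings; the class name, the `isSupported` helper, and the default values used when a key is absent are assumptions made for this example and are not taken from the patch.

```java
import java.util.Properties;

final class IdempotenceConfigSketch {
    // Upper bound on in-flight requests per connection when idempotence is enabled.
    static final int MAX_IN_FLIGHT_WITH_IDEMPOTENCE = 5;

    /** Hypothetical check: true when the combination of settings is supported. */
    static boolean isSupported(Properties props) {
        // Defaults below are assumptions for the sketch, not the client's defaults.
        boolean idempotent =
            Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"));
        int maxInFlight =
            Integer.parseInt(props.getProperty("max.in.flight.requests.per.connection", "5"));
        // Without idempotence any value is allowed; with it, the bound applies.
        return !idempotent || maxInFlight <= MAX_IN_FLIGHT_WITH_IDEMPOTENCE;
    }

    /** Convenience builder for the two settings under discussion. */
    static Properties config(boolean idempotent, int maxInFlight) {
        Properties p = new Properties();
        p.setProperty("enable.idempotence", Boolean.toString(idempotent));
        p.setProperty("max.in.flight.requests.per.connection", Integer.toString(maxInFlight));
        return p;
    }
}
```

With this phrasing the error message can state a supported range rather than imply that correctness is at risk.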
There was a problem hiding this comment.
nit: seems we don't use topicPartition.
There was a problem hiding this comment.
nit: might help if the log message mentions that this is the base sequence.
There was a problem hiding this comment.
nit: would be nice to have consistent naming (e.g. sequences and lastAckedSequences). Also, a comment which clarifies what purpose each serves (especially for the former).
There was a problem hiding this comment.
Seems we need to update the comment below with this check gone.
There was a problem hiding this comment.
Not really sure this is worth checking here. Shouldn't we have ensured that duplicates weren't added to batchMetadata in the first place.
There was a problem hiding this comment.
Yes. This definitely seems spurious. I was debating not including this check, and your comment confirms that I should remove it.
There was a problem hiding this comment.
You can replace this with duplicate.headOption.
There was a problem hiding this comment.
Hmm.. It's not clear that you're handling the case that the marker has an epoch bump below. The old sequence numbers will not be valid for the new epoch, so we should clear them as the comment below says?
As for the last offset being potentially out of sync, I'm not sure it's a problem, but it makes me a tad uncomfortable. We use this to determine when a producer should be expired (due to retention enforcement), but it may be OK if we expire a producerId when the only record left is the marker. However, we might need to be a little careful when loading the producer state from the log to make sure that producer expiration is determined consistently. Maybe we should at least rename the lastOffset method to lastDataOffset or something like that to emphasize that it does not take control records into account.
There was a problem hiding this comment.
Actually, the case where the control batch has a new epoch is being handled in currentEntry.maybeUpdateEpoch. If the epoch changes we update the epoch in the ProducerIdEntry and clear all the batch metadata, which resets all the sequence numbers for the producer id.
I am not sure I follow your point about producer id expiration. We expire producer ids by time. However, when we are deleting segments whose retention period is elapsed, we use offsets to determine whether the producer snapshot should go. This could mean that the producer id is removed even though a control message is left. This should be fine, because that message is orphaned, and will be deleted eventually once it violates retention. I don't think this is a correctness issue at all.
There was a problem hiding this comment.
Yes, I said it made me uncomfortable, not that it was necessarily a problem. I would be fine renaming the method to lastDataOffset as suggested above so that it's clear that it is not intended to include control records.
There was a problem hiding this comment.
We can remove the TODO now?
There was a problem hiding this comment.
nit: rename to dumpProducerIdSnapshot.
There was a problem hiding this comment.
Related to my previous question, but I'm wondering how likely is it for us to hit a case where we need the additional entries after recovering the log. We would have to receive the produce requests, successfully write them to the log, and await enough time for a new snapshot to be written (which happens after rolling a segment and upon shutdown). All of that before the producer receives the responses. The cost of having this seems minor, but it's still useful to understand how much extra resilience it actually affords.
|
I created a short writeup about these changes. It goes over the assumptions made, the basic solution, and also some follow up work that needs to be done to handle corner cases: https://docs.google.com/document/d/1EBt5rDfsvpK6mAPOOWjxa9vY0hJ0s9Jx9Wpwciy0aVo/edit?usp=sharing |
1933a45 to
67230dc
|
Latest updates (copied from my commit message):
Some other changes:
Todo: Add unit tests for out of order responses. |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks for the updates. Mostly finished with the changes on the client side.
There was a problem hiding this comment.
Alternatively, we could set retry to be true in the constructor when isSplitBatch is enabled? Or is there a reason we only want to set this when idempotence is in use?
There was a problem hiding this comment.
Would this suggestion work?
There was a problem hiding this comment.
Discussed offline, but after looking again, it seems we no longer need this at all.
There was a problem hiding this comment.
nit: only two uses of this and one of them already has a check for transactionManager being null. Maybe we could change this to ensureQueueIsOrdered and add a null check for the other case.
There was a problem hiding this comment.
Inadvertent string append?
There was a problem hiding this comment.
It might be more natural to add an insertInOrder function (which executes the logic below modulo the first pollFirst) rather than inserting out of order and fixing up later.
Another thing I was wondering is whether a deque is still the right data structure given that we now require ordering. Would a priority queue give us everything we need?
There was a problem hiding this comment.
I thought about this, and the deque still seems like the right one for performance reasons. We use the priority queue just for in flight batches for starters, so it is smaller. Also, we use it for a variety of purposes (like reducing to 1 in flight when there are retries). And we only need to reorder the deque extremely rarely, so it is worth making that operation slightly more inefficient in the interests of keeping the performance for the common case.
Another thing is that I wanted to keep the behavior when idempotence is not enabled just the same, which is another reason I went this route.
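A rough sketch of the `insertInOrder` idea on a deque, for comparison with fixing up the order after insertion. The `Batch` class here is a hypothetical stand-in that carries only a base sequence, and the sketch ignores sequence wraparound and assumes every batch in the deque belongs to the same producer id.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class OrderedRequeueSketch {
    /** Hypothetical stand-in for a producer batch; only the base sequence matters here. */
    static final class Batch {
        final int baseSequence;
        Batch(int baseSequence) { this.baseSequence = baseSequence; }
    }

    /**
     * Re-enqueue a retried batch so the deque stays sorted by base sequence.
     * Batches with smaller sequences are popped from the head, the retried
     * batch is pushed, and the popped batches are restored in front of it.
     */
    static void insertInOrder(Deque<Batch> deque, Batch retried) {
        List<Batch> earlier = new ArrayList<>();
        while (!deque.isEmpty() && deque.peekFirst().baseSequence < retried.baseSequence) {
            earlier.add(deque.pollFirst());
        }
        deque.addFirst(retried);
        // Restore the earlier batches in reverse so their relative order is kept.
        for (int i = earlier.size() - 1; i >= 0; i--) {
            deque.addFirst(earlier.get(i));
        }
    }

    /** Helper for inspecting the resulting order. */
    static List<Integer> sequences(Deque<Batch> deque) {
        List<Integer> out = new ArrayList<>();
        for (Batch b : deque) out.add(b.baseSequence);
        return out;
    }
}
```

The common case (no retries) still uses plain `addLast`, so the cost of the loop is only paid when a batch is re-enqueued.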
There was a problem hiding this comment.
Maybe worth a comment that we depend on the invariant that assigned sequence numbers for batches on the queue are all from the same producerId.
There was a problem hiding this comment.
This is a good point. If the producer id changes, that means the transaction manager will stop tracking inflight batches for the previous producer id. So I updated the code to only try reordering if there are actually in flight batches by the current producer id.
Once we have batches by the current producer id, we know that all batches by the previous producer id are no longer in play, because that is the only way to get out of the unresolved state and start draining new batches.
There was a problem hiding this comment.
High level comment. There is a lot of per-partition state that we're tracking with separate collections. I wonder if this would be easier to manage with a PartitionState object and a single Map<TopicPartition, PartitionState>?
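One possible shape for that suggestion, as a sketch only. The field names in `PartitionState` are guesses at the kind of per-partition state being tracked, and a `String` key stands in for `TopicPartition` so the example stays self-contained; sequence wraparound is ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

final class PartitionStateSketch {
    /** Hypothetical per-partition bundle replacing several parallel maps. */
    static final class PartitionState {
        int nextSequence = 0;
        int lastAckedSequence = -1;
        final PriorityQueue<Integer> inflightBaseSequences = new PriorityQueue<>();
    }

    // Keyed by a "topic-partition" string here; the client would use TopicPartition.
    private final Map<String, PartitionState> states = new HashMap<>();

    /** Returns the state for the partition, creating it on first use. */
    PartitionState stateFor(String topicPartition) {
        return states.computeIfAbsent(topicPartition, tp -> new PartitionState());
    }

    /** Assigns the next base sequence for a batch and tracks it as in flight. */
    int assignSequence(String topicPartition, int recordCount) {
        PartitionState s = stateFor(topicPartition);
        int base = s.nextSequence;
        s.nextSequence += recordCount;
        s.inflightBaseSequences.offer(base);
        return base;
    }

    /** Removing one entry clears every piece of state for the partition at once. */
    void reset(String topicPartition) {
        states.remove(topicPartition);
    }
}
```

The main benefit is that a reset cannot leave one of the parallel collections out of sync with the others.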
There was a problem hiding this comment.
nit: maybe we could add a private static extension of PriorityQueue which sets the comparator?
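Something along these lines, as a sketch: a small `PriorityQueue` subclass that fixes the comparator in one place. `Batch` and `BatchesBySequence` are hypothetical names, and ordering by base sequence is an assumption about what the comparator should do.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class InflightQueueSketch {
    /** Hypothetical stand-in for a producer batch. */
    static final class Batch {
        final int baseSequence;
        Batch(int baseSequence) { this.baseSequence = baseSequence; }
    }

    /** In-flight batches ordered by base sequence; callers never pass a comparator. */
    static final class BatchesBySequence extends PriorityQueue<Batch> {
        private static final long serialVersionUID = 1L;

        BatchesBySequence(int initialCapacity) {
            super(initialCapacity, Comparator.comparingInt((Batch b) -> b.baseSequence));
        }
    }
}
```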
There was a problem hiding this comment.
What should the state of the builder be for this? Maybe we should verify at least that it wasn't aborted?
There was a problem hiding this comment.
nit: this could be private? Also, there's an unneeded space after synchronized.
There was a problem hiding this comment.
nit: maybe a better name is something like addInFlightBatch?
|
One new problem we have to tackle with the new approach to duplicates: when sequence numbers wrap around, it is possible to detect spurious duplicates. There doesn't seem to be an easy, elegant solution for this unless we put some bound on how old a duplicate can be. This is also worth discussing. |
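To make the wraparound concern concrete, here is a small sketch. It assumes sequences are non-negative ints that wrap from `Integer.MAX_VALUE` back to 0; the helper names and the naive range-equality duplicate check are illustrative and not the broker's actual logic.

```java
final class SequenceWrapSketch {
    /** Advances a sequence by increment, wrapping from Integer.MAX_VALUE back to 0. */
    static int incrementSequence(int sequence, int increment) {
        if (sequence > Integer.MAX_VALUE - increment) {
            return increment - (Integer.MAX_VALUE - sequence) - 1;
        }
        return sequence + increment;
    }

    /** Naive check: a batch "is a duplicate" if its sequence range matches a cached one. */
    static boolean looksLikeDuplicate(int cachedFirst, int cachedLast, int first, int last) {
        return cachedFirst == first && cachedLast == last;
    }
}
```

After a full cycle of 2^31 records the producer is back at the same sequence, so a genuinely new batch can match a stale cached range unless the cache bounds how old a candidate duplicate may be.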
|
retest this please |
|
Started system tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1071/ |
de79b00 to
b072662
There was a problem hiding this comment.
Mentioned previously, but I think it would be more intuitive to insert in order rather than fixing up the collection after the insertion.
There was a problem hiding this comment.
Would like to hear your thoughts on my previous comment as well:
Another thing I was wondering is whether a deque is still the right data structure given that we now require ordering. Would a priority queue give us everything we need?
|
retest this please |
There was a problem hiding this comment.
Hmm.. This doesn't actually unset the producer state. Maybe reopen would be a better name?
hachikuji
left a comment
There was a problem hiding this comment.
A few more (smaller) comments.
There was a problem hiding this comment.
Can't we return currentLastAckedSequence?
There was a problem hiding this comment.
I chose to return -1 when there is no last ack'd sequence because it makes the logic elsewhere a bit cleaner, for instance in maybeUpdateLastAckdSequence we don't need a null check with the current setup.
There was a problem hiding this comment.
That is fine, I am just questioning why we don't use the variable currentLastAckedSequence instead of calling lastAckedSequence.get(topicPartition) a second time.
There was a problem hiding this comment.
nit: Of the micro-optimization variety, but I think it's a little nicer pattern to do a get followed by a null check rather than two hash lookups.
There was a problem hiding this comment.
nit: annoying that we make two calls to nextBatchBySequence here. Maybe we can use a local variable?
There was a problem hiding this comment.
Why do we not need the null check here, but we do in removeInFlightBatch? Should this be documented at least?
There was a problem hiding this comment.
Good catch. Actually, we do need the null check here. The reason it exists in both places is that when we reset the producer id, if there is a queued batch in retry from the previous producer id, we would hit an NPE when trying to drain it after the reset. I have fixed it.
There was a problem hiding this comment.
nit: since the argument is a partition, we could probably drop the partition prefix from the name.
There was a problem hiding this comment.
nit: if you use a local variable for lastSequence, then you can also use it in the log message below.
There was a problem hiding this comment.
Actually, the log message should contain the actual lastAckedSequence since the update is conditional: with out of order responses, it is possible that we might move the sequence backward, so the maybeUpadteLastAckedSequence guards against that.
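A minimal sketch of the guard described here, assuming a per-partition map of last acknowledged sequences. The method and field names are chosen for the example (spelled `maybeUpdateLastAckedSequence` here), and a `String` key stands in for `TopicPartition`.

```java
import java.util.HashMap;
import java.util.Map;

final class LastAckedSketch {
    private final Map<String, Integer> lastAckedSequence = new HashMap<>();

    /** Returns the last acknowledged sequence, or -1 if nothing was acknowledged yet. */
    int lastAckedSequence(String topicPartition) {
        // One hash lookup followed by a null check, rather than containsKey + get.
        Integer current = lastAckedSequence.get(topicPartition);
        return current == null ? -1 : current;
    }

    /** Only moves the value forward, so a late response for an older batch is ignored. */
    void maybeUpdateLastAckedSequence(String topicPartition, int sequence) {
        if (sequence > lastAckedSequence(topicPartition)) {
            lastAckedSequence.put(topicPartition, sequence);
        }
    }
}
```

Because the update is conditional, a log statement should print the value read back from `lastAckedSequence(...)` rather than the sequence passed in.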
There was a problem hiding this comment.
Pardon.
In the current code base, I don't see maybeUpadteLastAckedSequence.
What's the method called now?
There was a problem hiding this comment.
Forgot to make this change?
There was a problem hiding this comment.
typo: drop "the" before "whether"
There was a problem hiding this comment.
nit: explicit returns are frowned upon in Scala. Maybe we can drop the return and add an else branch? Same for duplicateOf below.
|
retest this please |
|
Some tests here are definitely leaking threads. Trying to reproduce by running tests in sequence locally. |
apurvam
left a comment
There was a problem hiding this comment.
@hachikuji I addressed most of your comments.
I also found the cause of the thread leaks: in the new scheme, the in memory batchMetadata only caches data batches. However, if an instance of the ProducerStateManager only got a control record for a producer, we would have a ProducerIdEntry with empty batch metadata. When we go to write a snapshot, the new code would NPE, causing threads to leak in shutdown, and causing other tests to fail.
There was a problem hiding this comment.
Actually, I did try your suggestion, but it didn't quite work: for some reason spurious duplicates were being detected. So I left the old code as is.
There was a problem hiding this comment.
Good suggestion. By returning -1 from lastAckedSequence if there isn't a last acked sequence, we can reduce this to sequence - lastAckedSequence(topicPartition) == 1. Much simpler!
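The simplification can be sketched as follows. The `-1` sentinel comes from the discussion above; the class, the `ack` helper, and the `String` partition key are assumptions for the example, which also treats each batch as a single record.

```java
import java.util.HashMap;
import java.util.Map;

final class NextSequenceSketch {
    private static final int NO_LAST_ACKED_SEQUENCE = -1;
    private final Map<String, Integer> lastAcked = new HashMap<>();

    /** Records an acknowledgement (hypothetical helper for the sketch). */
    void ack(String topicPartition, int sequence) {
        lastAcked.put(topicPartition, sequence);
    }

    /** Returns -1 when nothing has been acknowledged for the partition. */
    int lastAckedSequence(String topicPartition) {
        return lastAcked.getOrDefault(topicPartition, NO_LAST_ACKED_SEQUENCE);
    }

    /** With the sentinel, sequence 0 is "next" on a fresh partition without a special case. */
    boolean isNextSequence(String topicPartition, int sequence) {
        return sequence - lastAckedSequence(topicPartition) == 1;
    }
}
```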
The specific test causing the problem was |
There was a problem hiding this comment.
I think we could use isNextSequence here.
There was a problem hiding this comment.
I think we should be asserting on the result of the send futures in these test cases.
There was a problem hiding this comment.
Just added the assertions for test cases in which they were missing.
There was a problem hiding this comment.
Can we assert the offset? This ensures that the requests have not been reordered. Also, the isDone check should probably come first
There was a problem hiding this comment.
Hmm. The lastAckdSequence checks already ensure that the responses are coming in the right order. We also check the sequence of the outgoing batch matches what we expect when the requests are sent, which is really what we want to check in these tests.
There was a problem hiding this comment.
nit: here we use sequence number equality to tell when two batches match. Below in drain we use equals: https://github.com/apache/kafka/pull/3743/files#diff-e67ae6379c50ac1d9c58cefaf71346ceR540. Unless there is a good reason, maybe we should be consistent?
There was a problem hiding this comment.
ack. I prefer to just use sequence number equality. Will update the drain method.
hachikuji
left a comment
There was a problem hiding this comment.
A few more comments. Getting close!
There was a problem hiding this comment.
Seems like there's no longer any reason for this logic to be in sendProducerData. Maybe we may as well move it into run(now) with the rest of the other periodic transaction/idempotent checks?
There was a problem hiding this comment.
Should we assert the failure exception?
There was a problem hiding this comment.
nit: maybe TIMEOUT_ERROR would be better for this case since NOT_LEADER_FOR_PARTITION means the record definitely wasn't written.
By the way, doesn't this expose a weakness in our error checking? We know for sure that the write never succeeded even though we had to expire it.
There was a problem hiding this comment.
Yes. I think this was brought up in the KIP-91 discussion as well: we could optimize for error codes which definitively indicate that the batch wasn't written, but we don't do that today.
This means we could get into situations where we reset the producer id where we don't strictly need to. I filed https://issues.apache.org/jira/browse/KAFKA-5897 to track this.
There was a problem hiding this comment.
I was looking for a test case which exercised the sequence number adjustment logic, but couldn't find one. Specifically a test case which caused the sequence number of a batch to change.
There was a problem hiding this comment.
Good catch. There wasn't a test, and there was a bug in that logic. In particular, when a batch fails fatally, the subsequent in-flight batches are guaranteed to fail with an OutOfOrderSequenceException. We retry these exceptions as long as the batch in question is not the next-in-line batch. However, when we reset sequence numbers, the first subsequent batch does become the next-in-line batch, and it is not retried when it fails with the expected OutOfOrderSequenceException.
I have the test case and the fix for the bug in an upcoming commit.
Todo: 1) Write more unit tests. 2) Handle deletion / retention / cleaning correctly.
and incremented during drain. If a batch is retried, its sequence number is unset during the completion handler. If the first inflight batch returns an error, the next sequence to assign is reset to the last ack'd sequence + 1.
…a. This makes accesses from the cleaner threads (log cleaner, producer id expiration, etc.) safe
1. Fixed a potential NPE in ProducerStateManager.writeSnapshot where we could have no batch metadata in memory (for instance, if only a control batch was written), and yet write a snapshot. This was causing tests to fail because an NPE was raised during shutdown, leaking threads. 2. Addressed the remaining PR comments.
If a batch has a sequence, but there are no recorded inflight requests to that partition, it means that the producer id has changed, and the batch under consideration was from a previous producerId. It should always be drained (if it returns again, it will fail fatally). The code before this patch would ensure that it would _never_ drain.
…ust the lastSeq per producer to the LogCleaner, hence simplifying the concurrency model
… expired during a transaction
… cases in SenderTest
One notable update: We didn't have a test for reassigning sequence numbers, and we had a bug in that logic. Essentially, a newly reassigned batch would fail with an out of order sequence exception because it would be the next-in-line batch after its sequence was adjusted. This is fixed by tracking whether the sequence of a batch has been adjusted since the last drain, in which case the out of order sequence exception for it is always retried. If it fails again, it will be fatal.
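The retry rule described in this commit message can be sketched roughly as below. `FailedBatch`, the `sequenceAdjustedSinceLastDrain` flag name, and `canRetry` are hypothetical; the sketch assumes the flag is cleared whenever the batch is drained again, which is what would make a second failure fatal.

```java
final class OutOfOrderRetrySketch {
    /** Hypothetical view of a batch that failed with an out-of-order-sequence error. */
    static final class FailedBatch {
        final int baseSequence;
        final boolean sequenceAdjustedSinceLastDrain;

        FailedBatch(int baseSequence, boolean sequenceAdjustedSinceLastDrain) {
            this.baseSequence = baseSequence;
            this.sequenceAdjustedSinceLastDrain = sequenceAdjustedSinceLastDrain;
        }
    }

    /** Decides whether an out-of-order-sequence failure should be retried. */
    static boolean canRetry(FailedBatch batch, int lastAckedSequence) {
        boolean isNextInLine = batch.baseSequence - lastAckedSequence == 1;
        // Not next in line: an earlier batch failed, so this error is expected.
        // Next in line but just re-sequenced: the error is expected exactly once.
        return !isNextInLine || batch.sequenceAdjustedSinceLastDrain;
    }
}
```

A next-in-line batch whose sequence was not adjusted gets no retry, since nothing explains the error other than a real gap.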
7bab81c to
2d6b5de
1. Move the 'reopen' flag from MemoryRecordsBuilder to ProducerBatch 2. Move the logic to resolve sequence number state to the top of the run loop. 3. More canonical Scala in Log.scala.
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. I'll merge after the builds complete.
// If there are no inflight batches being tracked by the transaction manager, it means that the producer
// id must have changed and the batches being re enqueued are from the old producer id. In this case
// we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,
There was a problem hiding this comment.
OutOfOrderSequence -> OutOfOrderSequenceException
}
}));
}
inflightBatchesBySequence.get(batch.topicPartition).offer(batch);
There was a problem hiding this comment.
Shouldn't we add a check for the number of batches stored in the PriorityQueue?
What if more than 5 batches are stored?
|
I saw that the Sender checks for a It seems to me that duplicate detection relies on checking if the sequence number is more than
|
Here we introduce client and broker changes to support multiple inflight requests while still guaranteeing idempotence. Two major problems to be solved:
OutOfOrderSequenceException. This must be handled on the client with intelligent sequence reassignment. We must also deal with the fatal failure of some batch: the future batches must get different sequence numbers when they come back.
I have added TODO(reviewers) comments for specific decisions in the code which are worth discussing.
TODO: