Skip to content

KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1#3743

Closed
apurvam wants to merge 40 commits into
apache:trunkfrom
apurvam:KAFKA-5494-increase-max-in-flight-for-idempotent-producer
Closed

KAFKA-5494: enable idempotence with max.in.flight.requests.per.connection > 1#3743
apurvam wants to merge 40 commits into
apache:trunkfrom
apurvam:KAFKA-5494-increase-max-in-flight-for-idempotent-producer

Conversation

@apurvam

@apurvam apurvam commented Aug 25, 2017

Copy link
Copy Markdown
Contributor

Here we introduce client and broker changes to support multiple inflight requests while still guaranteeing idempotence. Two major problems to be solved:

  1. Sequence number management on the client when there are request failures. When a batch fails, future inflight batches will also fail with OutOfOrderSequenceException. This must be handled on the client with intelligent sequence reassignment. We must also deal with the fatal failure of some batch: the future batches must get different sequence numbers when the come back.
  2. On the broker, when we have multiple inflights, we can get duplicates of multiple old batches. With this patch, we retain the record metadata for 5 older batches.

I have added TODO(reviewers) comments for specific decisions in the code which are worth discussing.

TODO:

  1. Add more unit tests, especially for loading different snapshot versions correctly, more client side unit tests, more broker side tests to validate that we are caching the correct number of batches (some of this is already there).
  2. Update the system tests to check for ordering.
  3. Run a tight loop of system tests.
  4. Add comments about the assumptions made around the network client semantics of send/receive.
  5. Gracefully handle changes in the producer id. In particular, we should never change the producer id once a batch has been sent, only the sequence number. This protects against duplicates and is a generalization of the previous approach to not reset producer state on retry.

@apurvam

apurvam commented Aug 25, 2017

Copy link
Copy Markdown
Contributor Author

cc @ijuma @hachikuji

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably unintentional.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I will fix the log4j.properties as we near the end of the review process.

@apurvam

apurvam commented Aug 29, 2017

Copy link
Copy Markdown
Contributor Author

Here are my todos for the patch, to help reviewers.

1. Snapshot load unit tests.
2. ProducerState tests to ensure that the correct number of batches are retained when truncation happens at head. 
3. Log tests to ensure that we handle duplicates of batches older than the last appended batch.
4. Review client tests. Do we need to add anything around batch expiry? We probably should add assertions around resetting the batch metadata on retry if we don’t do it already.
5. How does KIP-91 discussion impact this? We probably should retain the producer id and only set a new sequence if the producer id hasn’t changed. This is a generalization of the previous `isRetry` check, where we don’t change the producer id of a batch once assigned.
6. Need to upgrade the system test to enforce ordering.

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. Left a few comments. Note that I left out comments related to expiration of batches when their ultimate outcome is unknown since we discussed this in detail offline.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of saying we can't guarantee idempotence, maybe we can just say that idempotence is only supported for in-flight requests greater than 5. This makes it sound like it's a correctness concern, but it's not really.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: seems we don't use topicPartition.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: might help if the log message metions that this is the base sequence.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would be nice to have consistent naming (e.g. sequences and lastAckedSequences). Also, a comment which clarifies what purpose each serves (especially for the former).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we need to update the comment below with this check gone.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really sure this is worth checking here. Shouldn't we have ensured that duplicates weren't added to batchMetadata in the first place.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This definitely seems spurious. I was debating not including this check, and your comment confirms that I should remove it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can replace this with duplicate.headOption.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. It's not clear that you're handling the case that the marker has an epoch bump below. The old sequence numbers will not be valid for the new epoch, so we should clear them as the comment below says?

As for the last offset being potentially out of sync, I'm not sure it's a problem, but it makes me a tad uncomfortable. We use this to determine when a producer should be expired (due to retention enforcement), but it may be OK if we expire a producerId when the only record left is the marker. However, we might need to be a little careful when loading the producer state from the log to make sure that producer expiration is determined consistently. Maybe we should at least rename the lastOffset method to lastDataOffset or something like that to emphasize that it does not take control records into account.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the case where the control batch has a new epoch is being handled in currentEntry.maybeUpdateEpoch. If the epoch changes we update the epoch in the ProducerIdEntry and clear all the batch metadata, which resets all the sequence numbers producer id.

I am not sure I follow your point about producer id expiration. We expire producer ids by time. However, when we are deleting segments whose retention period is elapsed, we use offsets to determine whether the producer snapshot should go. This could mean that the producer id is removed even though a control message is left. This should be fine, because that message is orphaned, and will be deleted eventually once it violates retention. I don't think this is a correctness issue at all.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I said it made me uncomfortable, not that it was necessarily a problem. I would be fine renaming the method to lastDataOffset as suggested above so that it's clear that it is not intended to include control records.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the TODO now?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to dumpProducerIdSnapshot.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to my previous question, but I'm wondering how likely is it for us to hit a case where we need the additional entries after recovering the log. We would have to receive the produce requests, successfully write them to the log, and await enough time for a new snapshot to be written (which happens after rolling a segment and upon shutdown). All of that before the producer receives the responses. The cost of having this seems minor, but it's still useful to understand how much extra resilience it actually affords.

@apurvam

apurvam commented Aug 30, 2017

Copy link
Copy Markdown
Contributor Author

I created a short writeup about these changes. It goes over the assumptions made, the basic solution, and also some follow up work that needs to be done to handle corner cases:

https://docs.google.com/document/d/1EBt5rDfsvpK6mAPOOWjxa9vY0hJ0s9Jx9Wpwciy0aVo/edit?usp=sharing

@apurvam apurvam force-pushed the KAFKA-5494-increase-max-in-flight-for-idempotent-producer branch from 1933a45 to 67230dc Compare September 6, 2017 21:59
@apurvam

apurvam commented Sep 6, 2017

Copy link
Copy Markdown
Contributor Author

Latest updates (copied from my commit message):

  1. Batch expiry on the producer. In this case, we need mute the
    partition and make sure all the inflight requests are fully resolved. If
    they are successful, then all is well. If they are also expired or fail
    due to OutOfOrderSequence, we need to reset the producer id.

  2. We now deal correctly with out of order responses to in flight
    requests which may happen due to leadership changes. We keep track of
    all inflight batches, ordered by sequence. Whenever there is an error,
    we reduce to one inflight request by ensuring only the next batch
    by sequence can be drained. When we requeue a batch, we check to make
    sure that the queue remains in sequence order. When a batch completes,
    it is removed from the in flight list, and the next batch by sequence
    changes.

Some other changes:

  1. We never change sequence numbers unless we are certain that a batch
    failed fatally on the broker (got a fatal error code), and future
    batches retured OutOfOrderSequence. Before the latest commit, we would
    always reset sequence numbers on retry, and have other mechanisms to
    ensure that a given batch got the same sequence as before on the next
    drain. This is now much tighter, and needs to be in order to correctly
    handle the out of order response problem.

Todo:

Add unit tests for out of order responses.
Add unit tests new cases around batch expiry (all batches expired, all
batches succeeded, future batches failed with OutOfSequence).

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. Mostly finished with the changes on the client side.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we could set retry to be true in the constructor when isSplitBatch is enabled? Or is there a reason we only want to set this when idempotence is in use?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this suggestion work?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, but after looking again, it seems we no longer need this at all.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: only two uses of this and one of them already has a check for transactionManager being null. Maybe we could change this to ensureQueueIsOrdered and add a null check for the other case.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inadvertent string append?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be more natural to add an insertInOrder function (which executes the logic below modulo the first pollFirst) rather than inserting out of order and fixing up later.

Another thing I was wondering is whether a deque is still the right data structure given that we now require ordering. Would a priority queue give us everything we need?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this, and the deque still seems like the right one for performance reasons. We use the priority queue just for in flight batches for starters, so it is smaller. Also, we use it for a variety of purposes (like reducing to 1 in flight when there are retries). And we only need to re order the deque extremely rarely, so it is worth making that operation slightly more inefficient in the interests of keeping the performance for the common case.

Another thing is that I wanted to keep the behavior when idempotence is not enabled just the same, which is another reason I went this route.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe worth a comment that we depend on the invariant that assigned sequence numbers for batches on the queue are all from the same producerId.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point. If the producer id changes, that means the transaction manager will stop tracking inflight batches for the previous producer id. So I updated the code to only try reordering if there are actually in flight batches by the current producer id.

Once we have batches by the current producer id, we know that that all batches by the previous producer id are no longer in play, because that is the only way to get out of the unresolved state and start draining new batches.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High level comment. There is a lot of per-partition state that we're tracking with separate collections. I wonder if this would be easier to manage with a PartitionState object and a single Map<TopicPartition, PartitionState>?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe we could add a private static extension of PriorityQueue which sets the comparator?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should the state of the builder be for this? Maybe we should verify at least that it wasn't aborted?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this could be private? Also, there's an unneeded space after synchronized.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe a better name is something like addInFlightBatch?

@apurvam

apurvam commented Sep 9, 2017

Copy link
Copy Markdown
Contributor Author

One new problem we have to tackle with the new approach to duplicates. When sequnce numbers wrap around, it is possible to detect spurious duplicates. There doesn't seem to be an easy solution for this which is elegant, un less we put some bound on how old a duplicate can be. This also is worth discussing.

@apurvam

apurvam commented Sep 11, 2017

Copy link
Copy Markdown
Contributor Author

retest this please

@apurvam

apurvam commented Sep 11, 2017

Copy link
Copy Markdown
Contributor Author

@apurvam apurvam force-pushed the KAFKA-5494-increase-max-in-flight-for-idempotent-producer branch from de79b00 to b072662 Compare September 11, 2017 21:13

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned previously, but I think it would be more intuitive to insert in order rather than fixing up the collection after the insertion.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to hear your thoughts on my previous comment as well:

Another thing I was wondering is whether a deque is still the right data structure given that we now require ordering. Would a priority queue give us everything we need?

@hachikuji

Copy link
Copy Markdown
Contributor

retest this please

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm.. This doesn't actually unset the producer state. Maybe reopen would be a better name?

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more (smaller) comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we return currentLastAckedSequence?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to return -1 when there is no last ack'd sequence because it makes the logic elsewhere a bit cleaner, for instance is maybeUpdateLastAckdSequence we don't need a null check with the current setup.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine, I am just questioning why we don't use the variable currentLastAckedSequence instead of calling lastAckedSequence.get(topicPartition) a second time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. Will update.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Of the micro-optimization variety, but I think it's a little nicer pattern to do a get followed by a null check rather than two hash lookups.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: annoying that we make two calls to nextBatchBySequence here. Maybe we can use a local variable?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we not need the null check here, but we do in removeInFlightBatch? Should this be documented at least?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Actually, we do need the null check here. The reason it exists in both places is that when we reset producer the producer id, if there is a queued batch in retry from the previous producer id, we would hit an NPE when trying to drain it after the reset. I have fixed it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since the argument is a partition, we could probably drop the partition prefix from the name.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if you use a local variable for lastSequence, then you can also use it in the log message below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the log message should contain the actual lastAckedSequence since the update is conditional: with out of order responses, it is possible that we might move the sequence backward, so the maybeUpadteLastAckedSequence guards against that.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pardon.

In the current code base, I don't see maybeUpadteLastAckedSequence.
What's the method called now ?

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to make this change?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: drop "the" before "whether"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: explicit returns are frowned upon in scala. Maybe we can drop the return and add an else branch? Same for duplicateOf below.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I said it made me uncomfortable, not that it was necessarily a problem. I would be fine renaming the method to lastDataOffset as suggested above so that it's clear that it is not intended to include control records.

@apurvam

apurvam commented Sep 13, 2017

Copy link
Copy Markdown
Contributor Author

retest this please

@apurvam

apurvam commented Sep 13, 2017

Copy link
Copy Markdown
Contributor Author

Some tests here are definitely leaking threads. Trying to reproduce by running tests in sequence locally.

@apurvam apurvam left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hachikuji I addressed most of your comments.

I also found the cause of the thread leaks: in the new scheme, the in memory batchMetadata only caches data batches. However, if an instance of the ProducerStateManager only got a control record for a producer, we would have a ProducerIdEntry with empty batch metadata. When we go to write a snapshot, the new code would NPE, causing threads to leak in shutdown, and causing other tests to fail.

Comment thread core/src/main/scala/kafka/log/Log.scala Outdated

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I did try your suggestion, but it didn't quite work: for some reason spurious duplicates were being detected for some reason. So I left the old code as is.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Actually, we do need the null check here. The reason it exists in both places is that when we reset producer the producer id, if there is a queued batch in retry from the previous producer id, we would hit an NPE when trying to drain it after the reset. I have fixed it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to return -1 when there is no last ack'd sequence because it makes the logic elsewhere a bit cleaner, for instance is maybeUpdateLastAckdSequence we don't need a null check with the current setup.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion. By returning -1 from lastAckedSequence if there isn't a last acked sequence, we can reduce this to sequence - lastAckedSequence(topicPartition) == 1. Much simpler!

@apurvam

apurvam commented Sep 13, 2017

Copy link
Copy Markdown
Contributor Author

However, if an instance of the ProducerStateManager only got a control record for a producer, we would have a ProducerIdEntry with empty batch metadata. When we go to write a snapshot, the new code would NPE, causing threads to leak in shutdown, and causing other tests to fail.

The specific test causing the problem was TransactionsTest.testFencingOnSend. In the current patch, we drop the batch metadata when be bump the epoch because the previous sequence numbers no longer apply and shouldn't be checked. In this test, we write a snapshot immediately after after the fence, which triggers the NPE.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could use isNextSequence here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be asserting on the result of the send futures in these test cases.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added the assertions for test cases in which they were missing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert the offset? This ensures that the requests have not been reordered. Also, the isDone check should probably come first

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. The lastAckdSequence checks already ensure that the responses are coming in the right order. We also check the sequence of the outgoing batch matches what we expect when the requests are sent, which is really what we want to check in these tests.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here we use sequence number equality to tell when two batches match. Below in drain we use equals: https://github.com/apache/kafka/pull/3743/files#diff-e67ae6379c50ac1d9c58cefaf71346ceR540. Unless there is a good reason, maybe we should be consistent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. I prefer to just use sequence number equality. Will update the drain method.

@apurvam

apurvam commented Sep 14, 2017

Copy link
Copy Markdown
Contributor Author

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more comments. Getting close!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like there's no longer any reason for this logic to be in sendProducerData. Maybe we may as well move it into run(now) with the rest of the other periodic transaction/idempotent checks?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert the failure exception?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe TIMEOUT_ERROR would be better for this case since NOT_LEADER_FOR_PARTITION means the record definitely wasn't written.

By the way, doesn't this expose a weakness in our error checking? We know for sure that the write never succeeded even though we had to expire it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I think this was brought up in the KIP-91 discussion as well: we could optimize for error codes which definitively indicate that the batch wasn't written, but we don't do that today.

This means we could get into situations where we reset the producer id where we don't strictly need to. I filed https://issues.apache.org/jira/browse/KAFKA-5897 to track this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking for a test case which exercised the sequence number adjustment logic, but couldn't find one. Specifically a test case which caused the sequence number of a batch to change.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. There wasn't a test, and there was a bug in that logic. Particularly, when a batch fails fatally, the future in flight batches are guaranteed to fail with an OutOfOrderSequenceException. We retry these exceptions as long as the batch in question is not the next-in-line batch. However, when we reset sequence numbers, the first future batch does become the next-in-line batch, and it is not retried when it fails with --the expected-- OutOfOrderSequenceException.

I have have the test case and the fix for the bug in an upcoming commit.

Apurva Mehta added 4 commits September 14, 2017 12:48
Todo:
  1) Write more unit tests.
  2) Handle deletion / retention / cleaning correctly.
and incremented during drain. If a batch is retried, it's sequence
number is unset during the completion handler. If the first inflight
batch returns an error, the next sequence to assign is reset to the last
ack'd sequence + 1.
Apurva Mehta added 17 commits September 14, 2017 12:56
…a. This makes accesses from the cleaner threads (log cleaner, producer id expiration, etc.) safe
  1. Fixed a potentioal NPE in ProducerStateManager.writeSnapshot where
we could have no batch metadata in memory (for instance, if only a
control batch was written), and yet write a snapshot. This was causing
tests to fail because an NPE was raised during shutdown, leaking
threads.
  2. Addressed the remaining PR comments.
If a batch has a sequence, but there are no recorded inflight requests
to that partition, it means that the producer id has changed, and the
batch under consideration was from a previous producerId. It should
always be drained (if it returns again, it will fail fatally).

The code before this patch would ensure that it would _never_ drain.
…ust the lastSeq per producer to the LogCleaner, hence simpliying the concurrency model
One notable update: We didn't have a test for reassiging sequence
numbers, and we had a bug in that logic. Essentially a newly reassigned
batch would be fail with an out of order sequence exception because it
would be the next in line batch after its sequence was adjusted.

This is fixed by tracking whether the sequence of a batch has been
adjusted since the last drain, in which case the out of order sequence
exception for it is always retried. If it fails again, it will be fatal.
@apurvam apurvam force-pushed the KAFKA-5494-increase-max-in-flight-for-idempotent-producer branch from 7bab81c to 2d6b5de Compare September 14, 2017 20:00
Apurva Mehta added 2 commits September 14, 2017 13:52
  1. Move the 'reopen' flag from MemoryRecordsBuilder to ProducerBatch
  2. Move the logic to resolve sequence number state to the top of the
run loop.
  3. More canonical scala in Log.scala.

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I'll merge after the builds complete.


// If there are no inflight batches being tracked by the transaction manager, it means that the producer
// id must have changed and the batches being re enqueued are from the old producer id. In this case
// we don't try to ensure ordering amongst them. They will eventually fail with an OutOfOrderSequence,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OutOfOrderSequence -> OutOfOrderSequenceException

}
}));
}
inflightBatchesBySequence.get(batch.topicPartition).offer(batch);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we add a check for the number of batches stored in the PriorityQueue ?
What if more than 5 batches are stored ?

@luigiberrettini

Copy link
Copy Markdown

I saw that the Sender checks for a Errors.DUPLICATE_SEQUENCE_NUMBER but I was not able to find where this error is triggered on the server side.

It seems to me that duplicate detection relies on checking if the sequence number is more than lastPersistedSeq + 1.
If this is the case:

  • why storing the metadata for the last batches and not just relying on the sequence number of the last message persisted in the log?
  • why limiting max.in.flight.requests.per.connection to a maximun value of 5 if duplicates are still detected when metadata is not found (and therefore with any number of max in flights)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants