KAFKA-10813: InvalidProducerEpoch should be caught and thrown as TaskMigrated #9700
ableegoldman
left a comment
Thanks for catching this. I had a few questions, since I'm not that familiar with the EOS code and exception handling. It would be good if @mjsax or @guozhangwang could take a look as well.
    } else if (exception instanceof ProducerFencedException ||
        exception instanceof InvalidProducerEpochException ||
        exception instanceof OutOfOrderSequenceException) {
Why is it that here we catch these three exceptions (ProducerFenced, InvalidProducerEpoch, and OutOfOrderSequence) and wrap them as TaskMigratedException, while in StreamsProducer#send we catch ProducerFenced, InvalidProducerEpoch, and UnknownProducerId and wrap those as TaskMigrated?
Just to clarify, I think my question/confusion is twofold:
- Why is it OutOfOrderSequence in one place and UnknownProducerId in another?
- What is the difference between these two code paths? Is it really possible, for example, for the ProducerFencedException to sometimes be thrown directly from Producer#send and sometimes be passed along through the callback (as in streamsProducer.send(serializedRecord, (metadata, exception)))?
I wasn't overthinking this, since it is a blocker fix and we don't want to trigger a regression by changing the existing exception-catching logic. It would be good to do this as a follow-up.
Ok SG, agree we should keep things simple since this is a last minute blocker
I thought about this a bit, and I think both UnknownProducerId and OutOfOrderSequence could possibly be thrown from the caller or directly (though in the latter case they would be wrapped as KafkaException).
I created two tickets, one for producer and one for streams to improve the general picture moving forward.
https://issues.apache.org/jira/browse/KAFKA-10829
https://issues.apache.org/jira/browse/KAFKA-10830
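To make the "thrown directly, but wrapped as KafkaException" case concrete, here is a minimal sketch of how the synchronous send path might unwrap the cause before deciding to rethrow as TaskMigrated. The exception classes are local stand-ins with a flattened hierarchy (the real Kafka types are not used so that the snippet compiles on its own), and the helper names `isRecoverable` and `send` are assumptions for illustration, not a quote of the actual StreamsProducer code.

```java
class DirectSendSketch {
    // Stand-in types (assumption): simplified copies of the exception names in this thread.
    static class KafkaException extends RuntimeException {
        KafkaException(Throwable cause) { super(cause); }
    }
    static class ProducerFencedException extends RuntimeException {}
    static class InvalidProducerEpochException extends RuntimeException {}
    static class UnknownProducerIdException extends RuntimeException {}
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical helper: inspect the wrapped cause rather than the wrapper itself.
    static boolean isRecoverable(KafkaException wrapper) {
        Throwable cause = wrapper.getCause();
        return cause instanceof ProducerFencedException
            || cause instanceof InvalidProducerEpochException
            || cause instanceof UnknownProducerIdException;
    }

    // Hypothetical wrapper around a synchronous producer call.
    static void send(Runnable producerSend) {
        try {
            producerSend.run();
        } catch (KafkaException e) {
            if (isRecoverable(e)) {
                throw new TaskMigratedException("Producer got fenced during send", e.getCause());
            }
            throw e; // anything else is passed through unchanged
        }
    }

    public static void main(String[] args) {
        try {
            send(() -> { throw new KafkaException(new InvalidProducerEpochException()); });
        } catch (TaskMigratedException e) {
            System.out.println("rethrown as TaskMigrated, cause: "
                + e.getCause().getClass().getSimpleName());
        }
    }
}
```

The point of the sketch is only that the direct path has to look at `getCause()`, whereas the callback path receives the exception unwrapped; which exception types belong in each list is the open question tracked in the two tickets above.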
ableegoldman
left a comment
LGTM, ok to look into the question about the different exceptions as followup work, or as part of your work to clean up exception handling in the Producer. Thanks for the fix!
As suggested, ensure InvalidProducerEpoch gets caught properly on stream side. Reviewers: Guozhang Wang <wangguoz@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
As suggested, ensure InvalidProducerEpoch gets caught properly on stream side. Reviewers: Guozhang Wang <wangguoz@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
Cherry-picked to 2.7
We should catch InvalidProducerEpoch and rethrow as TaskMigrated, similar to ProducerFenced.
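As a rough illustration of the change described here, the sketch below shows a send callback's error translation with InvalidProducerEpoch added next to ProducerFenced and OutOfOrderSequence. The exception classes are local stand-ins (assumptions, so the snippet runs without the Kafka client on the classpath), and `indicatesTaskMigrated` and `translate` are hypothetical helper names rather than methods from the Streams code base.

```java
class SendCallbackSketch {
    // Stand-in types (assumption): simplified copies of the exception names in this PR.
    static class ProducerFencedException extends RuntimeException {}
    static class InvalidProducerEpochException extends RuntimeException {}
    static class OutOfOrderSequenceException extends RuntimeException {}
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }

    // Mirrors the instanceof chain quoted in the review thread.
    static boolean indicatesTaskMigrated(Exception exception) {
        return exception instanceof ProducerFencedException
            || exception instanceof InvalidProducerEpochException
            || exception instanceof OutOfOrderSequenceException;
    }

    // Hypothetical helper: map the error handed to the callback to what the task should see.
    static RuntimeException translate(Exception exception) {
        if (indicatesTaskMigrated(exception)) {
            return new TaskMigratedException("Producer was fenced; task should be migrated", exception);
        }
        return new RuntimeException("Error sending record", exception);
    }

    public static void main(String[] args) {
        RuntimeException translated = translate(new InvalidProducerEpochException());
        System.out.println(translated.getClass().getSimpleName()); // prints "TaskMigratedException"
    }
}
```

Without the InvalidProducerEpoch branch, the same input would fall through to the generic error case instead of being treated as a task migration.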