Skip to content

KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated#9700

Merged
abbccdda merged 1 commit into
apache:trunkfrom
abbccdda:stream-producer-fix-KAFKA-10813
Dec 10, 2020
Merged

KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated#9700
abbccdda merged 1 commit into
apache:trunkfrom
abbccdda:stream-producer-fix-KAFKA-10813

Conversation

@abbccdda

@abbccdda abbccdda commented Dec 5, 2020

Copy link
Copy Markdown

We should catch InvalidProducerEpoch and rethrow as TaskMigrated, similar to ProducerFenced.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abbccdda abbccdda force-pushed the stream-producer-fix-KAFKA-10813 branch from f1f38c4 to ef4531f Compare December 5, 2020 02:32
@abbccdda abbccdda force-pushed the stream-producer-fix-KAFKA-10813 branch from ef4531f to 4cd4ea7 Compare December 6, 2020 06:29

@ableegoldman ableegoldman left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this. I had a few questions since I'm not that familiar with the EOS code and exception handling I guess. Would be good if @mjsax or @guozhangwang could take a look as well

Comment on lines +203 to +205
} else if (exception instanceof ProducerFencedException ||
exception instanceof InvalidProducerEpochException ||
exception instanceof OutOfOrderSequenceException) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that here, we catch these three exceptions -- ProducerFenced, InvalidProducerEpoch, and OutOfOrderSequence -- and wrap them as TaskMigratedException, while in StreamsProducer#send, we catch ProducerFenced, InvalidProducerEpoch, and UnknownProducerId exceptions and wrap those as TaskMigrated?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify, I think my question/confusion is twofold:

  1. Why is it OutOfOrderSequence in one place and UnknownProducerId in another?
  2. What is the difference between these two code paths? Is it really possible for example for the ProducerFencedException to sometimes be thrown directly from Producer#send, and sometimes be passed along through the callback ( as in streamsProducer.send(serializedRecord, (metadata, exception) )

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't over thinking here since this is a blocker-fix, so we don't want to trigger any regression here by changing existing exception catching logic. Would be good to do this as a follow-up I guess.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok SG, agree we should keep things simple since this is a last minute blocker

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this a bit, and I think both UnknownProducerId and OutOfOrderSequence could be possibly thrown from the caller or directly (though in the later case they would be wrapped as KafkaException).

I created two tickets, one for producer and one for streams to improve the general picture moving forward.

https://issues.apache.org/jira/browse/KAFKA-10829
https://issues.apache.org/jira/browse/KAFKA-10830

@ableegoldman ableegoldman left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, ok to look into the question about the different exceptions as followup work, or as part of your work to clean up exception handling in the Producer. Thanks for the fix!

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +203 to +205
} else if (exception instanceof ProducerFencedException ||
exception instanceof InvalidProducerEpochException ||
exception instanceof OutOfOrderSequenceException) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this a bit, and I think both UnknownProducerId and OutOfOrderSequence could be possibly thrown from the caller or directly (though in the later case they would be wrapped as KafkaException).

I created two tickets, one for producer and one for streams to improve the general picture moving forward.

https://issues.apache.org/jira/browse/KAFKA-10829
https://issues.apache.org/jira/browse/KAFKA-10830

@abbccdda abbccdda merged commit 310e240 into apache:trunk Dec 10, 2020
abbccdda pushed a commit to abbccdda/kafka that referenced this pull request Dec 10, 2020
As suggested, ensure InvalidProducerEpoch gets caught properly on stream side.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>
abbccdda pushed a commit that referenced this pull request Dec 10, 2020
As suggested, ensure InvalidProducerEpoch gets caught properly on stream side.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, A. Sophie Blee-Goldman <sophie@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@abbccdda

Copy link
Copy Markdown
Author

Cherry-picked to 2.7

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants