Skip to content

KAFKA-9418: Add new sendOffsetsToTransaction API to KafkaProducer#7952

Merged
hachikuji merged 16 commits into
apache:trunkfrom
abbccdda:producer_sendoffsets
Jan 22, 2020
Merged

KAFKA-9418: Add new sendOffsetsToTransaction API to KafkaProducer#7952
hachikuji merged 16 commits into
apache:trunkfrom
abbccdda:producer_sendoffsets

Conversation

@abbccdda

@abbccdda abbccdda commented Jan 13, 2020

Copy link
Copy Markdown

As title suggests, the change is bringing in the consumer group metadata as part of the transaction API for correct fencing after 447.
This PR mainly changes on the Producer end for compatible paths to old sendOffsetsToTxn(offsets, groupId) vs new sendOffsetsToTxn(offsets, groupMetadata).

Some integration test extensions are also added to test out the validity of the new sendOffsets API.

This PR also contains some fixes towards the closed protocol PR.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abbccdda abbccdda changed the title (WIP) KAFKA-9418: Add new sendoffsetsToTransaction API to KafkaProducer (WIP) KAFKA-9418: Add new sendOffsetsToTransaction API to KafkaProducer Jan 13, 2020
@abbccdda abbccdda force-pushed the producer_sendoffsets branch 3 times, most recently from 48db78d to e2fad5b Compare January 15, 2020 00:21

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing: #7897 (comment)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressing #7897 (comment)

@abbccdda abbccdda Jan 15, 2020

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the following new tests are addressing the comments for separating valid and invalid scenario: #7897 (comment)

@abbccdda abbccdda changed the title (WIP) KAFKA-9418: Add new sendOffsetsToTransaction API to KafkaProducer KAFKA-9418: Add new sendOffsetsToTransaction API to KafkaProducer Jan 15, 2020
@abbccdda abbccdda force-pushed the producer_sendoffsets branch 2 times, most recently from 0938180 to 797817e Compare January 15, 2020 22:48

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We unify the API for groupId and groupMetadata commit here, which includes the groupId within the metadata struct.
A boolean flag indicating whether to turn on global fencing shall be passed down to the txn commit sender to determine whether we should include (member.id, instance.id, generation.id) in the request.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This separation is to avoid if-else loop complexity warning from checkstyle.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When working with the multi-version API, I realized that by making the data initialization internal could save a lot of caller's effort.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test coverage for KafkaProducerTest is weak in general. We just did the bare minimum here to route the request through a full init->begin->commit->end workflow and make sure it is working properly.

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just leveraging the same security check here, no harm to do for both API calls.

@abbccdda abbccdda Jan 15, 2020

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 3 tests here are primarily evaluating that when we are on groupMetadata mode, we could correctly detect FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID and ILLEGAL_GENERATION exceptions.

@abbccdda abbccdda force-pushed the producer_sendoffsets branch 2 times, most recently from 05d9964 to b9084df Compare January 15, 2020 23:18

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A full test to set all 3 group fields

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use new API for compatibility test.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here for compatibility.

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should use CommitFailedException. In the consumer, we do not expose illegal generation and unknown member id errors directly to the user.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good suggestion, I could wrap unknown member id and illegal generation by a commit failed exception.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think we can do away with enableGroupFencing and derive its value from ConsumerGroupMetadata. In spite of my comment on the previous PR, it may be simpler to just let the defaults be consistent with the expected default values and just have one path below.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we distinguish with the case where only groupId is passed, v.s. the whole groupMetadata is passed but other fields are UNKNOWN_XXX? Do we guarantee that groupMetadata#generationId should never be UNKNOWN_GENERATION_ID (from the broker-side logic we would not check if the generationId < 0)? If yes then we can use that as the boolean flag and get rid of enableGroupFencing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should validate the object when it is received in sendOffsets.

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java Outdated
@mjsax mjsax added the producer label Jan 16, 2020

@mjsax mjsax left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only did a high level pass -- leave if to @hachikuji to merge.

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Should sendOffsetsToTransaction not be able to handle null gracefully? Seems it would be a regression if we change the behavior and start to fail on null?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually not, I will try to specify the object type so we don't need to use 0 length string

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we distinguish with the case where only groupId is passed, v.s. the whole groupMetadata is passed but other fields are UNKNOWN_XXX? Do we guarantee that groupMetadata#generationId should never be UNKNOWN_GENERATION_ID (from the broker-side logic we would not check if the generationId < 0)? If yes then we can use that as the boolean flag and get rid of enableGroupFencing.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test changes in this class are only for new API coverage.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some thoughts, I feel hesitated to classify the FencedInstanceId as a sub type of CommitFailed, for producer exception handling we should abort the current transaction and let consumer rejoin the group as needed. For instanceId fenced, it is more fatal as an indicator of a malicious client that should fail the entire client. Like @hachikuji proposed, it makes sense for us to specify new EOS example code once the changes are merged so that we could make sure the API is user friendly: https://issues.apache.org/jira/browse/KAFKA-9447

@hachikuji

Copy link
Copy Markdown
Contributor

retest this please

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, a few more comments. I think we're almost there.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really valid to commit offsets with a null groupId. Why don't we use requireNonNull?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do that, it's just a legacy logic for allowing null groupId

@hachikuji hachikuji Jan 21, 2020

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It never actually made sense since the producer itself doesn't support it.

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's probably a good case to raise this one directly as an abortable error instead of getting wrapped in CommitFailedException. Although it is not fatal for the producer, the user shouldn't ignore it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

User will have to handle this exception properly by either:

  1. aborting the transaction as recommended
  2. fail the entire application to be more strict
  3. other cases as they see fit

So no matter how to handle it, the error shall not be ignored. If we are throwing 3 different types of exceptions, user would as well need to catch 3 different types of exceptions IMHO

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My expectation for illegal generation and unknown member id is that it can be more or less ignored. The user should abort the transaction, but then continue after rejoining the group. The instance fenced error means a new instance of the application has been started somewhere and the application should be stopped.

By the way, this is why I suggested writing some example code which shows what we consider to be proper handling. This will give us a better idea if the handling is reasonable, awkward, or incomplete.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, we also had a jira to track it here: https://issues.apache.org/jira/browse/KAFKA-9447

Comment thread clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java Outdated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These test cases seem to be identical code other than the error. Can we factor out a helper?

@abbccdda abbccdda Jan 19, 2020

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there are a couple of differences inside the test, such as error type, consumer metadata creation, and request matcher. Probably we could just leave as it is since new readers would just fix one by reading through the whole block

Comment thread clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java Outdated
@abbccdda abbccdda force-pushed the producer_sendoffsets branch from 70a9323 to a08d25e Compare January 21, 2020 23:24
@hachikuji

Copy link
Copy Markdown
Contributor

retest this please

@hachikuji

Copy link
Copy Markdown
Contributor

ok to test

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully final round of comments.

Comment thread clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java Outdated
private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
if (groupMetadata == null) {
throw new IllegalStateException("Consumer group metadata could not be null");
} else if (groupMetadata.groupId() == null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we may as well move this check into ConsumerGroupMetadata since we have some other null checks there.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check is a little bit over-specific for ConsumerGroupMetadata itself, which is why I put advanced check in here so that people could construct group metadata in error format as they want.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate why this is different from e.g. the memberId?


private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) {
if (groupMetadata == null) {
throw new IllegalStateException("Consumer group metadata could not be null");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these should all be IllegalArgumentException. The producer is not in an illegal state.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, how did we end up back here? I thought we agreed this should not be fatal for the producer? I think it should have a separate branch above, similar to the handling of GROUP_AUTHORIZATION_FAILED.

@abbccdda abbccdda force-pushed the producer_sendoffsets branch from 9f97339 to 9cf4a2a Compare January 22, 2020 17:48
@hachikuji

Copy link
Copy Markdown
Contributor

retest this please

@hachikuji

Copy link
Copy Markdown
Contributor

ok to test

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the patch!

@hachikuji hachikuji merged commit de90175 into apache:trunk Jan 22, 2020
@abbccdda abbccdda deleted the producer_sendoffsets branch January 22, 2020 21:52
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 23, 2020
* apache-github/trunk:
  KAFKA-9418; Add new sendOffsetsToTransaction API to KafkaProducer (apache#7952)
  KAFKA-7273 Clarification on mutability of headers passed to Converter#fromConnectData() (apache#7489)
  MINOR: Only update a request's local complete time in API handler if unset (apache#7813)
  KAFKA-9143: Log task reconfiguration error only when it happened (apache#7648)
  MINOR: Change the log level from ERROR to DEBUG when failing to get plugin loader for connector (apache#7964)
  KAFKA-9024: Better error message when field specified does not exist (apache#7819)
  KAFKA-7204: Avoid clearing records for paused partitions on poll of MockConsumer (apache#7505)
  KAFKA-9083: Various fixes/improvements for Connect's Values class (apache#7593)
  MINOR: log error message from Connect sink exception (apache#7555)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants