KAFKA-9418: Add new sendOffsetsToTransaction API to KafkaProducer#7952
Conversation
48db78d to
e2fad5b
Compare
There was a problem hiding this comment.
This and the following new tests are addressing the comments for separating valid and invalid scenario: #7897 (comment)
0938180 to
797817e
Compare
There was a problem hiding this comment.
We unify the API for groupId and groupMetadata commit here, which includes the groupId within the metadata struct.
A boolean flag indicating whether to turn on global fencing shall be passed down to the txn commit sender to determine whether we should include (member.id, instance.id, generation.id) in the request.
There was a problem hiding this comment.
This separation is to avoid if-else loop complexity warning from checkstyle.
There was a problem hiding this comment.
When working with the multi-version API, I realized that by making the data initialization internal could save a lot of caller's effort.
There was a problem hiding this comment.
The test coverage for KafkaProducerTest is weak in general. We just did the bare minimum here to route the request through a full init->begin->commit->end workflow and make sure it is working properly.
There was a problem hiding this comment.
This is just leveraging the same security check here, no harm to do for both API calls.
There was a problem hiding this comment.
The 3 tests here are primarily evaluating that when we are on groupMetadata mode, we could correctly detect FENCED_INSTANCE_ID, UNKNOWN_MEMBER_ID and ILLEGAL_GENERATION exceptions.
05d9964 to
b9084df
Compare
There was a problem hiding this comment.
A full test to set all 3 group fields
There was a problem hiding this comment.
Use new API for compatibility test.
There was a problem hiding this comment.
I'm wondering if we should use CommitFailedException. In the consumer, we do not expose illegal generation and unknown member id errors directly to the user.
There was a problem hiding this comment.
That's a good suggestion, I could wrap unknown member id and illegal generation by a commit failed exception.
There was a problem hiding this comment.
nit: I think we can do away with enableGroupFencing and derive its value from ConsumerGroupMetadata. In spite of my comment on the previous PR, it may be simpler to just let the defaults be consistent with the expected default values and just have one path below.
There was a problem hiding this comment.
How do we distinguish with the case where only groupId is passed, v.s. the whole groupMetadata is passed but other fields are UNKNOWN_XXX? Do we guarantee that groupMetadata#generationId should never be UNKNOWN_GENERATION_ID (from the broker-side logic we would not check if the generationId < 0)? If yes then we can use that as the boolean flag and get rid of enableGroupFencing.
There was a problem hiding this comment.
I think we should validate the object when it is received in sendOffsets.
mjsax
left a comment
There was a problem hiding this comment.
Only did a high level pass -- leave if to @hachikuji to merge.
There was a problem hiding this comment.
Why this change? Should sendOffsetsToTransaction not be able to handle null gracefully? Seems it would be a regression if we change the behavior and start to fail on null?
There was a problem hiding this comment.
It is actually not, I will try to specify the object type so we don't need to use 0 length string
There was a problem hiding this comment.
How do we distinguish with the case where only groupId is passed, v.s. the whole groupMetadata is passed but other fields are UNKNOWN_XXX? Do we guarantee that groupMetadata#generationId should never be UNKNOWN_GENERATION_ID (from the broker-side logic we would not check if the generationId < 0)? If yes then we can use that as the boolean flag and get rid of enableGroupFencing.
There was a problem hiding this comment.
Test changes in this class are only for new API coverage.
There was a problem hiding this comment.
After some thoughts, I feel hesitated to classify the FencedInstanceId as a sub type of CommitFailed, for producer exception handling we should abort the current transaction and let consumer rejoin the group as needed. For instanceId fenced, it is more fatal as an indicator of a malicious client that should fail the entire client. Like @hachikuji proposed, it makes sense for us to specify new EOS example code once the changes are merged so that we could make sure the API is user friendly: https://issues.apache.org/jira/browse/KAFKA-9447
|
retest this please |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks, a few more comments. I think we're almost there.
There was a problem hiding this comment.
It's not really valid to commit offsets with a null groupId. Why don't we use requireNonNull?
There was a problem hiding this comment.
We could do that, it's just a legacy logic for allowing null groupId
There was a problem hiding this comment.
It never actually made sense since the producer itself doesn't support it.
There was a problem hiding this comment.
I think there's probably a good case to raise this one directly as an abortable error instead of getting wrapped in CommitFailedException. Although it is not fatal for the producer, the user shouldn't ignore it.
There was a problem hiding this comment.
User will have to handle this exception properly by either:
- aborting the transaction as recommended
- fail the entire application to be more strict
- other cases as they see fit
So no matter how to handle it, the error shall not be ignored. If we are throwing 3 different types of exceptions, user would as well need to catch 3 different types of exceptions IMHO
There was a problem hiding this comment.
My expectation for illegal generation and unknown member id is that it can be more or less ignored. The user should abort the transaction, but then continue after rejoining the group. The instance fenced error means a new instance of the application has been started somewhere and the application should be stopped.
By the way, this is why I suggested writing some example code which shows what we consider to be proper handling. This will give us a better idea if the handling is reasonable, awkward, or incomplete.
There was a problem hiding this comment.
That's true, we also had a jira to track it here: https://issues.apache.org/jira/browse/KAFKA-9447
There was a problem hiding this comment.
These test cases seem to be identical code other than the error. Can we factor out a helper?
There was a problem hiding this comment.
Actually there are a couple of differences inside the test, such as error type, consumer metadata creation, and request matcher. Probably we could just leave as it is since new readers would just fix one by reading through the whole block
This reverts commit 437f845.
70a9323 to
a08d25e
Compare
|
retest this please |
|
ok to test |
hachikuji
left a comment
There was a problem hiding this comment.
Hopefully final round of comments.
| private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) { | ||
| if (groupMetadata == null) { | ||
| throw new IllegalStateException("Consumer group metadata could not be null"); | ||
| } else if (groupMetadata.groupId() == null) { |
There was a problem hiding this comment.
nit: we may as well move this check into ConsumerGroupMetadata since we have some other null checks there.
There was a problem hiding this comment.
The check is a little bit over-specific for ConsumerGroupMetadata itself, which is why I put advanced check in here so that people could construct group metadata in error format as they want.
There was a problem hiding this comment.
Can you elaborate why this is different from e.g. the memberId?
|
|
||
| private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata groupMetadata) { | ||
| if (groupMetadata == null) { | ||
| throw new IllegalStateException("Consumer group metadata could not be null"); |
There was a problem hiding this comment.
I think these should all be IllegalArgumentException. The producer is not in an illegal state.
There was a problem hiding this comment.
Wait, how did we end up back here? I thought we agreed this should not be fatal for the producer? I think it should have a separate branch above, similar to the handling of GROUP_AUTHORIZATION_FAILED.
9f97339 to
9cf4a2a
Compare
|
retest this please |
|
ok to test |
hachikuji
left a comment
There was a problem hiding this comment.
LGTM. Thanks for the patch!
* apache-github/trunk: KAFKA-9418; Add new sendOffsetsToTransaction API to KafkaProducer (apache#7952) KAFKA-7273 Clarification on mutability of headers passed to Converter#fromConnectData() (apache#7489) MINOR: Only update a request's local complete time in API handler if unset (apache#7813) KAFKA-9143: Log task reconfiguration error only when it happened (apache#7648) MINOR: Change the log level from ERROR to DEBUG when failing to get plugin loader for connector (apache#7964) KAFKA-9024: Better error message when field specified does not exist (apache#7819) KAFKA-7204: Avoid clearing records for paused partitions on poll of MockConsumer (apache#7505) KAFKA-9083: Various fixes/improvements for Connect's Values class (apache#7593) MINOR: log error message from Connect sink exception (apache#7555)
As title suggests, the change is bringing in the consumer group metadata as part of the transaction API for correct fencing after 447.
This PR mainly changes on the Producer end for compatible paths to old
sendOffsetsToTxn(offsets, groupId)vs newsendOffsetsToTxn(offsets, groupMetadata).Some integration test extensions are also added to test out the validity of the new sendOffsets API.
This PR also contains some fixes towards the closed protocol PR.
Committer Checklist (excluded from commit message)