KAFKA-13412; Ensure initTransactions() safe for retry after timeout #11452
dajac
left a comment
@hachikuji Thanks for the patch. I have hit this issue a couple of times without really understanding the root cause. I am glad that you've fixed it! I left a few comments/questions.
    return result;
    return result;
} else if (currentState == transientState) {
    return result;
Why don't we set pendingResult to null when isCompleted() is true here? I suppose we assume that currentState transitions to the next state whenever the response is received, so we should never see a completed request here. Therefore, the "prior state" check will handle all the cases. Is my reasoning correct?
The logic was pretty confusing, so I attempted to simplify it in the latest commit. The need to ensure that the result gets communicated to the user definitely makes this code more awkward.
The way we did this before was a little strange. Logically, it makes more sense for the partition to be added to the transaction before we append the record to the accumulator.
4fa74a9 to 4ba2fda
dajac
left a comment
Thanks for the update. Overall, the changes look good to me. I left a few nits and some clarification questions.
…11452)
If the user's `initTransactions` call times out, the user is expected to retry. However, the producer will continue retrying the `InitProducerId` request in the background. If it happens to return before the user retry of `initTransactions`, then the producer will raise an exception about an invalid state transition.
The patch fixes the issue by tracking the pending state transition until the user has acknowledged the operation's result. In the case of `initTransactions`, even if the `InitProducerId` returns in the background and the state changes, we can still retry the `initTransactions` call to obtain the result.
Reviewers: David Jacot <djacot@confluent.io>
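To make the idea of "tracking the pending state transition until the user has acknowledged the result" concrete, here is a minimal sketch. It is not the actual `TransactionManager` code: the class `PendingTransitionSketch`, its `State` enum, `InitTimeoutException`, and the methods `beginInit` and `completeInit` are all hypothetical names chosen for illustration, and the background `InitProducerId` response is simulated by calling `completeInit()` directly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, simplified model of the idea; not the actual TransactionManager code.
class PendingTransitionSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    // Unchecked stand-in for the timeout that the user sees.
    static class InitTimeoutException extends RuntimeException {
        InitTimeoutException(String message) { super(message); }
    }

    private State state = State.UNINITIALIZED;
    // Result of the in-flight operation, kept until the caller has observed it.
    private CompletableFuture<Void> pendingInit;

    // User thread: start the transition, or rejoin one that is still pending.
    private synchronized CompletableFuture<Void> beginInit() {
        if (pendingInit != null) {
            return pendingInit; // retry after a timeout: skip the state check
        }
        if (state != State.UNINITIALIZED) {
            throw new IllegalStateException("Invalid transition from " + state + " to INITIALIZING");
        }
        state = State.INITIALIZING;
        pendingInit = new CompletableFuture<>();
        return pendingInit;
    }

    // Background thread: the simulated InitProducerId response has arrived.
    synchronized void completeInit() {
        if (pendingInit == null) {
            throw new IllegalStateException("No pending initialization");
        }
        state = State.READY;
        pendingInit.complete(null);
    }

    // Waits up to timeoutMs; the pending result is cleared only after the caller has seen it.
    void initTransactions(long timeoutMs) {
        CompletableFuture<Void> result = beginInit();
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new InitTimeoutException("Timed out after " + timeoutMs + " ms; safe to retry");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Initialization failed", e.getCause());
        }
        synchronized (this) {
            pendingInit = null; // the caller has acknowledged the result
        }
    }

    synchronized State state() { return state; }

    public static void main(String[] args) {
        PendingTransitionSketch manager = new PendingTransitionSketch();
        try {
            manager.initTransactions(10); // no response yet, so this times out
        } catch (InitTimeoutException e) {
            System.out.println("first attempt: " + e.getMessage());
        }
        manager.completeInit();       // the response arrives in the background
        manager.initTransactions(10); // the retry picks up the stored result
        System.out.println("state after retry: " + manager.state());
    }
}
```

In this sketch the retry never re-validates the state transition while a result is pending, which is why the state having already moved to `READY` in the background does not cause an error. Once the result has been acknowledged, a further call is rejected as an invalid transition.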
…l batch (#11995)
Fixes a regression introduced in #11452. Following [KIP-480](https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner), the `Partitioner` will receive a callback when a batch has been completed so that it can choose another partition. Because of this, we have to wait until the batch has been successfully appended to the accumulator before adding the partition in `TransactionManager.maybeAddPartition`. This is still safe because the `Sender` cannot dequeue a batch from the accumulator until it has been added to the transaction successfully.
Reviewers: Artem Livshits <84364232+artemlivshits@users.noreply.github.com>, David Jacot <djacot@confluent.io>, Tom Bentley <tbentley@redhat.com>
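The ordering argument above can be illustrated with a small single-threaded model. This is only a sketch under stated assumptions: the class `AppendOrderingSketch`, the fixed `BATCH_CAPACITY`, the round-robin choice of the next partition, and the `completeAddPartitions` and `drain` methods are hypothetical stand-ins for the partitioner, accumulator, transaction manager, and `Sender`, not the real client code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical single-threaded model; names and the batch capacity are illustrative only.
class AppendOrderingSketch {
    static final int BATCH_CAPACITY = 2;

    private final int numPartitions;
    // Records queued per partition (stand-in for the accumulator).
    private final Map<Integer, List<String>> accumulator = new HashMap<>();
    // Partitions waiting to be added to the transaction, and those already added.
    private final Set<Integer> pendingPartitions = new TreeSet<>();
    private final Set<Integer> partitionsInTransaction = new TreeSet<>();

    AppendOrderingSketch(int numPartitions) { this.numPartitions = numPartitions; }

    // Appends first, then registers the partition that actually received the record.
    int send(int partition, String record) {
        if (batchFor(partition).size() >= BATCH_CAPACITY) {
            // The batch is complete, so the simulated partitioner picks another partition.
            partition = (partition + 1) % numPartitions;
        }
        batchFor(partition).add(record); // sketch: no second capacity check
        maybeAddPartition(partition);
        return partition;
    }

    private List<String> batchFor(int partition) {
        return accumulator.computeIfAbsent(partition, p -> new ArrayList<>());
    }

    void maybeAddPartition(int partition) {
        if (!partitionsInTransaction.contains(partition)) {
            pendingPartitions.add(partition);
        }
    }

    // Simulates a successful acknowledgement that the pending partitions joined the transaction.
    void completeAddPartitions() {
        partitionsInTransaction.addAll(pendingPartitions);
        pendingPartitions.clear();
    }

    // Sender side: only partitions already in the transaction can be dequeued.
    List<String> drain() {
        List<String> drained = new ArrayList<>();
        for (int partition : partitionsInTransaction) {
            List<String> batch = accumulator.remove(partition);
            if (batch != null) {
                drained.addAll(batch);
            }
        }
        return drained;
    }

    public static void main(String[] args) {
        AppendOrderingSketch producer = new AppendOrderingSketch(2);
        producer.send(0, "a");
        producer.send(0, "b");
        System.out.println("third record went to partition " + producer.send(0, "c"));
        System.out.println("drained before add: " + producer.drain());
        producer.completeAddPartitions();
        System.out.println("drained after add: " + producer.drain());
    }
}
```

Had `maybeAddPartition` been called before the append, the third `send` would have registered partition 0 even though the record ended up on partition 1. Calling it after the append registers the partition that was actually used, and nothing is drained until that registration completes.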
If the user's `initTransactions` call times out, the user is expected to retry. However, the producer will continue retrying the `InitProducerId` request in the background. If it happens to return before the user retry of `initTransactions`, then the producer will raise an exception about an invalid state transition.
The patch fixes the issue by checking both the current state as well as the prior state in order to validate whether the retry is expected.
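From the caller's side, the retry that this description refers to might look like the sketch below. To keep it compilable without the Kafka client on the classpath, `TransactionalProducer` and `SketchTimeoutException` are hypothetical stand-ins; with the real client, the assumption is that the call would be `KafkaProducer.initTransactions()` and the caught type `org.apache.kafka.common.errors.TimeoutException`. The helper `initWithRetry` and its `maxAttempts` parameter are also illustrative.

```java
// Hypothetical stand-ins so the sketch compiles without the Kafka client library.
class InitRetrySketch {
    // Stand-in for the producer; only the call under discussion is modelled.
    interface TransactionalProducer {
        void initTransactions();
    }

    // Stand-in for the timeout raised when initTransactions does not finish in time.
    static class SketchTimeoutException extends RuntimeException {
        SketchTimeoutException(String message) { super(message); }
    }

    // Retries initTransactions on timeout and returns the attempt number that succeeded.
    static int initWithRetry(TransactionalProducer producer, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SketchTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                producer.initTransactions();
                return attempt;
            } catch (SketchTimeoutException e) {
                last = e; // a timeout is expected to be retriable
            }
        }
        throw last; // every attempt timed out
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated producer that times out twice before succeeding.
        TransactionalProducer flaky = () -> {
            if (++calls[0] < 3) {
                throw new SketchTimeoutException("timed out");
            }
        };
        System.out.println("succeeded on attempt " + initWithRetry(flaky, 5));
    }
}
```

Before this patch, a loop like this could fail on the second attempt with an invalid state transition if the background request completed in between; the point of the change is that such a retry becomes safe.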