KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method#20280
Conversation
…hareFetch tryComplete
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. A few initial comments.
| canComplete = true; | ||
| } catch (NotLeaderOrFollowerException e) { // Case c | ||
| log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); | ||
| log.debug("Broker is no longer the leader or follower of topicIdPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); |
There was a problem hiding this comment.
I'd revert this line because all of the other log lines just call it a topicPartition, in spite of technically having the topic ID. That's really fine because it is just an identifier for a topic partition.
| replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| if (!partition.isLeader()) { | ||
| throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition); |
There was a problem hiding this comment.
Maybe "topicPartition" is more aligned with the rest of this code.
| Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| if (!partition.isLeader()) { | ||
| log.error("Broker is no longer the leader of topicIdPartition {}", topicIdPartition); | ||
| throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition); |
DL1231
left a comment
There was a problem hiding this comment.
Thanks for the patch, please fix the failed UT ReplicaManagerTest#testDelayedShareFetchPurgatoryOperationExpiration()
apoorvmittal10
left a comment
There was a problem hiding this comment.
Looks good, some minor comments.
| // In that case, such partitions would not be able to get acquired, and the tryComplete will keep on returning false. | ||
| // Eventually the operation will get timed out and completed, but it might not get removed from the purgatory. | ||
| // This has been eventually left it like this because the purge interval will make sure that the remaining operations | ||
| // in the purgatory do not grow indefinitely and are purged time to time. |
There was a problem hiding this comment.
Is it time to time, or when keys or operations in purgatory are exceeded against a config. Can you please be explicit in the comment.
| replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| if (!partition.isLeader()) { | ||
| throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicPartition: " + topicIdPartition); |
There was a problem hiding this comment.
Are you certain the partition is also not follower? If not then shall we use NotLeaderException?
| when(p1.isLeader()).thenReturn(true); | ||
|
|
||
| when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0); | ||
| when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1); |
There was a problem hiding this comment.
Did we add a new test case that validates the added functionality?
There was a problem hiding this comment.
Thanks for the review. I have made the required updates and added a test case for the remote fetch throwing a NotLeaderException as well
apoorvmittal10
left a comment
There was a problem hiding this comment.
Left some comments.
| // All the topic partitions are acquirable. | ||
| when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); | ||
|
|
||
| assertFalse(delayedShareFetch.isCompleted()); |
There was a problem hiding this comment.
Shouldn't the delayed share fetch complete?
There was a problem hiding this comment.
Thanks for the review. The DelayedShareFetch should complete, but only after tryComplete is called. I have added an assertTrue for this after tryComplete is called to make the test better.
| replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); | ||
| if (!partition.isLeader()) { | ||
| throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition); |
There was a problem hiding this comment.
Don't you need to handle the exception below?
There was a problem hiding this comment.
Thanks for the review. Yep, I have added another catch block here for the new NotLeaderException thrown.
| canComplete = true; | ||
| } catch (NotLeaderOrFollowerException e) { // Case c | ||
| } catch (NotLeaderException e) { // Case c | ||
| log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); |
There was a problem hiding this comment.
The text is incorrect. It should be for NotLeaderOrFollowerException.
apoorvmittal10
left a comment
There was a problem hiding this comment.
LGTM. Thanks for the PR.
In the current implementation, some delayed share fetch operations get
trapped in the delayed share fetch purgatory when the partition
leaderships change during share consumption. This is because there is no
check in code to make sure the current broker is still the partition
leader corresponding to the share partitions. So, when leadership
changes, the share partitions cannot be acquired, because they have
already been fenced, and tryComplete returns false. Although the
operatio does get completed when the timer expires for it, but it is too
late by then, and the operation get stuck in the watchers list waiting
for it to get purged when estimated operations increase to more than
1000. This Pr resolves this by adding the required check so that if
partition leadership changes, then the delayed share fetches waiting on
it gets completed instantaneously.
Reviewers: Apoorv Mittal apoorvmittal10@gmail.com, Andrew Schofield
aschofield@confluent.io