Skip to content

KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method#20280

Merged
apoorvmittal10 merged 7 commits into
apache:trunkfrom
chirag-wadhwa5:KAFKA-19567
Aug 10, 2025
Merged

KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method#20280
apoorvmittal10 merged 7 commits into
apache:trunkfrom
chirag-wadhwa5:KAFKA-19567

Conversation

@chirag-wadhwa5

@chirag-wadhwa5 chirag-wadhwa5 commented Jul 31, 2025

Copy link
Copy Markdown
Collaborator

In the current implementation, some delayed share fetch operations get
trapped in the delayed share fetch purgatory when the partition
leaderships change during share consumption. This is because there is no
check in code to make sure the current broker is still the partition
leader corresponding to the share partitions. So, when leadership
changes, the share partitions cannot be acquired, because they have
already been fenced, and tryComplete returns false. Although the
operatio does get completed when the timer expires for it, but it is too
late by then, and the operation get stuck in the watchers list waiting
for it to get purged when estimated operations increase to more than
1000. This Pr resolves this by adding the required check so that if
partition leadership changes, then the delayed share fetches waiting on
it gets completed instantaneously.

Reviewers: Apoorv Mittal apoorvmittal10@gmail.com, Andrew Schofield
aschofield@confluent.io

@github-actions github-actions Bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka labels Jul 31, 2025

@AndrewJSchofield AndrewJSchofield left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. A few initial comments.

canComplete = true;
} catch (NotLeaderOrFollowerException e) { // Case c
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
log.debug("Broker is no longer the leader or follower of topicIdPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd revert this line because all of the other log lines just call it a topicPartition, in spite of technically having the topic ID. That's really fine because it is just an identifier for a topic partition.

replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
if (!partition.isLeader()) {
throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "topicPartition" is more aligned with the rest of this code.

Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
if (!partition.isLeader()) {
log.error("Broker is no longer the leader of topicIdPartition {}", topicIdPartition);
throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicIdPartition: " + topicIdPartition);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"topicPartition" :)

@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels Jul 31, 2025

@DL1231 DL1231 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, please fix the failed UT ReplicaManagerTest#testDelayedShareFetchPurgatoryOperationExpiration()

@chirag-wadhwa5 chirag-wadhwa5 changed the title Added the check for underlying partition being the leader in delayedShareFetch tryComplete methhod KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete methhod Aug 4, 2025

@apoorvmittal10 apoorvmittal10 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, some minor comments.

// In that case, such partitions would not be able to get acquired, and the tryComplete will keep on returning false.
// Eventually the operation will get timed out and completed, but it might not get removed from the purgatory.
// This has been eventually left it like this because the purge interval will make sure that the remaining operations
// in the purgatory do not grow indefinitely and are purged time to time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it time to time, or when keys or operations in purgatory are exceeded against a config. Can you please be explicit in the comment.

replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
if (!partition.isLeader()) {
throw new NotLeaderOrFollowerException("Broker is no longer the leader of topicPartition: " + topicIdPartition);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you certain the partition is also not follower? If not then shall we use NotLeaderException?

when(p1.isLeader()).thenReturn(true);

when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we add a new test case that validates the added functionality?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. I have made the required updates and added a test case for the remote fetch throwing a NotLeaderException as well

@apoorvmittal10 apoorvmittal10 changed the title KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete methhod KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method Aug 7, 2025

@apoorvmittal10 apoorvmittal10 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments.

// All the topic partitions are acquirable.
when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);

assertFalse(delayedShareFetch.isCompleted());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the delayed share fetch complete?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. The DelayedShareFetch should complete, but only after tryComplete is called. I have added an assertTrue for this after tryComplete is called to make the test better.

replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
if (!partition.isLeader()) {
throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to handle the exception below?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Yep, I have added another catch block here for the new NotLeaderException thrown.

canComplete = true;
} catch (NotLeaderOrFollowerException e) { // Case c
} catch (NotLeaderException e) { // Case c
log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The text is incorrect. It should be for NotLeaderOrFollowerException.

@apoorvmittal10 apoorvmittal10 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the PR.

@apoorvmittal10 apoorvmittal10 merged commit 43a2504 into apache:trunk Aug 10, 2025
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants