Skip to content

KAFKA-17848: Fixing share purgatory request and locks handling#17583

Merged
junrao merged 8 commits into
apache:trunkfrom
apoorvmittal10:KAFKA-17848
Oct 26, 2024
Merged

KAFKA-17848: Fixing share purgatory request and locks handling#17583
junrao merged 8 commits into
apache:trunkfrom
apoorvmittal10:KAFKA-17848

Conversation

@apoorvmittal10

@apoorvmittal10 apoorvmittal10 commented Oct 22, 2024

Copy link
Copy Markdown
Contributor

For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed.

I have added a Uuid in each DelayedShareFetch on local to track the calls. When second tryComplete is called then it did make a call to forceComplete() but as it was already completed hence onComplete is never called.

The PR adds a safe check of isCompleted() which alone is not sufficient hence again check for the resonse of forceComplete to release the locks.

Also have moved purgatory calls in share partition out of partition lock as it's not required.

[2024-10-23 16:27:03,875] INFO Try to complete. Member NDHrnOecQ8aTXA6PXfdcTw, Uuid: mQMiOtV9RYSQciVQ9dT3AA (kafka.server.share.DelayedShareFetch)

[2024-10-23 16:27:03,875] INFO Completing the delayed share fetch request for group perf-share-consumer, member NDHrnOecQ8aTXA6PXfdcTw, Uuid: mQMiOtV9RYSQciVQ9dT3AA (kafka.server.share.DelayedShareFetch)

[2024-10-23 16:27:03,907] INFO Try to complete. Member NDHrnOecQ8aTXA6PXfdcTw, mQMiOtV9RYSQciVQ9dT3AA (kafka.server.share.DelayedShareFetch)

@github-actions github-actions Bot added core Kafka Broker KIP-932 Queues for Kafka small Small PRs labels Oct 22, 2024
@apoorvmittal10

Copy link
Copy Markdown
Contributor Author

@junrao For my understanding, what can trigger onComplete without invoking tryComplete? Is it the number of requests that can be in purgatory? I was doing 10 share consumers paralled read with 5Million records already produced over 16 partitions.

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (topicPartitionDataFromTryComplete.isEmpty())
if (topicPartitionDataFromTryComplete == null || topicPartitionDataFromTryComplete.isEmpty())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, how can this value be null? We initialize it in the DelayedShareFetch constructor and its updation always returns a map.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read one thing wrong and instead of null it'a always empty. Re-checking and verifying why this fixed consistently with this PR change, I can easily reproduce the issue without this change and can never with. I will update.

@apoorvmittal10 apoorvmittal10 marked this pull request as draft October 23, 2024 07:43
@apoorvmittal10 apoorvmittal10 changed the title KAFKA-17848: Fixing NPE in delayed share fetch KAFKA-17848: Fixing share purgatory request and locks handling Oct 23, 2024
@apoorvmittal10 apoorvmittal10 marked this pull request as ready for review October 23, 2024 17:57
@apoorvmittal10 apoorvmittal10 requested review from junrao and removed request for adixitconfluent October 23, 2024 17:57
// However, this check alone cannot guarantee that request is really completed. It is possible that
// tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check
// the forceComplete response as well.
if (isCompleted()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add unit tests for the conditions to verify this line and line 164

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let me do that tomorrow. I have tested the runs manually and verified with 25 parallel share consumers and 25 million messages.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added tests.

@AndrewJSchofield AndrewJSchofield left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch.

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the PR. Left a few comments.

}
// If we have an acquisition lock timeout for a share-partition, then we should check if
// there is a pending share fetch request for the share-partition and complete it.
DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call this under if (!stateBatches.isEmpty())?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, corrected it.

// However, this check alone cannot guarantee that request is really completed. It is possible that
// tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check
// the forceComplete response as well.
if (isCompleted()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary since the caller DelayedOperationPurgatory.Watchers.tryCompleteWatched already does this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, I added a log inside the method and can see a lot request lands here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I rechecked and added log line to see the tryComplete is being called even when completed is true.

Here is my understanding, I can see in DelayedOperation.scala:

  1. tryComplete() is always executed safely i.e. from safeTryComplete or safeTryCompleteOrElse which takes a lock on DelayedOperation itself hence no 2 threads can execute tryComplete() simultaneously. Correct?
  2. You are right that tryCompleteWatched has completed check already. But the issue does exist.

This triggers only when there are multiple share consumers for same group and same topic partition. I have traced the calls and can find following: the calls originates from addToActionQueue defined in onCompleted of DelayedShareFetch. Though the request goes through tryCompletedWatch but then again the tryComplete is called despite completed. The conditional variable in DelayedOperation etc. seems fine to me. Not sure how it triggers.

[2024-10-25 17:34:32,670] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:32,703] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:32,754] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,191] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,391] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,682] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:34,363] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But once it's in tryComplete with isCompleted as true of DelayedShareFetch then never again it arrives in tryComplete of that DelayedShareFetch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. tryCompleteWatched checks isCompleted without the lock. So, it's possible that multiple callers check isCompleted and get false. They all get queued up on the lock and will call safeTryComplete and tryComplete multiple times. Perhaps we could further add a isCompleted check inside safeTryComplete before making the tryComplete call.

        if (curr.isCompleted) {
          // another thread has completed this operation, just remove it
          iter.remove()
        } else if (curr.safeTryComplete()) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah make more sense to move to DelayedOperation. Done.

log.trace("Record lock partition limit exceeded for SharePartition {}, " +
"cannot acquire more records", sharePartition);
}
} catch (Exception e) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the exception coming from?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added the exception block so if there is any unrealized exception then atleast lock should be released.

}
return result;
}
log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The frequency of this log line is phenomenal. I wonder whether it's really helpful or just likely to flood the logs to the extent that it's impossible to see anything else.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think I made it INFO at the beginning because I was testing the purgatory stuff, but going forward we will make the logs as trace/debug. I forgot to change it to DEBUG when it got merged.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed this log line, we can check the purgatory metric to see waiting requests.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. That seems much more appropriate.

@apoorvmittal10

Copy link
Copy Markdown
Contributor Author

@junrao @adixitconfluent @AndrewJSchofield Can I please get a re-review.

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. Just a minor comment.

"topic partitions {}", shareFetchData.groupId(),
shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet());
if (!topicPartitionDataFromTryComplete.isEmpty()) {
boolean result = forceComplete();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result => completedByMe?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@apoorvmittal10 apoorvmittal10 requested a review from junrao October 25, 2024 22:02
@apoorvmittal10

Copy link
Copy Markdown
Contributor Author

@junrao Thanks for the suggestion, I have addressed the feedback.

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. One more comment.

private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())
private[server] def safeTryComplete(): Boolean = inLock(lock) {
if (isCompleted)
true

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If yes execute the completion logic by calling
forceComplete() and return true iff forceComplete returns true; otherwise return false

This is the return value definition. So if the request is completed, we should return false.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right. As the request is already completed then it should return false as other thread should have already bumped the completed count. And it should be removed from iteration once watched sees it completed. I made the change.

@apoorvmittal10 apoorvmittal10 requested a review from junrao October 26, 2024 10:16

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. LGTM

@junrao junrao merged commit 397ae59 into apache:trunk Oct 26, 2024
abhishekgiri23 pushed a commit to abhishekgiri23/kafka that referenced this pull request Nov 2, 2024
…e#17583)

For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed.

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
…e#17583)

For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed.

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…e#17583)

For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed.

Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants