KAFKA-17848: Fixing share purgatory request and locks handling#17583
Conversation
|
@junrao For my understanding, what can trigger |
| Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData; | ||
| // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. | ||
| if (topicPartitionDataFromTryComplete.isEmpty()) | ||
| if (topicPartitionDataFromTryComplete == null || topicPartitionDataFromTryComplete.isEmpty()) |
There was a problem hiding this comment.
For my understanding, how can this value be null? We initialize it in the DelayedShareFetch constructor and its updation always returns a map.
There was a problem hiding this comment.
I read one thing wrong and instead of null it'a always empty. Re-checking and verifying why this fixed consistently with this PR change, I can easily reproduce the issue without this change and can never with. I will update.
| // However, this check alone cannot guarantee that request is really completed. It is possible that | ||
| // tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check | ||
| // the forceComplete response as well. | ||
| if (isCompleted()) { |
There was a problem hiding this comment.
Can we also add unit tests for the conditions to verify this line and line 164
There was a problem hiding this comment.
Yeah, let me do that tomorrow. I have tested the runs manually and verified with 25 parallel share consumers and 25 million messages.
There was a problem hiding this comment.
I have added tests.
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the PR. Left a few comments.
| } | ||
| // If we have an acquisition lock timeout for a share-partition, then we should check if | ||
| // there is a pending share fetch request for the share-partition and complete it. | ||
| DelayedShareFetchKey delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()); |
There was a problem hiding this comment.
Should we call this under if (!stateBatches.isEmpty())?
There was a problem hiding this comment.
My bad, corrected it.
| // However, this check alone cannot guarantee that request is really completed. It is possible that | ||
| // tryComplete is invoked by multiple threads and state has yet not updated. Hence, we need to check | ||
| // the forceComplete response as well. | ||
| if (isCompleted()) { |
There was a problem hiding this comment.
This is unnecessary since the caller DelayedOperationPurgatory.Watchers.tryCompleteWatched already does this.
There was a problem hiding this comment.
Ohh, I added a log inside the method and can see a lot request lands here.
There was a problem hiding this comment.
So I rechecked and added log line to see the tryComplete is being called even when completed is true.
Here is my understanding, I can see in DelayedOperation.scala:
- tryComplete() is always executed safely i.e. from
safeTryCompleteorsafeTryCompleteOrElsewhich takes a lock on DelayedOperation itself hence no 2 threads can executetryComplete()simultaneously. Correct? - You are right that
tryCompleteWatchedhascompletedcheck already. But the issue does exist.
This triggers only when there are multiple share consumers for same group and same topic partition. I have traced the calls and can find following: the calls originates from addToActionQueue defined in onCompleted of DelayedShareFetch. Though the request goes through tryCompletedWatch but then again the tryComplete is called despite completed. The conditional variable in DelayedOperation etc. seems fine to me. Not sure how it triggers.
[2024-10-25 17:34:32,670] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:32,703] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:32,754] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,191] INFO Share fetch request for group SG1, member hYDnbPjATHqM7uFXBGYKTw is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,391] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:33,682] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
[2024-10-25 17:34:34,363] INFO Share fetch request for group SG1, member OE4al-DNR6C8u3tiD05rGQ is already completed (kafka.server.share.DelayedShareFetch)
There was a problem hiding this comment.
But once it's in tryComplete with isCompleted as true of DelayedShareFetch then never again it arrives in tryComplete of that DelayedShareFetch.
There was a problem hiding this comment.
Interesting. tryCompleteWatched checks isCompleted without the lock. So, it's possible that multiple callers check isCompleted and get false. They all get queued up on the lock and will call safeTryComplete and tryComplete multiple times. Perhaps we could further add a isCompleted check inside safeTryComplete before making the tryComplete call.
if (curr.isCompleted) {
// another thread has completed this operation, just remove it
iter.remove()
} else if (curr.safeTryComplete()) {
There was a problem hiding this comment.
Yeah make more sense to move to DelayedOperation. Done.
| log.trace("Record lock partition limit exceeded for SharePartition {}, " + | ||
| "cannot acquire more records", sharePartition); | ||
| } | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
Where is the exception coming from?
There was a problem hiding this comment.
I have added the exception block so if there is any unrealized exception then atleast lock should be released.
| } | ||
| return result; | ||
| } | ||
| log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " + |
There was a problem hiding this comment.
The frequency of this log line is phenomenal. I wonder whether it's really helpful or just likely to flood the logs to the extent that it's impossible to see anything else.
There was a problem hiding this comment.
yeah, I think I made it INFO at the beginning because I was testing the purgatory stuff, but going forward we will make the logs as trace/debug. I forgot to change it to DEBUG when it got merged.
There was a problem hiding this comment.
I have removed this log line, we can check the purgatory metric to see waiting requests.
There was a problem hiding this comment.
Thanks. That seems much more appropriate.
|
@junrao @adixitconfluent @AndrewJSchofield Can I please get a re-review. |
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. Just a minor comment.
| "topic partitions {}", shareFetchData.groupId(), | ||
| shareFetchData.memberId(), shareFetchData.partitionMaxBytes().keySet()); | ||
| if (!topicPartitionDataFromTryComplete.isEmpty()) { | ||
| boolean result = forceComplete(); |
|
@junrao Thanks for the suggestion, I have addressed the feedback. |
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. One more comment.
| private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete()) | ||
| private[server] def safeTryComplete(): Boolean = inLock(lock) { | ||
| if (isCompleted) | ||
| true |
There was a problem hiding this comment.
If yes execute the completion logic by calling
forceComplete() and return true iff forceComplete returns true; otherwise return false
This is the return value definition. So if the request is completed, we should return false.
There was a problem hiding this comment.
Yeah, you are right. As the request is already completed then it should return false as other thread should have already bumped the completed count. And it should be removed from iteration once watched sees it completed. I made the change.
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. LGTM
…e#17583) For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
…e#17583) For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
…e#17583) For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>
For delayed fetch, tryComplete can be called again after onComplete. As the requests are processed with parallel threads hence this scenario can occur. We attain locks in tryComplete which keeps pending as onComplete is never called when request is already completed.
I have added a Uuid in each DelayedShareFetch on local to track the calls. When second tryComplete is called then it did make a call to forceComplete() but as it was already completed hence onComplete is never called.
The PR adds a safe check of
isCompleted()which alone is not sufficient hence again check for the resonse offorceCompleteto release the locks.Also have moved purgatory calls in share partition out of partition lock as it's not required.