KAFKA-17743: Add minBytes implementation to DelayedShareFetch#17539
Conversation
|
Hi @junrao @apoorvmittal10 , please review my PR when you get a chance. Thanks! |
apoorvmittal10
left a comment
There was a problem hiding this comment.
LGTM, left some comments.
junrao
left a comment
There was a problem hiding this comment.
@adixitconfluent : Thanks for the PR. Left a comment.
junrao
left a comment
There was a problem hiding this comment.
@adixitconfluent : Thanks for the updated PR. Left a few more comments.
|
Hi @junrao , I have addressed your comments. Please re-review my PR when you get a chance. Thanks! |
junrao
left a comment
There was a problem hiding this comment.
@adixitconfluent : Thanks for the updated PR. Made a pass of all files. A few more comments.
apoorvmittal10
left a comment
There was a problem hiding this comment.
Thanks for updated PR, some comments.
| this.shareFetchData = shareFetchData; | ||
| this.replicaManager = replicaManager; | ||
| this.topicPartitionDataFromTryComplete = new LinkedHashMap<>(); | ||
| this.partitionsToComplete = new LinkedHashMap<>(); |
There was a problem hiding this comment.
We initialize the variable in constructor then re-assign while creating another LinkedHashMap in acquirablePartitions() method. Are we are initializing partitionsToComplete here to save null check? Can't we re-use already initialized LinkedHashMap()?
There was a problem hiding this comment.
Yes, there have been comments above where I left some variables as null, and it was pointed out that I need to initialize them to avoid null checks at different places, then we just need to do empty checks.. Hence, I've implemented it in this manner.
There was a problem hiding this comment.
If the concern is with additional null check then I would recommend general helper methods. My concern is with creating additional maps when they are always re-referenced.
private <K,V> void addToNullableMap(Map<K, V> map, K key, V value) {
if (map == null) {
map = new LinkedHashMap<>();
}
map.put(key, value);
}
private boolean isMapEmpty(Map<?,?> map) {
return map == null || map.isEmpty();
}
| this.replicaManager = replicaManager; | ||
| this.topicPartitionDataFromTryComplete = new LinkedHashMap<>(); | ||
| this.partitionsToComplete = new LinkedHashMap<>(); | ||
| this.partitionsAlreadyFetched = new LinkedHashMap<>(); |
There was a problem hiding this comment.
Again we reset partitionsAlreadyFetched to response from replicaManagerReadResponse. Why to have such instances created when anyways we have to re-assign?
There was a problem hiding this comment.
Same reason as above, we are avoiding any null checks. We just check for empty scenarios by doing this.
|
|
||
| if (!topicPartitionDataFromTryComplete.isEmpty()) { | ||
| try { | ||
| if (!topicPartitionData.isEmpty()) { |
There was a problem hiding this comment.
nit: I personally find the code hard to read with nested if/else blocks, same is the case here. Though I leave it on you.
if (topicPartitionData.isEmpty()) {
log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
return false;
}
// In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
if (!anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) && !isMinBytesSatisfied(topicPartitionData)) {
log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
"topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
shareFetchData.partitionMaxBytes().keySet());
releasePartitionLocks(topicPartitionData.keySet());
return false;
}
partitionsToComplete = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
....
....
There was a problem hiding this comment.
I have been asked to add else blocks in the above comments on this PR, hence I don' think I should change it again. #17539 (comment)
junrao
left a comment
There was a problem hiding this comment.
@adixitconfluent : Thanks for the updated PR. A few more comments.
| responseData = readFromLog(topicPartitionData); | ||
| else | ||
| // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting | ||
| // updated in a different tryComplete thread. |
There was a problem hiding this comment.
Hmm, Apoorv had a good point in his comment (#17539 (comment)). There seems to be a potential problem. It's possible that thread1 calls tryComplete, finds completed to be false, and is about to set partitionsAlreadyFetched. The expiration thread then calls forceComplete and sets completed to true and proceeds to here. Now, thread1 continues and updates partitionsAlreadyFetched. The expiration thread will pick up the wrong partitionsAlreadyFetched.
There was a problem hiding this comment.
Hi @junrao , I've created a ticket https://issues.apache.org/jira/browse/KAFKA-17948 to track this issue and if it fine to you, I would prefer to address the issue in a future PR.
junrao
left a comment
There was a problem hiding this comment.
@adixitconfluent : Thanks for the updated PR. Just a few minor comments.
apoorvmittal10
left a comment
There was a problem hiding this comment.
Though I would have preferred #17539 (comment), but not something which is really important to me. And 1 major thing we need to fix is https://issues.apache.org/jira/browse/KAFKA-17948 in subsequent PRs. LGTM otherwise.
|
Hi @junrao , I've addressed the comments from the latest review. Please re-review my PR when you get a chance. Thanks! |
junrao
left a comment
There was a problem hiding this comment.
@adixitconfluent : Thanks for the updated PR. LGTM
Left a few minor comments. Since the PR has been iterated for some time, let's address them in the followup PR.
| // hence release the acquired locks. | ||
| if (!completedByMe) { | ||
| releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet()); | ||
| Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions(); |
There was a problem hiding this comment.
Since the ordering is important, let's use LinkedHashMap.
| if (sharePartition == null) { | ||
| log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp); | ||
| return; | ||
| private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) { |
There was a problem hiding this comment.
Since the ordering is important, let's use LinkedHashMap.
|
|
||
| } | ||
|
|
||
| private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) { |
There was a problem hiding this comment.
Since the ordering is important, let's use LinkedHashMap.
| lock.readLock().lock(); | ||
| try { | ||
| if (findNextFetchOffset.get()) | ||
| return Optional.empty(); |
There was a problem hiding this comment.
This means tryComplete will never get non-empty fetchOffsetMetadata and its calculation of minBytes will be off. We need to think through how to address this.
…#17539) Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
…#17539) Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
About
minBytesis a constraint that should be used to delay ShareFetch requests. Hence, I have added the support for minBytes inDelayedShareFetchclassTesting
The added code has been tested with the help of unit tests.