Skip to content

KAFKA-17743: Add minBytes implementation to DelayedShareFetch#17539

Merged
junrao merged 46 commits into
apache:trunkfrom
adixitconfluent:kafka-17743
Nov 7, 2024
Merged

KAFKA-17743: Add minBytes implementation to DelayedShareFetch#17539
junrao merged 46 commits into
apache:trunkfrom
adixitconfluent:kafka-17743

Conversation

@adixitconfluent

@adixitconfluent adixitconfluent commented Oct 18, 2024

Copy link
Copy Markdown
Contributor

About

minBytes is a constraint that should be used to delay ShareFetch requests. Hence, I have added the support for minBytes in DelayedShareFetch class

Testing

The added code has been tested with the help of unit tests.

@github-actions github-actions Bot added core Kafka Broker KIP-932 Queues for Kafka small Small PRs labels Oct 18, 2024
@github-actions github-actions Bot removed the small Small PRs label Oct 18, 2024
@adixitconfluent adixitconfluent marked this pull request as ready for review October 18, 2024 12:18
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
@adixitconfluent

Copy link
Copy Markdown
Contributor Author

Hi @junrao @apoorvmittal10 , please review my PR when you get a chance. Thanks!

@apoorvmittal10 apoorvmittal10 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left some comments.

Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the PR. Left a comment.

Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the updated PR. Left a few more comments.

Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
@adixitconfluent

Copy link
Copy Markdown
Contributor Author

Hi @junrao , I have addressed your comments. Please re-review my PR when you get a chance. Thanks!

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the updated PR. Made a pass of all files. A few more comments.

Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/SharePartition.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/test/java/kafka/server/share/DelayedShareFetchTest.java Outdated
Comment thread core/src/test/java/kafka/server/share/DelayedShareFetchTest.java Outdated
Comment thread core/src/test/java/kafka/server/share/DelayedShareFetchTest.java Outdated
Comment thread core/src/test/java/kafka/server/share/DelayedShareFetchTest.java Outdated

@apoorvmittal10 apoorvmittal10 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for updated PR, some comments.

this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
this.partitionsToComplete = new LinkedHashMap<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We initialize the variable in constructor then re-assign while creating another LinkedHashMap in acquirablePartitions() method. Are we are initializing partitionsToComplete here to save null check? Can't we re-use already initialized LinkedHashMap()?

@adixitconfluent adixitconfluent Nov 5, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there have been comments above where I left some variables as null, and it was pointed out that I need to initialize them to avoid null checks at different places, then we just need to do empty checks.. Hence, I've implemented it in this manner.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the concern is with additional null check then I would recommend general helper methods. My concern is with creating additional maps when they are always re-referenced.

    private <K,V> void addToNullableMap(Map<K, V> map, K key, V value) {
        if (map == null) {
            map = new LinkedHashMap<>();
        }
        map.put(key, value);
    }

    private boolean isMapEmpty(Map<?,?> map) {
        return map == null || map.isEmpty();
    }

this.replicaManager = replicaManager;
this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
this.partitionsToComplete = new LinkedHashMap<>();
this.partitionsAlreadyFetched = new LinkedHashMap<>();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again we reset partitionsAlreadyFetched to response from replicaManagerReadResponse. Why to have such instances created when anyways we have to re-assign?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as above, we are avoiding any null checks. We just check for empty scenarios by doing this.

Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated

if (!topicPartitionDataFromTryComplete.isEmpty()) {
try {
if (!topicPartitionData.isEmpty()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I personally find the code hard to read with nested if/else blocks, same is the case here. Though I leave it on you.

if (topicPartitionData.isEmpty()) {
     log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
                        "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
                    shareFetchData.partitionMaxBytes().keySet());
    return false;
}

// In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = updateFetchOffsetMetadata(maybeReadFromLog(topicPartitionData));
if (!anyTopicIdPartitionHasLogReadError(replicaManagerReadResponse) && !isMinBytesSatisfied(topicPartitionData)) {
       log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
                            "topic partitions {}", shareFetchData.groupId(), shareFetchData.memberId(),
       shareFetchData.partitionMaxBytes().keySet());
       releasePartitionLocks(topicPartitionData.keySet());
       return false;
}

partitionsToComplete = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
....
....

@adixitconfluent adixitconfluent Nov 5, 2024

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been asked to add else blocks in the above comments on this PR, hence I don' think I should change it again. #17539 (comment)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I leave it on @junrao to decide then.

Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the updated PR. A few more comments.

Comment thread core/src/main/java/kafka/server/share/SharePartitionManager.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/ShareFetchUtils.java
Comment thread core/src/test/java/kafka/server/share/DelayedShareFetchTest.java Outdated
Comment thread core/src/test/java/kafka/server/share/DelayedShareFetchTest.java Outdated
responseData = readFromLog(topicPartitionData);
else
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
// updated in a different tryComplete thread.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, Apoorv had a good point in his comment (#17539 (comment)). There seems to be a potential problem. It's possible that thread1 calls tryComplete, finds completed to be false, and is about to set partitionsAlreadyFetched. The expiration thread then calls forceComplete and sets completed to true and proceeds to here. Now, thread1 continues and updates partitionsAlreadyFetched. The expiration thread will pick up the wrong partitionsAlreadyFetched.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junrao , I've created a ticket https://issues.apache.org/jira/browse/KAFKA-17948 to track this issue and if it fine to you, I would prefer to address the issue in a future PR.

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the updated PR. Just a few minor comments.

Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java Outdated
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java
Comment thread core/src/main/java/kafka/server/share/DelayedShareFetch.java

@apoorvmittal10 apoorvmittal10 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I would have preferred #17539 (comment), but not something which is really important to me. And 1 major thing we need to fix is https://issues.apache.org/jira/browse/KAFKA-17948 in subsequent PRs. LGTM otherwise.

@adixitconfluent

Copy link
Copy Markdown
Contributor Author

Hi @junrao , I've addressed the comments from the latest review. Please re-review my PR when you get a chance. Thanks!

@junrao junrao left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the updated PR. LGTM

Left a few minor comments. Since the PR has been iterated for some time, let's address them in the followup PR.

// hence release the acquired locks.
if (!completedByMe) {
releasePartitionLocks(shareFetchData.groupId(), topicPartitionDataFromTryComplete.keySet());
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the ordering is important, let's use LinkedHashMap.

if (sharePartition == null) {
log.error("Encountered null share partition for groupId={}, topicIdPartition={}. Skipping it.", shareFetchData.groupId(), tp);
return;
private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the ordering is important, let's use LinkedHashMap.


}

private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the ordering is important, let's use LinkedHashMap.

lock.readLock().lock();
try {
if (findNextFetchOffset.get())
return Optional.empty();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means tryComplete will never get non-empty fetchOffsetMetadata and its calculation of minBytes will be off. We need to think through how to address this.

@junrao junrao merged commit a0d4cbe into apache:trunk Nov 7, 2024
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Nov 30, 2024
…#17539)

Reviewers:  Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
tedyu pushed a commit to tedyu/kafka that referenced this pull request Jan 6, 2025
…#17539)

Reviewers:  Apoorv Mittal <apoorvmittal10@gmail.com>, Jun Rao <junrao@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

build Gradle build or GitHub Actions ci-approved core Kafka Broker KIP-932 Queues for Kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants