This repository was archived by the owner on Jan 24, 2024. It is now read-only.
[refactor][group] Simplify code and optimize loading partitions management#1504
Merged
Demogorgon314 merged 1 commit intoSep 22, 2022
Conversation
…ement Fixes streamnative#1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests.
Collaborator
Author
|
@Demogorgon314 PTAL |
Demogorgon314
approved these changes
Sep 22, 2022
Demogorgon314
pushed a commit
that referenced
this pull request
Sep 22, 2022
…ement (#1504) Fixes #1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests. (cherry picked from commit 615efc3)
Demogorgon314
pushed a commit
that referenced
this pull request
Sep 28, 2022
…ement (#1504) Fixes #1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests. (cherry picked from commit 615efc3)
michaeljmarshall
pushed a commit
to michaeljmarshall/kop
that referenced
this pull request
Dec 13, 2022
…ement (streamnative#1504) Fixes streamnative#1502 Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests. (cherry picked from commit 615efc3)
Demogorgon314
pushed a commit
that referenced
this pull request
Feb 6, 2023
…ement (#1504) Fixes #1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests. (cherry picked from commit 615efc3)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to subscribe to this conversation on GitHub.
Already have an account?
Sign in.
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #1502
Motivation
Currently the
partitionerinGroupMetadataManagercan be customizedbut it's only for test. The reason is that when
testRequestHandlingWhileLoadingInProgresswas migrated from Kafkasource code, there is something wrong:
otherGroupIdshould be tested while it's actuallygroupId.otherGroupIdshould be "myGroup", not "myGroupId", which cause theerror of hash code.
Therefore, a customized partitioner is used in
GroupCoordinatorTest.In addition, after apache/kafka#9441, the
partition of a group id won't be put into
loadPartitionsif it alreadyexists in
ownedPartitions.Modifications
loadPartitionswhen it exists inownedPartitions.Though the algorithm is still a little different from Kafka.
ownedPartitionseven if loading offsets failed.removeGroupsAndOffsetsto remove caches andclose the producer and reader of a partition. In this method, use
try-finally block for lock instead of
inLockto avoid usage ofAtomicIntegerin a lambda.Documentation
Check the box below.
Need to update docs?
doc-required(If you need help on updating docs, create a doc issue)
no-need-doc(Please explain why)
doc(If this PR contains doc changes)