Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

[refactor][group] Simplify code and optimize loading partitions management#1504

Merged
Demogorgon314 merged 1 commit into
streamnative:masterfrom
BewareMyPower:bewaremypower/refactor-group-metadata-manager
Sep 22, 2022
Merged

[refactor][group] Simplify code and optimize loading partitions management#1504
Demogorgon314 merged 1 commit into
streamnative:masterfrom
BewareMyPower:bewaremypower/refactor-group-metadata-manager

Conversation

@BewareMyPower

Copy link
Copy Markdown
Collaborator

Fixes #1502

Motivation

Currently the partitioner in GroupMetadataManager can be customized
but it's only for test. The reason is that when
testRequestHandlingWhileLoadingInProgress was migrated from Kafka
source code, there is something wrong:

  1. otherGroupId should be tested while it's actually groupId.
  2. otherGroupId should be "myGroup", not "myGroupId", which cause the
    error of hash code.

Therefore, a customized partitioner is used in GroupCoordinatorTest.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into loadPartitions if it already
exists in ownedPartitions.

Modifications

  • Don't add partition into loadPartitions when it exists in
    ownedPartitions.
  • Use the fixed partitioner to calculate the partition of a group id.
    Though the algorithm is still a little different from Kafka.
  • Add the partition to ownedPartitions even if loading offsets failed.
  • Add a separate method removeGroupsAndOffsets to remove caches and
    close the producer and reader of a partition. In this method, use
    try-finally block for lock instead of inLock to avoid usage of
    AtomicInteger in a lambda.
  • Fix the wrong tests.

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

…ement

Fixes streamnative#1502

### Motivation

Currently the `partitioner` in `GroupMetadataManager` can be customized
but it's only for test. The reason is that when
`testRequestHandlingWhileLoadingInProgress` was migrated from Kafka
source code, there is something wrong:
1. `otherGroupId` should be tested while it's actually `groupId`.
2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the
   error of hash code.

Therefore, a customized partitioner is used in `GroupCoordinatorTest`.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into `loadPartitions` if it already
exists in `ownedPartitions`.

### Modifications

- Don't add partition into `loadPartitions` when it exists in
  `ownedPartitions`.
- Use the fixed partitioner to calculate the partition of a group id.
  Though the algorithm is still a little different from Kafka.
- Add the partition to `ownedPartitions` even if loading offsets failed.
- Add a separate method `removeGroupsAndOffsets` to remove caches and
  close the producer and reader of a partition. In this method, use
  try-finally block for lock instead of `inLock` to avoid usage of
  `AtomicInteger` in a lambda.
- Fix the wrong tests.
@BewareMyPower BewareMyPower self-assigned this Sep 19, 2022
@BewareMyPower BewareMyPower requested a review from a team as a code owner September 19, 2022 14:46
@github-actions github-actions Bot added the no-need-doc This pr does not need any document label Sep 19, 2022
@BewareMyPower

Copy link
Copy Markdown
Collaborator Author

@Demogorgon314 PTAL

@Demogorgon314 Demogorgon314 merged commit 615efc3 into streamnative:master Sep 22, 2022
Demogorgon314 pushed a commit that referenced this pull request Sep 22, 2022
…ement (#1504)

Fixes #1502

### Motivation

Currently the `partitioner` in `GroupMetadataManager` can be customized
but it's only for test. The reason is that when
`testRequestHandlingWhileLoadingInProgress` was migrated from Kafka
source code, there is something wrong:
1. `otherGroupId` should be tested while it's actually `groupId`.
2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the
   error of hash code.

Therefore, a customized partitioner is used in `GroupCoordinatorTest`.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into `loadPartitions` if it already
exists in `ownedPartitions`.

### Modifications

- Don't add partition into `loadPartitions` when it exists in
  `ownedPartitions`.
- Use the fixed partitioner to calculate the partition of a group id.
  Though the algorithm is still a little different from Kafka.
- Add the partition to `ownedPartitions` even if loading offsets failed.
- Add a separate method `removeGroupsAndOffsets` to remove caches and
  close the producer and reader of a partition. In this method, use
  try-finally block for lock instead of `inLock` to avoid usage of
  `AtomicInteger` in a lambda.
- Fix the wrong tests.

(cherry picked from commit 615efc3)
@BewareMyPower BewareMyPower deleted the bewaremypower/refactor-group-metadata-manager branch September 23, 2022 01:06
Demogorgon314 pushed a commit that referenced this pull request Sep 28, 2022
…ement (#1504)

Fixes #1502

### Motivation

Currently the `partitioner` in `GroupMetadataManager` can be customized
but it's only for test. The reason is that when
`testRequestHandlingWhileLoadingInProgress` was migrated from Kafka
source code, there is something wrong:
1. `otherGroupId` should be tested while it's actually `groupId`.
2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the
   error of hash code.

Therefore, a customized partitioner is used in `GroupCoordinatorTest`.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into `loadPartitions` if it already
exists in `ownedPartitions`.

### Modifications

- Don't add partition into `loadPartitions` when it exists in
  `ownedPartitions`.
- Use the fixed partitioner to calculate the partition of a group id.
  Though the algorithm is still a little different from Kafka.
- Add the partition to `ownedPartitions` even if loading offsets failed.
- Add a separate method `removeGroupsAndOffsets` to remove caches and
  close the producer and reader of a partition. In this method, use
  try-finally block for lock instead of `inLock` to avoid usage of
  `AtomicInteger` in a lambda.
- Fix the wrong tests.

(cherry picked from commit 615efc3)
michaeljmarshall pushed a commit to michaeljmarshall/kop that referenced this pull request Dec 13, 2022
…ement (streamnative#1504)

Fixes streamnative#1502

Currently the `partitioner` in `GroupMetadataManager` can be customized
but it's only for test. The reason is that when
`testRequestHandlingWhileLoadingInProgress` was migrated from Kafka
source code, there is something wrong:
1. `otherGroupId` should be tested while it's actually `groupId`.
2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the
   error of hash code.

Therefore, a customized partitioner is used in `GroupCoordinatorTest`.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into `loadPartitions` if it already
exists in `ownedPartitions`.

- Don't add partition into `loadPartitions` when it exists in
  `ownedPartitions`.
- Use the fixed partitioner to calculate the partition of a group id.
  Though the algorithm is still a little different from Kafka.
- Add the partition to `ownedPartitions` even if loading offsets failed.
- Add a separate method `removeGroupsAndOffsets` to remove caches and
  close the producer and reader of a partition. In this method, use
  try-finally block for lock instead of `inLock` to avoid usage of
  `AtomicInteger` in a lambda.
- Fix the wrong tests.

(cherry picked from commit 615efc3)
Demogorgon314 pushed a commit that referenced this pull request Feb 6, 2023
…ement (#1504)

Fixes #1502

### Motivation

Currently the `partitioner` in `GroupMetadataManager` can be customized
but it's only for test. The reason is that when
`testRequestHandlingWhileLoadingInProgress` was migrated from Kafka
source code, there is something wrong:
1. `otherGroupId` should be tested while it's actually `groupId`.
2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the
   error of hash code.

Therefore, a customized partitioner is used in `GroupCoordinatorTest`.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into `loadPartitions` if it already
exists in `ownedPartitions`.

### Modifications

- Don't add partition into `loadPartitions` when it exists in
  `ownedPartitions`.
- Use the fixed partitioner to calculate the partition of a group id.
  Though the algorithm is still a little different from Kafka.
- Add the partition to `ownedPartitions` even if loading offsets failed.
- Add a separate method `removeGroupsAndOffsets` to remove caches and
  close the producer and reader of a partition. In this method, use
  try-finally block for lock instead of `inLock` to avoid usage of
  `AtomicInteger` in a lambda.
- Fix the wrong tests.

(cherry picked from commit 615efc3)
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEATURE] Refactor the loading and owned partitions cache management

2 participants