KAFKA-10614: Ensure group state (un)load is executed in the submitted order #9441

Merged
tombentley merged 1 commit into apache:trunk from tombentley:KAFKA-10614-group-coordinator-election-resignation
Jun 7, 2021

@tombentley

Member

Implements the single thread with FIFO approach suggested in https://issues.apache.org/jira/browse/KAFKA-10614

@tombentley

Member Author

@guozhangwang this is a draft implementation of one of the solutions you suggested. I will try to add a test, but in the meantime any comments you have are welcome.

@hachikuji

Contributor

@tombentley Thanks for the patch. I think the problem is in the callback inside KafkaApis.handleLeaderAndIsrRequest. With two concurrent LeaderAndIsr requests, there is no guarantee about the ordering of the callbacks even though ReplicaManager checks epoch. They can get reordered like the following:

  1. Call ReplicaManager.becomeLeaderOrFollower with epoch=1 in thread 1
  2. Call ReplicaManager.becomeLeaderOrFollower with epoch=2 in thread 2
  3. Call onLeadershipChange callback in thread 2
  4. Call onLeadershipChange callback in thread 1

At least that's my understanding of the issue. I think we need to find a way to push the epoch validation into GroupCoordinator.

@tombentley

Member Author

@hachikuji, yes I see that although replica manager might be synchronized it doesn't prevent the race. I was already working on introducing the epoch into the group coordinator (as the transaction coordinator already does). I'll rework the patch, thanks for the feedback.

@hachikuji

Contributor

@tombentley Actually I missed the holding of the replicaStateChangeLock inside ReplicaManager.becomeLeaderOrFollower. So the reordering I suggested above seems not possible. Perhaps the problem really is submission to the scheduler. I think we had given the scheduler only a single thread assuming that was good enough to protect the submission order, but maybe it is not.

@tombentley

Member Author

I assumed the scheduler would be using a heap to prioritize scheduled tasks, and that wouldn't preserve the order of submitted tasks with the same scheduled time. Since System.currentTimeMillis() doesn't tick regularly enough I can see you could have the two threads end up with the same scheduled time of execution (now), but execute in the wrong order. Obviously the FIFO queue approach would solve that, at the cost of an extra thread.

@tombentley tombentley force-pushed the KAFKA-10614-group-coordinator-election-resignation branch from 3d55321 to 09ccad0 Compare October 16, 2020 12:58
@tombentley

Member Author

@hachikuji I'm now passing in the coordinator epoch and ignoring attempts to unload state if the epoch is older than what's loaded. I've also added a test. One thing I didn't understand was why I needed to change AbstractFetcherManagerTest. Please could you take another look?

@tombentley

Member Author

> One thing I didn't understand was why I needed to change AbstractFetcherManagerTest.

Stupid me, that was because I turned up the logging. I'll revert that.

@tombentley

Member Author

@guozhangwang, @hachikuji I was thinking... solving it as I have now (keeping track of the epoch) doesn't address another potential problem case where the tenure as leader is very short and the background task to load the state runs after the background task to unload the state in the same epoch. Obviously in this case the load should not be done (I guess it could result in a broker not returning NOT_COORDINATOR when it should, based on incorrect state of ownedPartitions). We could track a high watermark coordinator epoch and guard the load with a check (as well as the unload).

@guozhangwang

Contributor

Hey @tombentley, sorry for the late reply! I also checked the source code of ScheduledThreadPoolExecutor and I think what you've inferred is correct: although it does not keep a single thread alive all the time, but dynamically creates and stops the thread when no tasks have been scheduled for a long time (note we set num.thread == 1), the blocking queue should still guarantee FIFO ordering.

So what's possibly happening is that the thread handling a LeaderAndIsr request with the old controller epoch grabs the lock and proceeds first. In that case, maybe we can simplify your proposed solution as follows:

  • note that, like @hachikuji mentioned, onLeadershipChange is actually protected by the replicaStateChangeLock inside the becomeLeaderOrFollower, and hence we only need to pass in the controller epoch in groupCoordinator.onElection/onResignation just like what we did in txnCoordinator.

  • Inside groupCoordinator.onElection/onResignation, which is lock-protected, we just remember the latest controller epoch at the GroupCoordinator, and then we do one check right before scheduleLoadGroupAndOffsets/removeGroupsForPartition against that controller epoch; if the check does not pass, we can log and skip the loading/unloading call.
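
The two steps above can be sketched roughly as follows. This is a minimal illustration, not the code from the patch: `EpochGuardedCoordinator`, `accept`, and the `load`/`unload` callbacks are hypothetical stand-ins for the coordinator, its epoch check, and `scheduleLoadGroupAndOffsets`/`removeGroupsForPartition`, and a plain `synchronized` block stands in for whatever lock actually protects the hooks.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the coordinator: it remembers the latest epoch seen
// per partition and skips any load/unload request that carries an older epoch.
class EpochGuardedCoordinator(load: Int => Unit, unload: Int => Unit) {
  private val latestEpoch = mutable.Map.empty[Int, Int]

  // Returns true when `epoch` is at least as new as the remembered one,
  // recording it as the latest epoch in that case.
  private def accept(partition: Int, epoch: Int): Boolean = synchronized {
    if (latestEpoch.get(partition).forall(_ <= epoch)) {
      latestEpoch(partition) = epoch
      true
    } else {
      false
    }
  }

  def onElection(partition: Int, epoch: Int): Unit =
    if (accept(partition, epoch)) load(partition)
    else println(s"Skipping load of partition $partition for stale epoch $epoch")

  def onResignation(partition: Int, epoch: Int): Unit =
    if (accept(partition, epoch)) unload(partition)
    else println(s"Skipping unload of partition $partition for stale epoch $epoch")
}
```

With this guard, an `onResignation` carrying epoch 1 that arrives after an `onElection` with epoch 2 is logged and skipped instead of unloading the newer state.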

WDYT?

@tombentley

Member Author

You're right @guozhangwang, ScheduledFutureTask contains a sequence number to break ties, so the executor is FIFO. So you're saying that the wrong network thread handling one of these two LeaderAndISR acquires replicaStateChangeLock first, right? If that's the case then might a similar problem affect the txnCoordinator too?
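
The tie-breaking behaviour can be checked with a small experiment against the JDK executor. The sketch below assumes Scala 2.13 or later and uses a bare `ScheduledThreadPoolExecutor` instead of Kafka's `KafkaScheduler`; `FifoSchedulerDemo` and `runOrder` are hypothetical names. It submits zero-delay tasks from one thread and records the order in which the single worker runs them.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledThreadPoolExecutor, TimeUnit}
import scala.jdk.CollectionConverters._

object FifoSchedulerDemo {
  // Submits `n` zero-delay tasks from the calling thread to a single-threaded
  // ScheduledThreadPoolExecutor and returns the order in which they ran.
  def runOrder(n: Int): List[Int] = {
    val executor = new ScheduledThreadPoolExecutor(1) // one worker thread
    val ran = new ConcurrentLinkedQueue[Int]()
    (0 until n).foreach { i =>
      val task = new Runnable { def run(): Unit = { ran.add(i); () } }
      executor.schedule(task, 0L, TimeUnit.MILLISECONDS)
    }
    // By default, already-submitted delayed tasks still run after shutdown().
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
    ran.asScala.toList
  }
}
```

Under these assumptions `FifoSchedulerDemo.runOrder(1000)` should return the indices in submission order, which is consistent with the executor itself not being the source of the reordering.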

I updated the PR if you want to take another look, but I still need to address synchronization, since although onLeadershipChange is executed with ReplicaManager.replicaStateChangeLock, onResignation is also called via handleStopReplicaRequest. Perhaps it's enough to make onResignation synchronized.

@guozhangwang

Contributor

> So you're saying that the wrong network thread handling one of these two LeaderAndISR acquires replicaStateChangeLock first, right?

Sort of :) I think it is possible that two LeaderAndIsr requests reach the same broker from the old and new controllers, and they are handled by two different handler threads of the broker. The handler thread handling the request from the old controller may grab the lock first.

> since although onLeadershipChange is executed with ReplicaManager.replicaStateChangeLock, onResignation is also called via handleStopReplicaRequest

How about this: since handleStopReplicaRequest would call replicaManager.stopReplicas which is also protected by the lock, we can just make the following block

    if (error == Errors.NONE) {
      if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
          && partitionStates(topicPartition).deletePartition) {
        groupCoordinator.onResignation(topicPartition.partition)
      } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
                 && partitionStates(topicPartition).deletePartition) {
        val partitionState = partitionStates(topicPartition)
        val leaderEpoch = if (partitionState.leaderEpoch >= 0)
          Some(partitionState.leaderEpoch)
        else
          None
        txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
      }
    }

which is currently triggered after the call, be passed in as a function parameter into that call, just like what becomeLeaderOrFollower does. Then as long as we can pass coordinatorEpoch = leaderEpoch to groupCoordinator#onResignation, that should be sufficient.


I looked at your current PR, and I think it looks promising. If you agree with my suggestion above and make the changes accordingly, I think we can turn this draft into an official PR and merge.

cc @cadonna @hachikuji

@tombentley tombentley force-pushed the KAFKA-10614-group-coordinator-election-resignation branch from 07a9eb3 to a7cde76 Compare December 15, 2020 11:49
@tombentley

Member Author

@guozhangwang that's a much better solution, thanks! I've implemented that and rebased for a conflict.

@tombentley tombentley marked this pull request as ready for review December 15, 2020 11:50
@tombentley tombentley force-pushed the KAFKA-10614-group-coordinator-election-resignation branch from a7cde76 to f13bd30 Compare January 25, 2021 15:05
@tombentley

Member Author

Rebased for conflict.

@guozhangwang @cadonna @hachikuji please could one of you take a look?

@guozhangwang

Contributor

Yes! I will review it again. Thanks for hanging on there and my apologies... Review has always been a bit overwhelming for me :)

@tombentley

Member Author

Thanks, and no worries about the wait.

@guozhangwang

Contributor

This change looks good to me. @hachikuji could you take another look before we merge?

@tombentley tombentley force-pushed the KAFKA-10614-group-coordinator-election-resignation branch from f13bd30 to afab8f1 Compare February 24, 2021 09:57
@tombentley

Member Author

Rebased for conflict. Grateful if you could take a look @hachikuji.

@vvcephei

Contributor

Hey @guozhangwang and @hachikuji , I just ran across this PR as part of the 2.8 release. It's not going to make 2.8, but we should go ahead and close the loop to get it fixed for 3.0.

Thanks!

@hachikuji hachikuji left a comment

Contributor

Sorry for the long delay. The approach looks good. Left a couple comments.

if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
epochForPartitionId.remove(offsetTopicPartitionId)

Contributor

Hmm.. Why remove the epoch after resignation? It seems like it would be useful to keep tracking it. Maybe it's useful to distinguish the case where the replica is to be deleted?

Member Author

I guess mostly I did it for symmetry with onElection; however, you're right that it doesn't affect correctness. I don't entirely follow your point about how we could use this when the replica is being deleted though. Could you explain?

@hachikuji hachikuji Apr 22, 2021

Contributor

The onResignation hook just means we lost leadership. By keeping track of the epoch, we are protected from all potential reorderings.


private val isActive = new AtomicBoolean(false)

val epochForPartitionId = mutable.Map[Int, Int]()

Contributor

Does this need to be a concurrent collection? It does not look like we can count on a lock protecting onElection and onResignation.

Member Author

Yes it does, thanks! Now fixed.

@guozhangwang

Contributor

@tombentley could you see if @hachikuji 's comments can be addressed? This is a pretty tricky bug that I would like to get fixed in 3.0. Thanks!

@tombentley

Member Author

@hachikuji I've addressed your comments, if you want to take another look?

groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {

Contributor

Can we do a CAS update? Otherwise I don't think this is safe.

groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Contributor

Similarly, we should update epochForPartitionId here with a CAS operation.

Member Author

This one doesn't do an update any more (following your other comment).

Contributor

I have probably not been doing a good job of being clear. It is useful to bump the epoch whenever we observe a larger value whether it is in onResignation or onElection. This protects us from all potential reorderings.
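
One way to picture "bump the epoch whenever we observe a larger value" is an atomic max-update on a concurrent map. The sketch below is an illustration under assumptions, not the merged implementation: `MaxEpochTracker`, `maybeUpdate`, and `currentEpoch` are hypothetical names, and the map is keyed by partition id as in `epochForPartitionId`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical tracker of the highest coordinator epoch seen per partition.
class MaxEpochTracker {
  private val epochForPartition = new ConcurrentHashMap[Int, java.lang.Integer]()

  // Atomically raises the stored epoch to `epoch` when it is larger than the
  // current value, and reports whether `epoch` is now the current epoch.
  // A false result means the caller carries a stale epoch and should do nothing.
  def maybeUpdate(partition: Int, epoch: Int): Boolean = {
    val updated = epochForPartition.compute(partition, (_, current) =>
      if (current == null || epoch > current) Int.box(epoch) else current
    )
    updated.intValue == epoch
  }

  def currentEpoch(partition: Int): Option[Int] =
    Option(epochForPartition.get(partition)).map(_.intValue)
}
```

Because both hooks would go through the same `maybeUpdate`, an event with an older epoch is rejected whichever hook it arrives through and whatever order the events are processed in.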

stopReplicaRequest.controllerEpoch,
stopReplicaRequest.brokerEpoch,
partitionStates,
onStopReplicas)

Contributor

It's not clear to me why we moved this in ReplicaManager. Is there some reason we need the replicaStateChangeLock lock?

Contributor

@hachikuji I think this is related to an earlier comment: #9441 (comment) The concern is that onResignation is called from two places: handleStopReplicaRequest and handleLeaderAndIsrRequest. The latter is protected by replicaStateChangeLock but the former is not, and hence race conditions may still happen.

The current approach seems to take a slightly different route: instead of always trying to synchronize under replicaStateChangeLock, we just compare-and-swap the epochForPartitionId, is that right?

Member Author

@guozhangwang yes, that's right. I forgot about our conversation about the lock when @hachikuji asked about why we were using the callback 😞.

I notice that the partitionLock is acquired by the addLoadingPartition call in loadGroupsAndOffsets, and is also acquired in removeGroupsAndOffsets. Wouldn't it be simpler to use that than replicaStateChangeLock at this point if we're wanting to avoid a third way of handling concurrency here, or is there some subtlety? Obviously we wouldn't hold it for the call to doLoadGroupsAndOffsets in loadGroupsAndOffsets, just for the two checks at the start

    if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, Some(coordinatorEpoch))) {
      info(s"Not loading offsets and group metadata for $topicPartition " +
        s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
    } else if (!addLoadingPartition(topicPartition.partition)) {
      info(s"Already loading offsets and group metadata from $topicPartition")
    }

Contributor

I think partitionLock and replicaStateChangeLock are for different purposes here: the latter is specifically for changing the replica state including leader, ISR, while the former is for more general access patterns? @hachikuji could you chime in here if you got some time.

Contributor

If I understand correctly, the original issue concerned the potential reordering of loading/unloading events. This was possible because of inconsistent locking and the fact that we relied 100% on the order that the task was submitted to the scheduler. With this patch, we are now using the leader epoch in order to ensure that loading/unloading events are handled in the correct order. This means it does not actually matter if the events get submitted to the scheduler in the wrong order. Does that make sense or am I still missing something?

Member Author

@hachikuji as @guozhangwang pointed out, the scheduler really is FIFO (it uses a sequence number internally to guarantee order is maintained), so assuming his theory about the racing i/o threads is correct (and I think it is, but I've never observed this problem) then his solution of holding the lock for handleStopReplicaRequest would work.

The current version of the PR doesn't make assumptions about how any reordering can happen (i.e. whether caused by the inconsistent locking or anything else). So I don't think you're missing anything, you've just solved the problem differently.

Contributor

Thanks @hachikuji , I think using epoch would be sufficient too.

fooPartition -> Errors.NONE
), Errors.NONE)
)
//<<<<<<< HEAD

Contributor

Can you fix this?

@tombentley

Member Author

Thanks @hachikuji, it's much simpler now, if you could take another look?

def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
epochForPartitionId.compute(offsetTopicPartitionId, (_, epoch) => {
val currentEpoch = Option(epoch)
if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {

Contributor

One final thing I was considering is whether we should push this check into GroupMetadataManager.loadGroupsAndOffsets. That would give us some protection against any assumptions about ordering in KafkaScheduler.

groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {

Contributor

I have probably not been doing a good job of being clear. It is useful to bump the epoch whenever we observe a larger value whether it is in onResignation or onElection. This protects us from all potential reorderings.

@tombentley

Member Author

@hachikuji Thanks, I was being slow on the uptake, but what you describe makes perfect sense. I've moved the logic to the GroupMetadataManager, am using the max observed epoch and have added some more tests. Very grateful if you could take another look.

One thing I found awkward was using ConcurrentHashMap#compute with the possibility of returning null (in the case of stop replicas it makes sense to remove the mapping, right?). I had to switch the java.lang.Integer type args, but maybe you know a better way of handling that?
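
For readers unfamiliar with the awkwardness described here: `ConcurrentHashMap#compute` removes the entry when the remapping function returns `null`, which a Scala `Int` value type cannot express. The sketch below shows one way to handle it, with `java.lang.Integer` as the value type. `EpochMap` and `update` are hypothetical names, and treating `None` as "remove the mapping" is an assumption made for illustration, not necessarily what the patch does.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

// Hypothetical wrapper: Some(e) keeps the larger of the stored epoch and e,
// None removes the mapping altogether.
class EpochMap {
  // The value type is java.lang.Integer, not Int, so that null is a legal result.
  private val epochs = new ConcurrentHashMap[Int, java.lang.Integer]()

  def update(partition: Int, epochOpt: Option[Int]): Option[Int] = {
    val remap: BiFunction[Int, java.lang.Integer, java.lang.Integer] =
      (_, current) => epochOpt match {
        case Some(e) if current == null || e > current => Int.box(e)
        case Some(_) => current
        case None => null // returning null tells compute to drop the entry
      }
    // compute returns the new value, or null when the entry was removed.
    Option(epochs.compute(partition, remap)).map(_.intValue)
  }
}
```

Wrapping the result in `Option(...)` keeps the `null` confined to the boundary with the Java collection.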

@hachikuji

Contributor

@tombentley Thanks for the updates. I made a few small changes in this patch: hachikuji@27ba937. The main things are taking the loading itself out of compute (which is intended to be a cheap operation) and moving the loadingPartitions check. Feel free to pull in the changes if they make sense to you.

@tombentley

Member Author

@hachikuji that's much better, thank you. I added some logging where we ignore on the remove path, hope that's OK.

@hachikuji hachikuji left a comment

Contributor

Thanks, this is looking good. I left one more small suggestion. Also, it looks like the test GroupCoordinatorTest.testLeaderElectBeforeResign is failing.

Contributor

One minor improvement here is to change addLoadingPartition so that it checks whether the partition is already contained in ownedPartitions. If so, we can return false.
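
A rough sketch of that suggestion, assuming both sets are guarded by the same `partitionLock`. `PartitionTracker` and `markOwned` are hypothetical; only `addLoadingPartition`, `loadingPartitions`, `ownedPartitions`, and `partitionLock` are names taken from the discussion.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical holder for the two partition sets discussed above.
class PartitionTracker {
  private val partitionLock = new ReentrantLock()
  private val loadingPartitions = mutable.Set.empty[Int]
  private val ownedPartitions = mutable.Set.empty[Int]

  // Returns false if the partition is already owned or already loading;
  // otherwise marks it as loading and returns true.
  def addLoadingPartition(partition: Int): Boolean = {
    partitionLock.lock()
    try {
      if (ownedPartitions.contains(partition)) false
      else loadingPartitions.add(partition)
    } finally {
      partitionLock.unlock()
    }
  }

  // Moves a partition from loading to owned once its state has been loaded.
  def markOwned(partition: Int): Unit = {
    partitionLock.lock()
    try {
      loadingPartitions.remove(partition)
      ownedPartitions.add(partition)
      ()
    } finally {
      partitionLock.unlock()
    }
  }
}
```

A second load request for a partition that is already loading or already owned then returns `false`, so the caller can skip the redundant load.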

@tombentley

Member Author

@hachikuji I fixed the tests and made your suggested changes, but what do you think about @guozhangwang's point above?

@hachikuji

Contributor

@tombentley Apologies for the delay. I was out on leave for the past month. I responded to Guozhang's comment. Let me know if it makes sense or not.

@hachikuji hachikuji left a comment

Contributor

@tombentley Thanks for the patience. LGTM. Looks like there is a trivial conflict to fix, but feel free to merge after it is resolved.

@guozhangwang

Contributor

Made another pass on the patch. LGTM! I think we can merge after the conflicts are resolved.

@tombentley tombentley force-pushed the KAFKA-10614-group-coordinator-election-resignation branch from 59e5396 to b42dbee Compare June 7, 2021 08:01
Reviewers: Jason Gustafson<jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Co-authored-by: Jason Gustafson<jason@confluent.io>
@tombentley tombentley force-pushed the KAFKA-10614-group-coordinator-election-resignation branch from b42dbee to b2dabf3 Compare June 7, 2021 08:02
@tombentley tombentley merged commit c72ce00 into apache:trunk Jun 7, 2021
@tombentley tombentley deleted the KAFKA-10614-group-coordinator-election-resignation branch June 7, 2021 13:19
@tombentley

Member Author

Thanks for the reviews @hachikuji @guozhangwang.

@guozhangwang

Contributor

Thank YOU for the great patience @tombentley (it lasted for more than 6 months)!

cadonna pushed a commit that referenced this pull request Jul 13, 2021
…er (#9441)

Co-authored-by: Jason Gustafson<jason@confluent.io>
Reviewers: Jason Gustafson<jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@cadonna

cadonna commented Jul 13, 2021

Member

Cherry-picked to 2.8

BewareMyPower added a commit to BewareMyPower/kop that referenced this pull request Sep 19, 2022
…ement

Fixes streamnative#1502

### Motivation

Currently the `partitioner` in `GroupMetadataManager` can be customized
but it's only for test. The reason is that when
`testRequestHandlingWhileLoadingInProgress` was migrated from Kafka
source code, there is something wrong:
1. `otherGroupId` should be tested while it's actually `groupId`.
2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the
   error of hash code.

Therefore, a customized partitioner is used in `GroupCoordinatorTest`.

In addition, after apache/kafka#9441, the
partition of a group id won't be put into `loadPartitions` if it already
exists in `ownedPartitions`.

### Modifications

- Don't add partition into `loadPartitions` when it exists in
  `ownedPartitions`.
- Use the fixed partitioner to calculate the partition of a group id.
  Though the algorithm is still a little different from Kafka.
- Add the partition to `ownedPartitions` even if loading offsets failed.
- Add a separate method `removeGroupsAndOffsets` to remove caches and
  close the producer and reader of a partition. In this method, use
  try-finally block for lock instead of `inLock` to avoid usage of
  `AtomicInteger` in a lambda.
- Fix the wrong tests.
Demogorgon314 pushed a commit to streamnative/kop that referenced this pull request Sep 22, 2022
…ement (#1504)

Fixes #1502
Demogorgon314 pushed a commit to streamnative/kop that referenced this pull request Sep 22, 2022
…ement (#1504)

Fixes #1502

(cherry picked from commit 615efc3)
Demogorgon314 pushed a commit to streamnative/kop that referenced this pull request Sep 28, 2022
…ement (#1504)

Fixes #1502

(cherry picked from commit 615efc3)
eolivelli pushed a commit to datastax/starlight-for-kafka that referenced this pull request Sep 29, 2022
…ement (#1504)

Fixes #1502

(cherry picked from commit 615efc3)
Demogorgon314 pushed a commit to streamnative/kop that referenced this pull request Feb 6, 2023
…ement (#1504)

Fixes #1502

(cherry picked from commit 615efc3)