KAFKA-10614: Ensure group state (un)load is executed in the submitted order #9441
|
@guozhangwang this is a draft implementation of one of the solutions you suggested. I will try to add a test, but in the meantime any comments you have are welcome. |
|
@tombentley Thanks for the patch. I think the problem is in the callback inside
At least that's my understanding of the issue. I think we need to find a way to push the epoch validation into |
|
@hachikuji, yes I see that although replica manager might be synchronized it doesn't prevent the race. I was already working on introducing the epoch into the group coordinator (as the transaction coordinator already does). I'll rework the patch, thanks for the feedback. |
|
@tombentley Actually I missed the holding of the |
|
I assumed the scheduler would be using a heap to prioritize scheduled tasks, and that wouldn't preserve the order of submitted tasks with the same scheduled time. Since |
3d55321 to 09ccad0
|
@hachikuji I'm now passing in the coordinator epoch and ignoring attempts to unload state if the epoch is older than what's loaded. I've also added a test. One thing I didn't understand was why I needed to change |
Stupid me, that was because I turned up the logging. I'll revert that. |
|
@guozhangwang, @hachikuji I was thinking... solving it as I have now (keeping track of the epoch) doesn't address another potential problem case where the tenure as leader is very short and the background task to load the state runs after the background task to unload the state in the same epoch. Obviously in this case the load should not be done (I guess it could result in a broker not returning NOT_COORDINATOR when it should, based on incorrect state of |
|
Hey @tombentley Sorry for the late reply! I also checked the source code of So what's possibly happening is that the thread handling a LeaderAndIsr request with the old controller epoch grabs the lock and proceeds first. In that case, maybe we can simplify your proposed solution as follows:
WDYT? |
|
You're right @guozhangwang, I updated the PR if you want to take another look, but I still need to address synchronization, since although |
Sort of :) I think it is possible that two ISR requests reach the same broker, one from the old controller and one from the new, and that they are handled by two different handler threads of the broker. The handler thread handling the old controller's request may grab the lock first.
How about this: since which is currently triggered after the call, as passed in as a delta function parameter into that call, just like what I looked at your current PR, and I think that looks promising. If you agree with my suggestion above and make the changes accordingly, I think we can turn this draft into an official PR and merge. |
07a9eb3 to a7cde76
|
@guozhangwang that's a much better solution, thanks! I've implemented that and rebased for a conflict. |
a7cde76 to f13bd30
|
Rebased for conflict. @guozhangwang @cadonna @hachikuji please could one of you take a look? |
|
Yes! I will review it again. Thanks for hanging on there and my apologies... Review has always been a bit overwhelming for me :) |
|
Thanks, and no worries about the wait. |
|
This change looks good to me. @hachikuji could you take another look before we merge? |
f13bd30 to afab8f1
|
Rebased for conflict. Grateful if you could take a look @hachikuji. |
|
Hey @guozhangwang and @hachikuji , I just ran across this PR as part of the 2.8 release. It's not going to make 2.8, but we should go ahead and close the loop to get it fixed for 3.0. Thanks! |
hachikuji left a comment
Sorry for the long delay. The approach looks good. Left a couple comments.
if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
  info(s"Resigned as the group coordinator for partition $offsetTopicPartitionId in epoch $coordinatorEpoch")
  groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
  epochForPartitionId.remove(offsetTopicPartitionId)
Hmm.. Why remove the epoch after resignation? It seems like it would be useful to keep tracking it. Maybe it's useful to distinguish the case where the replica is to be deleted?
I guess mostly I did it for symmetry with onElection; however, you're right that it doesn't affect correctness. I don't entirely follow your point about how we could use this when the replica is being deleted, though. Could you explain?
The onResignation hook just means we lost leadership. By keeping track of the epoch, we are protected from all potential reorderings.
private val isActive = new AtomicBoolean(false)

val epochForPartitionId = mutable.Map[Int, Int]()
Does this need to be a concurrent collection? It does not look like we can count on a lock protecting onElection and onResignation.
Yes it does, thanks! Now fixed.
|
@tombentley could you see if @hachikuji 's comments can be addressed? This is a pretty tricky bug that I would like to get fixed in 3.0. Thanks! |
|
@hachikuji I've addressed your comments, if you want to take another look? |
groupManager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)
def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
  val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
  if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
Can we do a CAS update? Otherwise I don't think this is safe.
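To illustrate the concern: a separate `get` followed by a later `put` lets two handler threads both pass the check before either writes. A minimal sketch of an atomic alternative follows, using `ConcurrentHashMap.compute` so the comparison and the update happen as one step. The object name `EpochTracker` and the method `tryAdvanceEpoch` are hypothetical names for this sketch, not code from the PR.

```scala
import java.util.concurrent.ConcurrentHashMap

object EpochTracker {
  // Highest coordinator epoch seen so far, keyed by offsets-topic partition id.
  // The value type is java.lang.Integer so that "absent" shows up as null
  // instead of being unboxed to 0.
  private val epochForPartitionId = new ConcurrentHashMap[Int, java.lang.Integer]()

  /** Atomically records `epoch` if it is newer than the stored epoch.
    * Returns true if the caller may proceed, false if `epoch` is stale. */
  def tryAdvanceEpoch(partitionId: Int, epoch: Int): Boolean = {
    var advanced = false
    // compute() runs the remapping function atomically for this key, so the
    // check and the write cannot interleave with another thread's update.
    epochForPartitionId.compute(partitionId, (_, current) => {
      if (current == null || epoch > current.intValue) {
        advanced = true
        Int.box(epoch)
      } else {
        current
      }
    })
    advanced
  }
}
```

With this shape, a second election attempt for the same or an older epoch returns `false` and the stored value is left unchanged.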
groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
def onResignation(offsetTopicPartitionId: Int, coordinatorEpoch: Option[Int]): Unit = {
  val currentEpoch = Option(epochForPartitionId.get(offsetTopicPartitionId))
  if (currentEpoch.forall(currentEpoch => currentEpoch <= coordinatorEpoch.getOrElse(Int.MaxValue))) {
Similarly, we should update epochForPartitionId here with a CAS operation.
This one doesn't do an update any more (following your other comment).
I have probably not been doing a good job of being clear. It is useful to bump the epoch whenever we observe a larger value whether it is in onResignation or onElection. This protects us from all potential reorderings.
stopReplicaRequest.controllerEpoch,
stopReplicaRequest.brokerEpoch,
partitionStates,
onStopReplicas)
It's not clear to me why we moved this in ReplicaManager. Is there some reason we need the replicaStateChangeLock lock?
@hachikuji I think this is related to an earlier comment: #9441 (comment) The concern is that onResignation is called from two places: handleStopReplicaRequest and handleLeaderAndIsrRequest. The latter is protected by replicaStateChangeLock but the former is not, and hence race conditions may still happen.
The current approach seems to go a slightly different way: instead of always trying to synchronize under replicaStateChangeLock, we just compare-and-swap epochForPartitionId. Is that right?
@guozhangwang yes, that's right. I forgot about our conversation about the lock when @hachikuji asked about why we were using the callback 😞.
I notice that the partitionLock is acquired by the addLoadingPartition call in loadGroupsAndOffsets, and is also acquired in removeGroupsAndOffsets. Wouldn't it be simpler to use that than replicaStateChangeLock at this point if we're wanting to avoid a third way of handling concurrency here, or is there some subtlety? Obviously we wouldn't hold it for the call to doLoadGroupsAndOffsets in loadGroupsAndOffsets, just for the two checks at the start
if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, Some(coordinatorEpoch))) {
info(s"Not loading offsets and group metadata for $topicPartition " +
s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
} else if (!addLoadingPartition(topicPartition.partition)) {
info(s"Already loading offsets and group metadata from $topicPartition")
}
I think partitionLock and replicaStateChangeLock are for different purposes here: the latter is specifically for changing the replica state, including leader and ISR, while the former is for more general access patterns? @hachikuji could you chime in here if you have some time.
If I understand correctly, the original issue concerned the potential reordering of loading/unloading events. This was possible because of inconsistent locking and the fact that we relied 100% on the order that the task was submitted to the scheduler. With this patch, we are now using the leader epoch in order to ensure that loading/unloading events are handled in the correct order. This means it does not actually matter if the events get submitted to the scheduler in the wrong order. Does that make sense or am I still missing something?
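The order-independence claim above can be checked with a small single-threaded model. This is only a sketch under simplifying assumptions: `Elect`, `Resign` and `replay` are hypothetical names, an election is accepted only for a strictly newer epoch, and a resignation is accepted for an epoch at least as new as the highest one observed. It is not the logic in the PR.

```scala
object ReorderDemo {
  sealed trait Event
  final case class Elect(epoch: Int) extends Event
  final case class Resign(epoch: Int) extends Event

  /** Applies the events in the given order and returns whether the
    * partition's group state ends up loaded. */
  def replay(events: Seq[Event]): Boolean = {
    var maxEpoch: Option[Int] = None // highest epoch observed so far
    var loaded = false
    events.foreach {
      case Elect(e) if maxEpoch.forall(e > _) =>
        maxEpoch = Some(e)
        loaded = true
      case Resign(e) if maxEpoch.forall(e >= _) =>
        maxEpoch = Some(e)
        loaded = false
      case _ =>
        // Stale epoch: the event arrived late and is ignored.
    }
    loaded
  }
}
```

For example, `replay(Seq(Resign(5), Elect(6)))` and `replay(Seq(Elect(6), Resign(5)))` both leave the partition loaded, because the late `Resign(5)` carries an older epoch than the one already observed.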
@hachikuji as @guozhangwang pointed out, the scheduler really is FIFO (it uses a sequence number internally to guarantee order is maintained), so assuming his theory about the racing i/o threads is correct (and I think it is, but I've never observed this problem) then his solution of holding the lock for handleStopReplicaRequest would work.
The current version of the PR doesn't make assumptions about how any reordering can happen (i.e. whether caused by the inconsistent locking or anything else). So I don't think you're missing anything, you've just solved the problem differently.
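For readers wondering how a heap-based queue can still be FIFO for tasks with the same scheduled time, the sketch below shows the general tie-breaking idea mentioned above: each task gets a sequence number at submission, and the queue orders by time first and sequence second. This is a toy model with hypothetical names (`Task`, `drainOrder`), not the `KafkaScheduler` implementation.

```scala
import scala.collection.mutable

object FifoScheduleDemo {
  // seq is assigned at submission time and breaks ties between equal run times.
  final case class Task(runAtMs: Long, seq: Long, name: String)

  // PriorityQueue dequeues the largest element, so the ordering is reversed
  // to get the earliest (time, seq) pair first.
  private val byTimeThenSeq: Ordering[Task] =
    Ordering.by((t: Task) => (t.runAtMs, t.seq)).reverse

  /** Submits (runAtMs, name) pairs in order and returns the execution order. */
  def drainOrder(submissions: Seq[(Long, String)]): Seq[String] = {
    val queue = mutable.PriorityQueue.empty[Task](byTimeThenSeq)
    submissions.zipWithIndex.foreach { case ((runAt, name), i) =>
      queue.enqueue(Task(runAt, i.toLong, name))
    }
    val out = mutable.ArrayBuffer.empty[String]
    while (queue.nonEmpty) out += queue.dequeue().name
    out.toList
  }
}
```

Without the `seq` component, a binary heap gives no guarantee about the relative order of two tasks scheduled for the same instant, which is the situation the earlier comment in this thread was worried about.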
Thanks @hachikuji , I think using epoch would be sufficient too.
  fooPartition -> Errors.NONE
), Errors.NONE)
)
//<<<<<<< HEAD
|
Thanks @hachikuji, it's much simpler now, if you could take another look? |
def onElection(offsetTopicPartitionId: Int, coordinatorEpoch: Int): Unit = {
  epochForPartitionId.compute(offsetTopicPartitionId, (_, epoch) => {
    val currentEpoch = Option(epoch)
    if (currentEpoch.forall(currentEpoch => coordinatorEpoch > currentEpoch)) {
One final thing I was considering is whether we should push this check into GroupMetadataManager.loadGroupsAndOffsets. That would give us some protection against any assumptions about ordering in KafkaScheduler.
|
@hachikuji Thanks, I was being slow on the uptake, but what you describe makes perfect sense. I've moved the logic to the GroupMetadataManager, am using the max observed epoch and have added some more tests. Very grateful if you could take another look. One thing I found awkward was using |
|
@tombentley Thanks for the updates. I made a few small changes in this patch: hachikuji@27ba937. The main things are taking the loading itself out of |
|
@hachikuji that's much better, thank you. I added some logging where we ignore on the remove path, hope that's OK. |
hachikuji left a comment
Thanks, this is looking good. I left one more small suggestion. Also, it looks like the test GroupCoordinatorTest.testLeaderElectBeforeResign is failing.
One minor improvement here is to change addLoadingPartition so that it checks whether the partition is already contained in ownedPartitions. If so, we can return false.
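A minimal sketch of that suggestion, assuming two sets guarded by a single lock. The names `partitionLock`, `loadingPartitions` and `ownedPartitions` follow identifiers mentioned in this thread, while the surrounding object and the `markOwned` helper are hypothetical and exist only to make the example self-contained.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object PartitionState {
  private val partitionLock = new ReentrantLock()
  private val loadingPartitions = mutable.Set[Int]()
  private val ownedPartitions = mutable.Set[Int]()

  /** Returns true only if the partition is neither owned nor already loading. */
  def addLoadingPartition(partition: Int): Boolean = {
    partitionLock.lock()
    try {
      if (ownedPartitions.contains(partition)) false
      else loadingPartitions.add(partition) // add() is false if already present
    } finally {
      partitionLock.unlock()
    }
  }

  /** Hypothetical helper: moves a partition from "loading" to "owned". */
  def markOwned(partition: Int): Unit = {
    partitionLock.lock()
    try {
      loadingPartitions -= partition
      ownedPartitions += partition
    } finally {
      partitionLock.unlock()
    }
  }
}
```

With this check, a repeated load request for a partition that has already finished loading is rejected up front rather than scheduling a redundant load.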
|
@hachikuji I fixed the tests and made your suggested changes, but what do you think about @guozhangwang's point above? |
|
@tombentley Apologies for the delay. I was out on leave for the past month. I responded to Guozhang's comment. Let me know if it makes sense or not. |
hachikuji left a comment
@tombentley Thanks for the patience. LGTM. Looks like there is a trivial conflict to fix, but feel free to merge after it is resolved.
|
Made another pass on the patch. LGTM! I think we can merge after the conflicts are resolved. |
59e5396 to b42dbee
Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com> Co-authored-by: Jason Gustafson <jason@confluent.io>
b42dbee to b2dabf3
|
Thanks for the reviews @hachikuji @guozhangwang. |
|
Thank YOU for the great patience @tombentley (it lasted for more than 6 months)! |
…er (#9441) Co-authored-by: Jason Gustafson <jason@confluent.io> Reviewers: Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
|
Cherry-picked to 2.8 |
…ement Fixes streamnative#1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests.
…ement (#1504) Fixes #1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests.
…ement (#1504) Fixes #1502 ### Motivation Currently the `partitioner` in `GroupMetadataManager` can be customized but it's only for test. The reason is that when `testRequestHandlingWhileLoadingInProgress` was migrated from Kafka source code, there is something wrong: 1. `otherGroupId` should be tested while it's actually `groupId`. 2. `otherGroupId` should be "myGroup", not "myGroupId", which cause the error of hash code. Therefore, a customized partitioner is used in `GroupCoordinatorTest`. In addition, after apache/kafka#9441, the partition of a group id won't be put into `loadPartitions` if it already exists in `ownedPartitions`. ### Modifications - Don't add partition into `loadPartitions` when it exists in `ownedPartitions`. - Use the fixed partitioner to calculate the partition of a group id. Though the algorithm is still a little different from Kafka. - Add the partition to `ownedPartitions` even if loading offsets failed. - Add a separate method `removeGroupsAndOffsets` to remove caches and close the producer and reader of a partition. In this method, use try-finally block for lock instead of `inLock` to avoid usage of `AtomicInteger` in a lambda. - Fix the wrong tests. (cherry picked from commit 615efc3)
Implements the single thread with FIFO approach suggested in https://issues.apache.org/jira/browse/KAFKA-10614