KAFKA-7641: Introduce "group.max.size" config to limit group sizes #6163
gwenshap
left a comment
Left quite a few comments. Two high level things that worry me:
- I had lots of unrelated consumer test failures with this PR, leading me to guess that something about the logic isn't robust yet.
- Shrinking seems like the trickiest bit, both to implement and to test. I think we don't really need it: if a group is really so big that it causes issues, it will eventually time out and rebalance... and then we'll enforce the new size with "join group". WDYT?
If we do keep shrinking, I left some suggestions for simpler implementation. But I'm not confident that I understand the group reload logic enough, so asked @hachikuji to weigh in.
Should this be Retriable? In theory, others could leave the group while we are retrying. Not 100% sure if we want any retriable exception in join group, which is time sensitive.
cc @hachikuji for second opinion :)
In the KIP discussions we settled on fatally failing a consumer that encounters this error.
Does the consumer retry Retriable exceptions up to a certain point? I'm not sure. If we could retry up to 3 times and then fatally fail, that would be somewhat better
I think it probably should be fatal. If a user is bumping up against the limit, we want them to know about it. Not sure there is much benefit trying to operate too finely at the edge.
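A minimal sketch of the fatal-versus-retriable distinction discussed above, assuming a simplified exception hierarchy. `SketchApiException`, `SketchRetriableException`, `SketchGroupMaxSizeReachedException` and `JoinErrorHandling` are hypothetical stand-ins, not the Kafka client classes; the only point is that an error outside the retriable branch surfaces to the user instead of being retried.

```java
// Hypothetical stand-ins for illustration; these are not the real Kafka classes.
class SketchApiException extends RuntimeException {
    SketchApiException(String message) { super(message); }
}

class SketchRetriableException extends SketchApiException {
    SketchRetriableException(String message) { super(message); }
}

// Deliberately NOT a subclass of the retriable type, so it is treated as fatal.
class SketchGroupMaxSizeReachedException extends SketchApiException {
    SketchGroupMaxSizeReachedException(String message) { super(message); }
}

final class JoinErrorHandling {
    private JoinErrorHandling() { }

    /** Returns true if the join may be retried; rethrows any other (fatal) error. */
    static boolean shouldRetry(RuntimeException error) {
        if (error instanceof SketchRetriableException) {
            return true;
        }
        throw error; // fatal: the caller sees it immediately
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new SketchRetriableException("transient failure")));
        try {
            shouldRetry(new SketchGroupMaxSizeReachedException("group is full"));
        } catch (SketchGroupMaxSizeReachedException e) {
            System.out.println("fatal: " + e.getMessage());
        }
    }
}
```

With this shape, a user who bumps against the limit gets the exception on the first attempt rather than after a series of silent retries.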
nit: Can you configure your IDE not to reorder imports?
Sure, reverted these back
Any reason this isn't in GroupConfig?
Completely missed it, my bad
few style nits:
- the if-else style here is not consistent
- use of NoMemberId vs JoinGroupRequest.UNKNOWN_MEMBER_ID. I'm not sure why we have both, but we probably want to be consistent with the other two places where we return an error here.
- Do we really need the "return" here? Scala style recommends against multiple exit points.
re: NoMemberId, I had seen it from the removeMemberAndUpdateGroup. I also think it should use JoinGroupRequest.UNKNOWN_MEMBER_ID, especially now that the lines are right next to each other after the rebase
Agreed with the style, it reads better as an else if for sure
Do we really need to shrink? If we detect a size issue and force a rebalance, everyone is forced to re-join and then the "group too large" logic in the JoinGroup path will kick in.
Personally, I'd leave existing groups alone. If the size causes issues, they'll time out and rebalance themselves anyway. And if there are no issues, why introduce new issues?
@hachikuji should probably check the logic here, since reloading of groups has a bunch of logic (including transactions) that I'm not comfortable with.
Yeah, this is the trickiest part of the KIP. My naive initial reasoning was that we might want to maintain the same leader, but now that I think of it, that is more or less pointless (unless it loads some state in its initial PartitionAssignor), and we will most likely pick another leader in the subsequent rebalance.
It's not a bad idea to preserve the leader, but I think Gwen's suggestion seems simpler. I think the main thing I was hoping is that the resizing could be graceful. The existing members should be able to commit their offsets before getting kicked out. It seems like we don't get that with this approach since we are forcefully removing some members.
By kicking out the extra members, as Jason said, those members lose the opportunity to commit their last batch. On the other hand, this should only happen when the server operator believes that the configured group size is absurd and should be prevented. In this sense, the group is interpreted as over capacity, so allowing the rest of the consumers to take over the jobs should be very light lifting, and I don't see an obvious negative impact. Thoughts?
> In this sense, the group is interpreted as over capacity, so allowing rest of consumers take over the jobs should be very light lifting
Sorry, I couldn't understand this exactly. Do you mean that it's reasonable to "shrink" the group in one way or another and not let it continue over capacity?
My thinking is to go with a forced rebalance
do we really want to throw? Rather than just silently do nothing?
I don't have a strong opinion about this
Perhaps a log message (warning) would suffice?
Should we just use assert here? As long as the change is robust, this should never happen, which makes me believe an assertion would be a better way to keep future changes from breaking the group max size logic.
The argument name maxSize suggests this should be a no-op if the group is already smaller. I'd just remove the check.
I'm worried about iterating the entire member map and not sure it is necessary.
I think we can assume that the number of members we remove is much smaller than the total number of members (i.e. the group is just slightly oversized).
Maybe this is cleaner:
- calculate how many members to remove.
- find N members out of members.keys
- call remove() on each (thereby not rewriting lots of logic, not moving around lots of stuff in memory, etc).
WDYT?
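The three steps suggested above could be sketched roughly as follows. This is only an illustration under stated assumptions: `GroupShrinkSketch` and `shrinkTo` are hypothetical names, members are reduced to a plain `Map` keyed by member id, and the sketch ignores leader preservation and the `supportedProtocols` bookkeeping mentioned elsewhere in this review.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not the coordinator's actual code.
final class GroupShrinkSketch {
    private GroupShrinkSketch() { }

    /** Removes just enough members to bring the map down to maxSize; returns the removed ids. */
    static <V> List<String> shrinkTo(Map<String, V> members, int maxSize) {
        // Step 1: calculate how many members to remove (zero if already small enough).
        int toRemove = Math.max(0, members.size() - maxSize);

        // Step 2: pick N member ids from the key set, copying only those N keys.
        List<String> victims = new ArrayList<>(toRemove);
        for (String memberId : members.keySet()) {
            if (victims.size() == toRemove) {
                break;
            }
            victims.add(memberId);
        }

        // Step 3: remove each one in place instead of rebuilding the whole map.
        for (String memberId : victims) {
            members.remove(memberId);
        }
        return victims;
    }

    public static void main(String[] args) {
        Map<String, Integer> members = new LinkedHashMap<>();
        for (int i = 0; i < 5; i++) {
            members.put("member-" + i, i);
        }
        System.out.println("removed: " + shrinkTo(members, 3));
        System.out.println("remaining: " + members.keySet());
    }
}
```

Because the method is a no-op when the group is already within the limit, it also matches the earlier remark that an argument named maxSize should not need a separate size check.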
Yeah this is pretty inefficient.
My thinking is that we should run through this code path only on broker rolls with a new config. But regarding your other comment, shrinking may turn out to be completely unnecessary. I'll wait for @hachikuji to give his thoughts as well before removing it.
I guess this part of the logic adds as many members as possible while making sure the leader makes it into newMembers. How about we just put the leader into newMembers first and continue the iteration without worrying about it? Note that putting the leader twice shouldn't be a problem. As I see it, this should make the logic here simpler.
I agree that @gwenshap's idea is more efficient. Otherwise, if we are sticking to the existing logic, we might be able to leverage the retain(...) function and avoid creating a second hash map.
retain() is a cool function but I will go with @gwenshap's suggestion as it makes the updating of supportedProtocols easier
super nit: "maximum number of..." amount usually refers to things that aren't counted individually.
English! :)
I'm not sure I understand what the changes to receive records are doing here...
receiveRecords used to accept a topic parameter which it did not use at all, so I removed it
If you're asking in a more general sense - I added a way to handle exceptions in receiveRecords through a callback. This was to ease implementation since I needed to run receiveRecords() in parallel. I played a lot with these tests, now I clearly see that callback logic isn't needed.
seems to be consistently failing for me with:
java.util.concurrent.TimeoutException: Futures timed out after [1 millisecond]
I do not see any failures. Passed 100/100.
This is now failing for me too.
Hmm, the rebase with KIP-394 caused the test failures. I'll need to investigate further
Aha, this is why the tests are failing - left over from the merge with KIP-394 ;)
abbccdda
left a comment
Great job Stanislav! The unit tests are well thought out and make me confident about the changes here. Left some comments.
Would this be easier if we name a constructor as GroupMaxSizeReachedException(String groupId)?
I also like what @abbccdda suggests (even though most of the existing exception constructors just take a message string).
I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes that the number of consumers in the group is already at the configured maximum would be less confusing.
> I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes on number of consumers in the group already at the configured maximum would be less confusing.
This sounds good to me.
I was thinking about a groupId as a parameter as well and was wondering how to configure it in Errors.java for when the server uses that. I've updated that, please see it again.
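As a rough illustration of the constructor idea in this thread, here is a sketch of an exception that takes the group id and builds the clearer message itself. `GroupMaxSizeReachedSketchException` is a hypothetical class written for this example, and the message wording is an assumption modelled on the phrasing suggested in the review; it is not the class that was merged.

```java
// Hypothetical class for illustration; not the exception shipped with the client.
class GroupMaxSizeReachedSketchException extends RuntimeException {
    private final String groupId;

    GroupMaxSizeReachedSketchException(String groupId) {
        // Assumed wording: it names the group and states that the configured limit was hit.
        super("Consumer group " + groupId + " already has the configured maximum number of members.");
        this.groupId = groupId;
    }

    String groupId() {
        return groupId;
    }
}

final class GroupMaxSizeMessageDemo {
    private GroupMaxSizeMessageDemo() { }

    public static void main(String[] args) {
        RuntimeException e = new GroupMaxSizeReachedSketchException("my-group");
        System.out.println(e.getMessage());
    }
}
```

Building the message inside the constructor keeps call sites short, at the cost of differing from the existing exceptions that take a plain message string, as noted above.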
Could we just do a two-element (memberId, member) in the foreach iteration to avoid extra definition here?
Since leaderWasAdded is only used twice, we might just want to use !leader.isEmpty for straightforward logic lookup.
Just want to understand here: (I'm not 100% sure) does a plain assignment for members = newMembers affect the final result?
Do we need to create a separate function? Since checkExceptionDuringRebalance is used only once.
I initially had it separate as it had a lot of lines of code but now that it's less than 10 lines I think it's a good idea to merge it back in
vahidhashemian
left a comment
Thanks @stanislavkozlovski for the PR. I left a few comments.
This test fails for me about 50% of the time when I run it locally. I wonder if you see this flakiness on your side.
Sample error messages:
java.lang.AssertionError: expected:<102> but was:<105>
org.scalatest.junit.JUnitTestFailedError: Expected to only receive one exception of type
class org.apache.kafka.common.errors.GroupMaxSizeReachedExceptionduring consumption.
Received: ArrayBuffer(org.apache.kafka.clients.consumer.CommitFailedException: Commit
cannot be completed since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either by increasing
max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with
max.poll.records.)
java.lang.AssertionError: expected:<102> but was:<136>
Not 50% of the time but I do see some flakiness now. What do you think would be a good way to go around this? Checking for at least X records rather than exact?
After the changes, this test has passed 50/50 times for me
Yes, it now passes for me too (20/20). Thanks!
I addressed some of the style comments with the latest commit. I will now work on removing the shrinking logic by forcing a rebalance instead. I feel that might be the better and simpler approach
Okay, I have revisited the group migration and, in the way I have written it, I believe there may exist a very slight race condition in between the response for [...] We also saw the last build fail with the test flakiness that @vahidhashemian pointed out. I have changed the test to assert that at least X records have been consumed. It is very peculiar
vahidhashemian
left a comment
Thanks for the updates. It seems to me the latest commit has broken some of the unit tests (while fixing the flaky one). Without that commit all unit tests seem to pass for me.
nit: ... as many members, or even ... already has the configured maximum number of members.
It seems this test doesn't properly handle the failure.
@vahidhashemian The latest commit should fix everything - the tests pass fine locally
abbccdda
left a comment
Thanks for the updates. Left more comments.
Since this is a non-retriable exception, we could consolidate it into code block L546 as a fatal error.
We don't need to check anything here, I guess; we should just attempt to remove the member id (a member.id with no match will be a no-op, IMO).
I think we just need to kick out real members until groupIsFull is met. Probably we could just use groupIsFull to keep rejecting new members, without caring about pending members, to make the handling logic simpler.
I don't think that would work.
If we were to detect that groupIsOverCapacity() and start kicking out members until groupIsFull, we might kick out members before they had a chance to commit offsets.
When we receive a JoinGroup request from a member, we know that it has attempted to commit offsets (see ConsumerCoordinator#onJoinPrepare).
Grr, we saw JDK8 fail with the new test: Meaning it can be somewhat flaky... Does anybody have any recommendations on how to reduce the flakiness here? We could catch such errors and retry at least once
If we have to handle this separately, maybe we could move the check to the top level like the GROUP_AUTHORIZATION check.
I think it would be good to have the log
log.error("Attempt to join group failed due to fatal error: {}", error.message());
Maybe we could move the GROUP_AUTHORIZATION to this level?
nit: this seems a verbose way of saying group.size >= groupConfig.groupMaxSize
Maybe, I thought it reads better in the if check. I've removed this.
WDYT about groupIsOverCapacity?
Technically this JoinGroup could be a retry from a member that was already accepted. One way to detect this case would be to check the member's awaitingJoinCallback.
Great catch
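A small sketch of the admission check this thread converges on: reject new members once the group is full, but let a retry from an already accepted member through. `JoinAdmissionSketch` and `tryJoin` are hypothetical names. Note the swapped-in technique: the review suggests detecting the retry through the member's awaitingJoinCallback, whereas this sketch approximates it with a plain membership lookup.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model for illustration; not the coordinator's actual data structures.
final class JoinAdmissionSketch {
    private final int groupMaxSize;
    private final Set<String> members = new HashSet<>();

    JoinAdmissionSketch(int groupMaxSize) {
        this.groupMaxSize = groupMaxSize;
    }

    /** The predicate from the nit above: group.size >= groupMaxSize. */
    boolean groupIsFull() {
        return members.size() >= groupMaxSize;
    }

    /** Returns true if the join is accepted, false if it would exceed the limit. */
    boolean tryJoin(String memberId) {
        if (members.contains(memberId)) {
            return true; // retry from a member that was already accepted
        }
        if (groupIsFull()) {
            return false; // a genuinely new member would push the group over the limit
        }
        members.add(memberId);
        return true;
    }

    public static void main(String[] args) {
        JoinAdmissionSketch group = new JoinAdmissionSketch(2);
        System.out.println(group.tryJoin("a")); // accepted
        System.out.println(group.tryJoin("b")); // accepted
        System.out.println(group.tryJoin("c")); // rejected, group is full
        System.out.println(group.tryJoin("a")); // accepted, retry of an existing member
    }
}
```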
abbccdda
left a comment
Great work so far! The algorithm part LGTM now. One meta comment: do we have logic to check whether GROUP_MAX_SIZE_REACHED is a fatal exception? (Hint from you in KIP-345 lol)
@abbccdda @hachikuji I think I've addressed your comments.
Thanks a lot for the updates! @stanislavkozlovski The implementation LGTM for now.
hachikuji
left a comment
Thanks, looks good overall. I left small comments on the test cases.
nit: this seems a little brittle. Would it be enough to just check the exception type?
nit: since it's just an accessor, maybe we could drop the parentheses?
nit: usually we favor
stableConsumers.foreach { cons =>
Could we assert the expected exception?
nit: do we need to add new code using the deprecated poll api?
We have similar logic in PlaintextConsumerTest.subscribePollers. Not sure how easy it is to factor out the common logic, but it would be preferable if possible.
Maybe we could call this receiveAndCommit or something so that the offset commit expectation is clear.
Seems we might be able to simplify this. If we receive anything other than GroupMaxSizeReachedException, couldn't we fail directly? Then it could be a simple AtomicBoolean tracking whether or not we have received exactly one exception.
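One way the simplification suggested here might look, as a hedged sketch: fail directly on any unexpected exception and use an `AtomicBoolean` to record that the expected one was seen exactly once. `SingleFailureTracker` and `GroupFullSketchException` are hypothetical names standing in for the test helper and for GroupMaxSizeReachedException.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for GroupMaxSizeReachedException.
class GroupFullSketchException extends RuntimeException {
    GroupFullSketchException(String message) { super(message); }
}

// Hypothetical test helper; safe to call from several consumer threads.
final class SingleFailureTracker {
    private final AtomicBoolean seenExpected = new AtomicBoolean(false);

    /** Records one consumer failure; fails fast on anything unexpected or on a second expected failure. */
    void record(Throwable error) {
        if (!(error instanceof GroupFullSketchException)) {
            throw new AssertionError("Unexpected exception during consumption", error);
        }
        // compareAndSet succeeds only for the first expected exception.
        if (!seenExpected.compareAndSet(false, true)) {
            throw new AssertionError("Expected exactly one GroupFullSketchException");
        }
    }

    boolean sawExactlyOne() {
        return seenExpected.get();
    }

    public static void main(String[] args) {
        SingleFailureTracker tracker = new SingleFailureTracker();
        tracker.record(new GroupFullSketchException("group is full"));
        System.out.println(tracker.sawExactlyOne());
    }
}
```

Compared with collecting exceptions in a buffer and inspecting it afterwards, this reports the first unexpected failure at the point where it happens.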
…mer group sizes This patch introduces a new config - "consumer.group.max.size", which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE. Once a consumer group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error. In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.
The Coordinator now triggers a rebalance when it encounters an over-sized group. This gives a chance for the group members to commit offsets. Once the rebalance collects all the `JoinGroup` requests, the Coordinator shrinks the group, triggers another rebalance and responds to the JoinGroup requests to unblock consumers.
… dismiss some JoinGroup requests
…roup is over capacity Add test to ensure Errors.GROUP_MAX_SIZE_REACHED isn't retriable Small refactorings
hachikuji
left a comment
LGTM. I will merge once the build completes (supposing no errors).
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
…pache#6163) This patch introduces a new config - "group.max.size", which caps the maximum size any group can reach. It has a default value of Int.MAX_VALUE. Once a group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error. In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced. Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Boyang Chen <bchen11@outlook.com>, Gwen Shapira <cshapi@gmail.com>, Jason Gustafson <jason@confluent.io>
This patch introduces a new config - "consumer.group.max.size", which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE.
Once a consumer group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.
In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.
I have added two integration tests for both scenarios - a member joining an already-full group and a rolling restart with a new config
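For orientation, a minimal sketch of how a broker-side property like this could be set and read with its default. It uses the `group.max.size` name from the PR title and the merged commit message (the description above still uses the earlier `consumer.group.max.size` name). `BrokerConfigSketch` is a hypothetical helper and the value 100 is only an example; the fallback to `Integer.MAX_VALUE` mirrors the default stated in the description.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not how the broker parses its configuration.
final class BrokerConfigSketch {
    static final String GROUP_MAX_SIZE = "group.max.size";

    private BrokerConfigSketch() { }

    /** Reads the limit, falling back to Integer.MAX_VALUE when the property is not set. */
    static int groupMaxSize(Properties props) {
        String raw = props.getProperty(GROUP_MAX_SIZE, Integer.toString(Integer.MAX_VALUE));
        return Integer.parseInt(raw);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(groupMaxSize(props)); // unset: effectively unlimited

        props.setProperty(GROUP_MAX_SIZE, "100"); // example value only
        System.out.println(groupMaxSize(props));
    }
}
```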