Skip to content

KAFKA-7641: Introduce "group.max.size" config to limit group sizes#6163

Merged
hachikuji merged 16 commits into
apache:trunkfrom
stanislavkozlovski:KIP-389-max-consumer-group-size
Feb 2, 2019
Merged

KAFKA-7641: Introduce "group.max.size" config to limit group sizes#6163
hachikuji merged 16 commits into
apache:trunkfrom
stanislavkozlovski:KIP-389-max-consumer-group-size

Conversation

@stanislavkozlovski

Copy link
Copy Markdown
Contributor

This patch introduces a new config - "consumer.group.max.size", which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE.
Once a consumer group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.

I have added two integration tests for both scenarios - a member joining an already-full group and a rolling restart with a new config

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@gwenshap gwenshap left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left quite a few comments. Two high level things that worry me:

  1. I had lots of unrelated consumer test failures with this PR, leading me to guess that something about the logic isn't robust yet.
  2. Shrinking seems like the trickiest bit, both to implement and to test. I think we don't really need - if a group is really so big that it causes issues, it will eventually timeout and rebalance... and then we'll enforce the new size with "join group". WDYT?
    If we do keep shrinking, I left some suggestions for simpler implementation. But I'm not confident that I understand the group reload logic enough, so asked @hachikuji to weigh in.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be Retriable? in theory, others could leave the group while we are retrying? Not 100% sure if we want any retriable exception in join group, which is time sensitive.

cc @hachikuji for second opinion :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the KIP discussions we settled on fatally failing a consumer that encounters this error.

Does the consumer retry Retriable exceptions up to a certain point? I'm not sure. If we could retry up to 3 times and then fatally fail, that would be somewhat better

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it probably should be fatal. If a user is bumping up against the limit, we want them to know about it. Not sure there is much benefit trying to operate too finely at the edge.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you configure your IDE not to reorder imports?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, reverted these back

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason this isn't in GroupConfig?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely missed it, my bad

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few style nits:

  • the if-else style here is not consistent
  • use of NoMemberId vs JoinGroupRequest.UNKNOWN_MEMBER_ID. I'm not sure why we have both, but we probably want to be consistent with the other two places where we return an error here.
  • Do we really need the "return" here? Scala style recommends against multiple exit points.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: NoMemberId, I had seen it from the removeMemberAndUpdateGroup. I also think it should use JoinGroupRequest.UNKNOWN_MEMBER_ID, especially now that the lines are right next to each other after the rebase

Agreed with the style, it reads better as an else if for sure

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to shrink? If we detect a size issue and force a rebalance, everyone is forced to re-join and then the "group too large" logic in the JoinGroup path will kick in.

Personally, I'd leave existing groups alone. If the size causes issues, they'll timeout and rebalance themselves anyway. And if there are no issues, why introduce new issues.

@hachikuji should probably check the logic here, since reloading of groups has a bunch of logic (including transactions) that I'm not comfortable with.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is the trickiest part of the KIP. My naive initial reasoning was that we might want to maintain the same leader but now that I think of it that is more or less pointless (unless it loads some state in its initial PartitionAssignor) and we most likely will pick another leader from the subsequent rebalance

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a bad idea to preserve the leader, but I think Gwen's suggestion seems simpler. I think the main thing I was hoping is that the resizing could be graceful. The existing members should be able to commit their offsets before getting kicked out. It seems like we don't get that with this approach since we are forcefully removing some members.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By kicking out the extra members, like Jason said those members lose their opportunities to commit the last batch. However on the other hand, this should only happen when the server operator believes that an absurd group size is configured that should be prevented. In this sense, the group is interpreted as over capacity, so allowing rest of consumers take over the jobs should be very light lifting, which I don't see an obvious negative impact. Thoughts?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this sense, the group is interpreted as over capacity, so allowing rest of consumers take over the jobs should be very light lifting
Sorry, I couldn't understand this exactly. Do you mean that it's reasonable to "shrink" the group in one way or another and not let it continue over capacity?

My thinking is to go with a forced rebalance

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really want to throw? Rather than just silently do nothing?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion about this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a log message (warning) would suffice?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just use assert here? As long as the change is robust, this should never happen which makes me believe adding assertion will be a better approach to avoid future changes affecting group max size logic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument name maxSize suggests this should be a no-op if the group is already smaller. I'd just remove the check.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about iterating the entire member map and not sure it is necessary.
I think we can assume that the number of members we remove is much smaller than the total number of members (i.e. the group is just slightly oversized).

Maybe this is cleaner:

  • calculate how many members to remove.
  • find N members out of members.keys
  • call remove() on each (thereby not rewriting lots of logic, not moving around lots of stuff in memory, etc).

WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is pretty inefficient.
My thinking is that we should run through this code path only on broker rolls with a new config. But regarding your other comment, shrinking may turn out to be completely unnecessary. I'll wait for @hachikuji to give his thoughts as well before removing it.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this part of the logic is to add as many as members while making sure the leader is successfully joining the newMembers group. How about we just put the leader into the newMembers list first and continue the iteration without worrying about it? Note that double putting of leader shouldn't be a problem. This should make the logic simpler here as I perceived.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that @gwenshap's idea is more efficient. Otherwise, if we are sticking to the existing logic, we might be able to leverage the retain(...) function and avoid creating a second hash map.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retain() is a cool function but I will go with @gwenshap's suggestion as it makes the updating of supportedProtocols easier

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: "maximum number of..." amount usually refers to things that aren't counted individually.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

English! :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what the changes to receive records are doing here...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

receiveRecords used to accept a topic parameter which it did not use at all, so I removed it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're asking in a more general sense - I added a way to handle exceptions in receiveRecords through a callback. This was to ease implementation since I needed to run receiveRecords() in parallel. I played a lot with these tests, now I clearly see that callback logic isn't needed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems to be consistently failing for me with:
java.util.concurrent.TimeoutException: Futures timed out after [1 millisecond]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see any failures. Passed 100/100.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now failing for me too.

@stanislavkozlovski

Copy link
Copy Markdown
Contributor Author

Hmm, the rebase with KIP-394 caused the test failures. I'll need to investigate further

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, this is why the tests are failing - left over from the merge with KIP-394 ;)

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job Stanislav! The unit tests created are well thought and make me confident about the changes here. Left some comments.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be easier if we name a constructor as GroupMaxSizeReachedException(String groupId)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also like what @abbccdda suggests (even though most of the existing exception constructors just take a message string).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes on number of consumers in the group already at the configured maximum would be less confusing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes on number of consumers in the group already at the configured maximum would be less confusing.

This sounds good to me.

I was thinking about a groupId as a parameter as well and was wondering how to configure it in Errors.java for when the server uses that. I've updated that, please see it again.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just use assert here? As long as the change is robust, this should never happen which makes me believe adding assertion will be a better approach to avoid future changes affecting group max size logic.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just do a two-element (memberId, member) in the foreach iteration to avoid extra definition here?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since leaderWasAdded is only used twice, we might just want to use !leader.isEmpty for straightforward logic lookup.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to understand here: (I'm not 100% sure) does a plain assignment for members = newMembers affect the final result?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/maxSize/groupMaxSize

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this part of the logic is to add as many as members while making sure the leader is successfully joining the newMembers group. How about we just put the leader into the newMembers list first and continue the iteration without worrying about it? Note that double putting of leader shouldn't be a problem. This should make the logic simpler here as I perceived.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to create a separate function? Since checkExceptionDuringRebalance is used only once.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially had it separate as it had a lot of lines of code but now that it's less than 10 lines I think it's a good idea to merge it back in

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By kicking out the extra members, like Jason said those members lose their opportunities to commit the last batch. However on the other hand, this should only happen when the server operator believes that an absurd group size is configured that should be prevented. In this sense, the group is interpreted as over capacity, so allowing rest of consumers take over the jobs should be very light lifting, which I don't see an obvious negative impact. Thoughts?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just remove the return type.

@vahidhashemian vahidhashemian left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @stanislavkozlovski for the PR. I left a few comments.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps a log message (warning) would suffice?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails for me about 50% of the time when I run it locally. I wonder if you see this flakiness on your side.
Sample error messages:

java.lang.AssertionError: expected:<102> but was:<105>

org.scalatest.junit.JUnitTestFailedError: Expected to only receive one exception of type
class org.apache.kafka.common.errors.GroupMaxSizeReachedExceptionduring consumption.
Received: ArrayBuffer(org.apache.kafka.clients.consumer.CommitFailedException: Commit
cannot be completed since the group has already rebalanced and assigned the partitions to
another member. This means that the time between subsequent calls to poll() was longer
than the configured max.poll.interval.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either by increasing
max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with
max.poll.records.)

java.lang.AssertionError: expected:<102> but was:<136>

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 50% of the time but I do see some flakiness now. What do you think would be a good way to go around this? Checking for at least X records rather than exact?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the changes, this test has passed 50/50 times for me

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it now passes for me too (20/20). Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also like what @abbccdda suggests (even though most of the existing exception constructors just take a message string).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if the message ... at full capacity ... provides enough context to users who may not be aware of this new feature. I think a clearer message that emphasizes on number of consumers in the group already at the configured maximum would be less confusing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that @gwenshap's idea is more efficient. Otherwise, if we are sticking to the existing logic, we might be able to leverage the retain(...) function and avoid creating a second hash map.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see any failures. Passed 100/100.

@stanislavkozlovski

Copy link
Copy Markdown
Contributor Author

I addressed some of the style comments with the latest commit. I will now work on removing the shrinking logic by forcing a rebalance instead. I feel that might be the better and simpler approach

@stanislavkozlovski

Copy link
Copy Markdown
Contributor Author

Okay I have revisited the group migration and shrinkTo logic - upon loading a big group, the coordinator triggers a rebalance. This gives a chance for the consumers to commit offsets while the Coordinator is collecting JoinGroup requests.

In the way I have written it, I believe there may exist a very slight race condition in between the response for JoinGroup and the removal from the group.
I wanted to first shrink the group, then trigger a rebalance and in the end respond to JoinGroup but I could get the tests to pass with that combination after banging my head against it.

We also saw the last build fail with the test flakiness that @vahidhashemian pointed out. I have changed the test to assert that at least X records have been consumed. It is very peculiar

@vahidhashemian vahidhashemian left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. It seems to me the latest commit has broken some of the unit tests (while fixing the flaky one). Without that commit all unit tests seem to pass for me.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ... as many members, or even ... already has the configured maximum number of members.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this test doesn't properly handle the failure.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now failing for me too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it now passes for me too (20/20). Thanks!

@stanislavkozlovski

Copy link
Copy Markdown
Contributor Author

@vahidhashemian
Good thing the tests fail - they caught a hole in the logic. We would not block a JoinGroup in the scenario where the group is full and a new member wants to join.

The latest commit should fix everything - the tests pass fine locally

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates. Left more comments.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is non-retriable exception, we could consolidate it into code block L546 as fatal error.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to check anything here I guess, we should just need to attempt removing the member id (because no member.id match will do a no-op IMO.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we just need to kick out real members until the groupIsFull is met. Probably we could just use groupIsFull to keep rejecting new members, while doesn't care about pending members to make the handling logic simpler.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that would work.
If we were to detect that groupIsOverCapacity() and start kicking out members until groupIsFull, we might kick out members before they had a chance to commit offsets.

When we receive aJoinGroup request from a member, we know that it has attempted to commit offsets. (see ConsumerCoordinator#onJoinPrepare)

@stanislavkozlovski

Copy link
Copy Markdown
Contributor Author

Grr, we saw JDK8 fail with the new test:

12:19:07 kafka.api.ConsumerBounceTest > testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED
12:19:07     java.lang.AssertionError: Received 0, expected at least 102
12:19:07         at org.junit.Assert.fail(Assert.java:88)
12:19:07         at org.junit.Assert.assertTrue(Assert.java:41)
12:19:07         at kafka.api.ConsumerBounceTest.kafka$api$ConsumerBounceTest$$receiveAtLeastRecords(ConsumerBounceTest.scala:554)
12:19:07         at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$3.apply(ConsumerBounceTest.scala:378)
12:19:07         at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$3.apply(ConsumerBounceTest.scala:377)
12:19:07         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
12:19:07         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
12:19:07         at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:377)
12:19:07 

Meaning it can be somewhat flaky... Does anybody have any recommendations on how to reduce the flakiness here? We could catch such errors and retry at least once

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have to handle this separately, maybe we could move the check to the top level like the GROUP_AUTHORIZATION check.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to have the log

log.error("Attempt to join group failed due to fatal error: {}", error.message());

Maybe we could move the GROUP_AUTHORIZATION to this level?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this seems a verbose way of saying group.size >= groupConfig.groupMaxSize

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I thought it reads better in the if check. I've removed this.
WDYT about groupIsOverCapacity?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this JoinGroup could be a retry from a member that was already accepted. One way to detect this case would be to check the member's awaitingJoinCallback.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch

@abbccdda abbccdda left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work so far! The algorithm part LGTM now. One meta comment was that do we have logic to check whether GROUP_MAX_SIZE_REACHED is a fatal exception? (Hint from you in KIP-345 lol)

@stanislavkozlovski

Copy link
Copy Markdown
Contributor Author

@abbccdda @hachikuji I think I've addressed your comments.
I'm interested if you have any suggestions on how to reduce the flakiness of integration tests in ConsumerBounceTest.scala

@abbccdda

Copy link
Copy Markdown

Thanks a lot for the updates! @stanislavkozlovski The implementation LGTM for now.

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, looks good overall. I left small comments on the test cases.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this seems a little brittle. Would it be enough to just check the exception type?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since it's just an accessor, maybe we could drop the parenthesis?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: usually we favor

stableConsumers.foreach { cons =>

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we assert the expected exception?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we need to add new code using the deprecated poll api?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have similar logic in PlaintextConsumerTest.subscribePollers. Not sure how easy it is to factor out the common logic, but it would be preferable if possible.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could call this receiveAndCommit or something so that the offset commit expectation is clear.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we might be able to simplify this. If we receive anything other than GroupMaxSizeReachedException, couldn't we fail directly? Then it could be a simple AtomicBoolean tracking whether or not we have received exactly one exception.

Comment thread core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala Outdated
…mer group sizes

This patch introduces a new config - "consumer.group.max.size", which caps the maximum size any consumer group can reach. It has a default value of Int.MAX_VALUE.
Once a consumer group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.
The Coordinator now triggers a rebalance when it encounters an over-sized group. This gives a chance for the group members to commit offsets. Once the rebalance collects all the `JoinGroup` requests, the Coordinator shrinks the group, triggers another rebalance and responds to the JoinGroup requests to unblock consumers.
…roup is over capacity

Add test to ensure Errors.GROUP_MAX_SIZE_REACHED isn't retriable
Small refactorigs

@hachikuji hachikuji left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I will merge once the build completes (supposing no errors).

@hachikuji hachikuji changed the title KAFKA-7641: Introduce "consumer.group.max.size" config to limit consumer group sizes KAFKA-7641: Introduce "group.max.size" config to limit group sizes Feb 2, 2019
@hachikuji hachikuji merged commit 4420d9e into apache:trunk Feb 2, 2019
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…pache#6163)

This patch introduces a new config - "group.max.size", which caps the maximum size any group can reach. It has a default value of Int.MAX_VALUE. Once a group is of the maximum size, subsequent JoinGroup requests receive a MAX_SIZE_REACHED error.

In the case where the config is changed and a Coordinator broker with the new config loads an old group that is over the threshold, members are kicked out of the group and a rebalance is forced.

Reviewers: Vahid Hashemian <vahid.hashemian@gmail.com>, Boyang Chen <bchen11@outlook.com>, Gwen Shapira <cshapi@gmail.com>, Jason Gustafson <jason@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants