GH-3145: Add test for next generation consumer-group rebalance protocol #3237
sobychacko merged 16 commits into spring-projects:main from
|
Thanks, @chickenchickenlove, for this PR. We will take a look at this today. |
|
@chickenchickenlove Sorry about the delay in reviewing. I took a look. Here are some general comments.
I was able to follow your sample, but could you please clarify the connection of Thanks! |
|
@sobychacko thanks for your comments 🙇♂️
I made a new commit to apply your review comments.
Sorry for the confusion.
At first, i made new test codes by referring to other test codes that use When you have free time, Please take another look 🙇♂️ |
samples/sample-07/build.gradle
Outdated
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'

    testImplementation 'org.testcontainers:kafka:1.19.7'
I think Testcontainers is a managed dependency in Spring Boot: https://github.com/spring-projects/spring-boot/blob/main/spring-boot-project/spring-boot-dependencies/build.gradle#L2119.
So, we don't need the version here.
samples/sample-07/build.gradle
Outdated
    testImplementation 'org.springframework.kafka:spring-kafka-test'

    testImplementation 'org.testcontainers:kafka:1.19.7'
    implementation 'org.apache.kafka:kafka-clients:3.7.0'
Why do we need this explicitly since we should rely on the transitive dependency from the org.springframework.kafka:spring-kafka?
Well, I guess we have to make this sample fully based on Spring Boot 3.3.
Even if it is going to be on SNAPSHOT until the GA release in a couple of weeks, it is OK to have it in this sample.
@artembilan thanks for your comments.
I now use springframework.boot:3.3.0-SNAPSHOT.
Does that match what you meant? I'm worried I may have misunderstood you.
samples/sample-07/build.gradle
Outdated
    plugins {
        id 'java'
        id 'org.springframework.boot' version '3.2.5'
        id 'io.spring.dependency-management' version '1.1.4'
samples/sample-07/build.gradle
Outdated
    }

    group = 'com.example'
    version = '0.0.1-SNAPSHOT'
I believe the version has to be exactly the same as the Spring Kafka project's, so 3.2.0-SNAPSHOT.
samples/sample-07/src/main/java/com/example/sample07/Sample07Application.java
    String portFormat = String.format("%d:%d/%s", 12000, 9094, InternetProtocol.TCP.toDockerNotation());
    final List<String> portsBinding = Collections.singletonList(portFormat);
    broker.setPortBindings(portsBinding);
    broker.start();
Please, look into Spring Boot Testcontainers Support: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.testcontainers.
I would expect that the community would like to see some convenient high-level API.
@artembilan, sorry to say that it is currently impossible to use a high-level API like @ServiceConnection.
- apache/kafka supports the server-side consumer rebalancing protocol from apache/kafka:3.7.0.
- Testcontainers supports cp/confluent-kafka only.
- There is no cp/confluent-kafka version compatible with apache/kafka:3.7.0. (For more details, please click the link.)
- The low-level code in KafkaContainer is coupled with a lot of Confluent stuff, including the names of the vars (Kafka Testcontainer should use bitnami/kafka image, or at least allow it as compatible testcontainers/testcontainers-java#8107 (comment)).
Considering the above four points, I am unable to use KafkaContainer, so I have to use GenericContainer instead.
The @ServiceConnection you recommended depends on KafkaContainer.
As you can see below, there is no ConnectionDetails instance for GenericContainer.

This is why I cannot use a high-level Testcontainers API like @ServiceConnection.
I used @Testcontainers, following your advice as closely as I could. If you have a better way, please let me know. 😄
Could you clarify? In the above comments, you say that "Testcontainers supports cp/confluent-kafka only", and then that, in the docs of testcontainers:kafka, they support apache/kafka with org.testcontainers.kafka.KafkaContainer. Which one do they support? Incidentally, we recently switched to apache/kafka from the old wurstmeister/kafka for some AOT testing projects we have.
The docs say:
- org.testcontainers.containers.KafkaContainer supports confluentinc/cp-kafka.
- org.testcontainers.kafka.KafkaContainer supports apache/kafka.
However, there is no org.testcontainers.kafka.KafkaContainer. (Please see the image below, testContainers.kafka:1.19.7.)

So I looked it up and found this issue on the Testcontainers GitHub.
It's a fairly well-known bug. I think it means some Testcontainers versions cannot use org.testcontainers.kafka.KafkaContainer. (testcontainers/testcontainers-java#8576 (comment))
By the way, it seems to be fixed (1.19.8, released May 9).

When I wrote this PR, the latest version was 1.19.7. Sorry for the confusion. 🙇‍♂️
Thanks for the clarification!
    GenericContainer setUpBroker() {
        final DockerImageName imageName = DockerImageName.parse(KAFKA_IMAGE_NAME);
        final GenericContainer broker = new GenericContainer(imageName);
Why do we use org.testcontainers:kafka then if we don't rely on its specific API?
    // https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
    // https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes
    broker.addEnv("KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS", "classic,consumer");
    broker.addEnv("KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE", "false");
Can't all of these env vars be extracted into a properties file?
And then we can use broker.getEnvMap().putAll()
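A minimal sketch of that suggestion, using only the JDK: the env vars are parsed from properties-format text into a map that could then be handed to the container. The class name `BrokerEnv` and the idea of keeping the text in a classpath file are assumptions for illustration; only the two variable names come from the diff above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, not part of the PR.
class BrokerEnv {

    // Parses properties-format text into a sorted String-to-String map.
    static Map<String, String> fromProperties(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> env = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            env.put(name, props.getProperty(name));
        }
        return env;
    }

    public static void main(String[] args) {
        // In a real sample this text would be read from a file (assumption).
        String text = String.join("\n",
                "KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS=classic,consumer",
                "KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE=false");
        Map<String, String> env = fromProperties(text);
        // With Testcontainers, the map could then be applied as suggested:
        // broker.getEnvMap().putAll(env);
        env.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

Keeping the parsing separate from the container setup means the same file could also be reused outside the test.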
    import org.springframework.boot.test.context.SpringBootTest;

    @SpringBootTest
    class Sample07ApplicationTests {
Why do we still need this class?
Or why not put the logic of your new class directly in this one?
So, we won't need to fight for naming 😄
    }

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
We don't need beans from here and above since they have to be auto-configured by Spring Boot.
When you switch to the @ServiceConnection on the container bean definition, everything will be picked up by auto-configuration.
I'm not sure in your statement. |
|
I think what he means is that the new consumer (non-classic) does not support explicitly triggering |
|
Ah! I see now in the |
artembilan left a comment
A README.adoc is required for this new sample to explain what is going on and why.
samples/sample-07/build.gradle
Outdated
    tasks.withType(Checkstyle) {
        checkstyle {
            configDirectory.set(rootProject.file("src/checkstyle"))
I don't think we need any Checkstyle validation in samples.
The user may take it as it is and reformat for their own needs.
    }

    @Bean
    KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> producerFactory) {
We don't need this bean. The auto-configuration does exactly the same.
    @Bean
    ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(BROKER));
I think that BROKER can be set from application.properties, and auto-configuration will take care of building this ProducerFactory bean for us.
    Consumer<Object, Object> dummyConsumer = new KafkaConsumer<>(propsDummy);

    Thread.sleep(5000);
    dummyConsumer.close();
This looks like a mix of concerns.
Why can't we just rely on the auto-configuration for the ConsumerFactory and then have such a dummy consumer created from the ApplicationRunner?
I guess that new consumer protocol has tiny concurrent issue. (I reported it via https://issues.apache.org/jira/browse/KAFKA-16670)
In case of 3, my work around is using dummy consumer to make broker load metadata from __consumer_offsets.
I don't think so; it is intentional. 😄
As I said in the PR description, this is a workaround for a small issue:
- Right after the broker starts, the broker does not seem to have loaded metadata from __consumer_offsets. Once the metadata is loaded, the broker can function as a normal coordinator.
- When a consumer is created, the consumer sends a findCoordinator request to the bootstrap server.
- Upon receiving the findCoordinator request, the broker schedules a metadata load from __consumer_offsets. However, at this point the broker has not loaded the metadata yet.
Then consumer.subscribe() and consumer.poll() are executed.
If the broker has not loaded the metadata yet, the consumer receives a response to its first consumer.poll() saying that this coordinator is invalid.
The consumer then tries to find a valid coordinator, but stays stuck in that state forever.
This is the problem I mentioned.
To solve it, I create a dummyConsumer to make the broker schedule the metadata load. (The dummyConsumer is closed after sleeping for 5 seconds, because it is only needed to trigger the scheduling.)
This is why I create another consumer without the ConsumerFactory! 🤔
What do you think?
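The shape of this workaround (open a throwaway client, wait, close it, and only then start the real consumers) can be sketched with plain JDK types. `CoordinatorWarmUp` and `warmUp` are hypothetical names; in the sample the supplier would presumably return `new KafkaConsumer<>(propsDummy)` and the grace period would be the 5 seconds mentioned above.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper illustrating the dummy-consumer workaround; not part of the PR.
class CoordinatorWarmUp {

    // Opens a throwaway client, keeps it open for the grace period, then closes it.
    static void warmUp(Supplier<AutoCloseable> clientFactory, Duration grace) {
        try (AutoCloseable dummy = clientFactory.get()) {
            // Give the broker time to finish the work the client's first request triggered.
            Thread.sleep(grace.toMillis());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during warm-up", e);
        }
        catch (Exception e) {
            throw new IllegalStateException("Warm-up client failed", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in client so the sketch runs without a broker.
        warmUp(() -> () -> System.out.println("dummy client closed"), Duration.ofMillis(100));
        System.out.println("real consumers can start now");
    }
}
```

A fixed sleep is a guess about how long the load takes, which is part of why the reviewers treat this as test-only scaffolding rather than something an application should do.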
"And then, consumer try to find valid coordinator, but stuck in that status forever."
If that is a problem in Apache Kafka itself with this new algorithm, how about postponing such a sample until it is solved?
I wonder how end-users are supposed to solve this problem in their applications?
If you still insist on having this sample, how about reworking it the way an end-user would implement it in their own application?
No one is going to do test stuff in production.
The purpose of a sample is to take it as is and get your own application up and running.
Um... I see!
I will try to find a common way.
If I fail to find a good one, it might be better to close this PR. 🤔
    }

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
We don't need this bean. Spring Boot auto-configures one for us: TaskExecutionAutoConfiguration.
If we need to customize its properties, they can be set in application.properties with the spring.task.execution prefix.
    }

    @Bean
    public GenericContainer<?> brokerContainer() {
I don't think this is supposed to be a bean.
See @Container and read its Javadocs on how Testcontainers manages the container lifecycle.
I tried to use @Container.
However, there is a major reason why I cannot use it.
- As I commented on this, the broker should load the metadata before consumer.subscribe() is executed.
- With @Container, the broker container starts successfully. However, there is no hook point for running the dummyConsumer, which makes the broker schedule the metadata load, before the consumers managed by spring-kafka execute subscribe().
We need to make the broker schedule the metadata load before the consumers managed by spring-kafka are registered as Spring beans. However, AFAIK, there is no annotation that defines the execution order when registering Spring beans in a @Configuration.
That is my difficulty. Do you have a good idea? 🙇‍♂️
Yeah... That's too involved.
I would already have given up on making this work 😄.
As I said in my other comment, it might be time to postpone such a sample until Apache Kafka has that coordinator fixed. 😢
@sobychacko, WDYT?
As you said, it seems better to postpone 😂😂.
Let me check one last time whether there is a common way.
I'm not a Kafka expert and am still learning about it, so I may have misunderstood something. I will take another look to rule that out 😭
Hey, @artembilan
I found the root cause in Kafka and tried to create a new PR on apache/kafka.
However, it seems to have already been fixed by this PR (apache/kafka#15698).
The problem was caused by the error response not being retried (contrary to what the log says).
However, that PR is not included in apache/kafka:3.7.0. 😓
Thus, we should postpone this PR.
May I leave this PR open and postpone it?
Thanks in advance!
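To picture the root cause described above (an error response that should have been retried but was not), here is a toy sketch of a lookup that keeps retrying while the coordinator reports that it is still loading. Everything in it is hypothetical: the `Status` values, the class name, and the attempt limit are illustrative and are not Kafka's client code or error codes.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Toy model only; names and states are assumptions, not Kafka internals.
class CoordinatorLookup {

    enum Status { READY, LOADING }

    // Repeats the lookup until it reports READY; returns the attempts used, or -1 if it never became ready.
    static int lookupWithRetry(Supplier<Status> lookup, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (lookup.get() == Status.READY) {
                return attempt;
            }
            // A real client would back off here before asking again.
        }
        return -1;
    }

    public static void main(String[] args) {
        Iterator<Status> responses = List.of(Status.LOADING, Status.LOADING, Status.READY).iterator();
        System.out.println("ready after attempts: " + lookupWithRetry(responses::next, 5));
    }
}
```

Without the loop, the first LOADING answer would be final, which matches the "stuck forever" symptom reported earlier in the thread.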
Yes. We can keep it open until the next Apache Kafka release, 3.7.1.
Thank you!
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
These all could go to the application.properties
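As a rough illustration of that suggestion, the sketch below takes the four settings by their plain string keys (the values behind the ConsumerConfig constants) and prints the application.properties lines they would correspond to. The class name, the address, and the group id are placeholders; the `properties[group.protocol]` form is the one quoted later in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the PR.
class ConsumerSettings {

    // Raw client keys that Spring Boot exposes as first-class consumer properties.
    private static final Map<String, String> FIRST_CLASS = Map.of(
            "bootstrap.servers", "spring.kafka.consumer.bootstrap-servers",
            "group.id", "spring.kafka.consumer.group-id",
            "auto.offset.reset", "spring.kafka.consumer.auto-offset-reset");

    // The four settings from the diff, keyed by their plain string names.
    static Map<String, String> rawSettings(String bootstrapServers, String groupId) {
        Map<String, String> raw = new LinkedHashMap<>();
        raw.put("bootstrap.servers", bootstrapServers);
        raw.put("group.id", groupId);
        raw.put("group.protocol", "consumer");
        raw.put("auto.offset.reset", "earliest");
        return raw;
    }

    // Renders one application.properties line per setting; other keys use the properties[...] form.
    static String toApplicationProperties(Map<String, String> raw) {
        StringBuilder out = new StringBuilder();
        raw.forEach((key, value) -> {
            String name = FIRST_CLASS.getOrDefault(key, "spring.kafka.consumer.properties[" + key + "]");
            out.append(name).append('=').append(value).append('\n');
        });
        return out.toString();
    }

    public static void main(String[] args) {
        // Address and group id are placeholders, not the sample's actual values.
        System.out.print(toApplicationProperties(rawSettings("localhost:9092", "my-group")));
    }
}
```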
This is not for the consumer managed by spring-kafka.
I create two consumers that are not managed by spring-kafka.
One is rawConsumer, for triggering a consumer rebalance.
The other is dummyConsumer, for making the broker schedule the metadata load. (#3237 (comment))
Thus, we don't need application.properties for them.
I think application.properties is for the consumers managed by spring-kafka, right? (for example, spring.kafka.admin.client-id)
If you want to move this into properties, how about creating dummy-consumer.properties and using it?
    static Map<String, String> getPropertiesFromFile() {
        final Resource resource = new ClassPathResource(BROKER_PROPERTIES_FILE_PATH);
        try {
            // Load from the Resource itself; getFilename() would drop any directory part of the path.
            final Properties properties = PropertiesLoaderUtils.loadProperties(resource);
            return properties.stringPropertyNames().stream()
                    .collect(Collectors.toMap(
                            s -> s,
                            s -> properties.getProperty(s)));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
What do you think? Please let me know your opinion 🙇‍♂️
Ah, I missed one thing.
I will move the settings for the consumers managed by spring-kafka to application.properties. However, it seems necessary to leave the settings for dummyConsumer and rawConsumer as they are.
What do you think?
    factory.setConsumerFactory(consumerFactory);

    factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
The ConsumerAwareRebalanceListener could be just a bean, and Spring Boot takes care of injecting it into the auto-configured kafkaListenerContainerFactory.
    public class Sample07Application {

        public static void main(String[] args) {
            SpringApplication.run(Sample07Application.class, args);
Looking at this from a high level, I wonder what we are trying to achieve with this sample.
Where are the real consumers (@KafkaListener methods) for end-user purposes?
Having the test is OK, but it does not give end-users much value, since this sample does not (yet) demonstrate how to really develop an application with that new consumer group management algorithm.
The PR in the test configuration currently has a KafkaListener where the underlying container factory uses a consumer factory where the new consumer group protocol is used. Since this feature is in its early access form (and not recommended for production usage yet), it might still be beneficial for any users who want to play around with this in a test format. In other words, we don't expect any users to use it in a real application yet until Apache Kafka makes that recommendation, but users might be interested in trying this out via Spring Kafka, and this sample could help with that — just my 2 cents.
|
Looks like there is a feature in Spring Boot called Let's see if that might work better for this our sample (instead of Testcontainers) since we indeed must not talk about fully concentrate on the test, but rather demonstration how to use new consumer group management. |
|
@artembilan My Understanding
Direction
If i misunderstood about your intention, please let me know and give more detail when you have free time 🙇♂️ |
Sounds like a plan. We need to have our consumer in a new consumer group algorithm and that's it. |
|
Hi, @artembilan, @sobychacko ! I created new commit to apply your comments.
I realized that there is new issue for When you have some free time, please take another look 🙇♂️. Thanks in advance! 🙇♂️ |
|
@chickenchickenlove Good point, created this issue: #3254. This is a groundwork for possibly including this property in Boot auto-config eventually. |
|
The sample clean up looks good. I think it might be beneficial to have a |
sobychacko left a comment
See my comments about the README.
samples/sample-07/README.adoc
Outdated
    @@ -0,0 +1,50 @@
    == Sample 7

    This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka.
Spring for Apache Kafka.
samples/sample-07/README.adoc
Outdated
    This sample demonstrated the application of the new consumer rebalance protocol in Spring Kafka.

    The new consumer rebalance protocol refers to the Server Side rebalance protocol proposed in KIP-848.
sobychacko left a comment
See the inline comments in README.
sobychacko left a comment
@chickenchickenlove We can use the property like below even today:
spring.kafka.consumer.properties[group.protocol]=CONSUMER.
|
@sobychacko thanks for your quick comments 👍 Two points seem to work well as you recommended. (Even if new consumer rebalancing protocol)
When you have free time, please take another look! 🙇♂️ |
|
Sounds good. We will take a final look through the sample and merge it. |
     * @since 3.3
     */

    @EnableKafka
We don't need this annotation since we rely on Spring Boot.
      kafka:
        consumer:
          bootstrap-servers: localhost:10000
          group-id: sample07
I see you use some other groups in your @KafkaListener methods.
Why do we still need it here?
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testImplementation 'org.springframework.boot:spring-boot-testcontainers'
Why do we need all of these test dependencies if we don't have any tests in this project?
And that raises the question: why don't we have any tests in this project?
|
I see PR has been merged. |
|
@chickenchickenlove Thanks for the PR with this sample. We went back and forth a lot, but thanks for being patient during the review process. We are glad that this sample is in. @artembilan Sorry, I noticed your latest review only after merging. I will follow up with the cleanup. |
|
Hi, @artembilan ! @sobychacko |


Motivation:
- new Consumer Rebalancing Protocol using EmbeddedKafka, because we cannot set controller.quorum.voters. This is because controller.quorum.voters is hard-coded as 0.0.0.0:0 and the new consumer rebalancing protocol supports KRaft only. So, we cannot test the new consumer rebalancing protocol by using EmbeddedKafka.
- TestContainers to test it. However, it is not appropriate to add a new dependency to spring-kafka just for that test. Thus, I would like to add only sample code for writing a test case using TestContainers.

Modifications
- sample-07 to /samples/ to test the new consumer rebalancing protocol by using testContainers.

To Reviewer
1. The new consumer protocol does not support consumer.enforceRebalance().
2. The new consumer protocol does not support Zookeeper mode. (It's not explicitly mentioned, but you can check it from this issue: https://issues.apache.org/jira/browse/KAFKA-16657)
3. The new consumer protocol has a tiny concurrency issue. (I reported it via https://issues.apache.org/jira/browse/KAFKA-16670)

In case of 3, my workaround is using a dummy consumer to make the broker load metadata from __consumer_offsets.

Test Scenario
1. The spring-kafka consumer subscribes to hello-topic. (1st rebalancing occurs)
2. rawKafkaConsumer subscribes to hello-topic. (2nd rebalancing occurs. This is because the new consumer rebalancing protocol does not support consumer.enforceRebalance())
3. rawKafkaConsumer is closed. (3rd rebalancing occurs)

Result:
Related
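A test for the three-rebalance scenario above could wait on a latch that is counted down from the rebalance listener. The sketch below shows only that counting part with JDK types; `RebalanceCounter` is a hypothetical name, and wiring `onAssigned()` into a listener callback such as `onPartitionsAssigned` is left as an assumption about how the sample's test might use it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper; not part of the PR.
class RebalanceCounter {

    private final CountDownLatch latch;

    RebalanceCounter(int expectedRebalances) {
        this.latch = new CountDownLatch(expectedRebalances);
    }

    // Call once per rebalance, for example from the listener's assignment callback.
    void onAssigned() {
        latch.countDown();
    }

    // Returns true if the expected number of rebalances was seen within the timeout.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        RebalanceCounter counter = new RebalanceCounter(3);
        counter.onAssigned(); // 1st: spring-kafka consumer subscribes
        counter.onAssigned(); // 2nd: raw consumer subscribes
        counter.onAssigned(); // 3rd: raw consumer is closed
        System.out.println("all rebalances seen: " + counter.await(1, TimeUnit.SECONDS));
    }
}
```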