Skip to content

GH-4329 Added new property group protocol in StreamsBuilderFactoryBean#4373

Merged
artembilan merged 10 commits into
spring-projects:mainfrom
jad837:GH-4329
Apr 7, 2026
Merged

GH-4329 Added new property group protocol in StreamsBuilderFactoryBean#4373
artembilan merged 10 commits into
spring-projects:mainfrom
jad837:GH-4329

Conversation

@jad837

@jad837 jad837 commented Mar 27, 2026

Copy link
Copy Markdown
Contributor

This PR addresses the issue #4329. It exposes group protocol configuration for streams directly through StreamsBuilderFactoryBean

This PR does the following

  • Expose configuration for Group Protocols in StreamsBuilderFactoryBean.

  • Adds test for StreamsBuilderFactoryBean to test the new configuration.

Questions

  • Do we want to keep raw property exposure for group protocol? This means that configuration inside factory bean can be null & if both are set then instance variable will override the raw properties configuration. We can add warning in such conditions.

  • Currently lowercase/uppercase values can be provided as "classic"/"streams", Do we want to add an assertion to check if values fall inside GroupProtocol enum? Currently values outside of enum will throw exception.

Closes #4329

@jad837

jad837 commented Mar 27, 2026

Copy link
Copy Markdown
Contributor Author

Hi @sobychacko can you please provide feedback on the PR?

Assert.notNull(streamsConfig, STREAMS_CONFIG_MUST_NOT_BE_NULL);
Assert.notNull(cleanupConfig, CLEANUP_CONFIG_MUST_NOT_BE_NULL);
this.properties = streamsConfig.asProperties();
this.applyGroupProtocol();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to applyGroupProtocol() in the constructor is always a no-op — this.groupProtocol is always null at construction time. See my review on that method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising this. I have removed the method and added the property check in start() method.
I see that there is also a createInstance() method which returns a streamsBuilder, Do we need to add this in that method as well?

/**
* Retrieves and sets group protocol for properties of {@link StreamsBuilderFactoryBean}.
*/
public void applyGroupProtocol() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (this.groupProtocol != null) guard never passes since it is called from the ctor. The setter sets the field later, but there is no path from the setter back to applying it to this.properties. This means the feature silently has no effect. Also, applyGroupProtocol() should not be a public method — it is an internal implementation detail and should not be part of the API surface. However, I think that the right fix is to remove applyGroupProtocol() entirely and apply the value inline in start(), just before initializing KafkaStreams:

  if (this.groupProtocol != null) {
      this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
          this.groupProtocol.name().toLowerCase(Locale.ROOT));
  }
  this.kafkaStreams = this.kafkaStreamsCustomizer.initKafkaStreams(
      this.topology, this.properties, this.clientSupplier
  );

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated this as per the recommendation.

streamsBuilderFactoryBean.start();
StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject();
verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties());
assertThat(streamsBuilderFactoryBean.getGroupProtocol()).isEqualTo(GroupProtocol.valueOf(testGroupProtocol.toUpperCase(Locale.ROOT)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test only verifies that getGroupProtocol() returns the correct enum value (the setter/getter round-trip), but never asserts that group.protocol actually ends up in the properties passed to KafkaStreams at start() time. A meaningful assertion would be:

  assertThat(streamsBuilderFactoryBean.getStreamsConfiguration())
      .containsEntry(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
          testGroupProtocol.toLowerCase(Locale.ROOT));

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test case is updated, Can you please check comment on same ? #4373 (comment)
Please suggest if we need to update implementation for createInstance/createStreamsBuilder.

* Only allowed values are from {@link GroupProtocol}
* @param groupProtocol groupProtocol value as given in {@link org.apache.kafka.clients.consumer.GroupProtocol}
*/
public void setGroupProtocol(String groupProtocol) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setter should accept GroupProtocol directly rather than a String:

public void setGroupProtocol(GroupProtocol groupProtocol) {
    Assert.notNull(groupProtocol, "'groupProtocol' must not be null");
    this.groupProtocol = groupProtocol;
}

A typo like fb.setGroupProtocol("strems") compiles fine today but blows up at runtime with an IllegalArgumentException. With the typed setter, fb.setGroupProtocol(GroupProtocol.STREMS)
simply does not compile. The valid options are also immediately discoverable via IDE auto-complete.
The typical usage would then be:

@Bean
public StreamsBuilderFactoryBeanConfigurer groupProtocolConfigurer() {
    return fb -> fb.setGroupProtocol(GroupProtocol.STREAMS);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated implementation to use GroupProtcol enum for Streams. For all the methods.

/**
* Set group protocol to be used by {@link StreamsBuilderFactoryBean}.
* Only allowed values are from {@link GroupProtocol}
* @param groupProtocol groupProtocol value as given in {@link org.apache.kafka.clients.consumer.GroupProtocol}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong package for GroupProtocol.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the comment.

Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
Starting with version 2.7, the default is to never clean up local state.

=== Group Protocol Configuration

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to consider adding an extra anchor - something like [[streams-group-protocol] and then use it to link from whats-new entry.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated whats-new entry with an anchor to stream doc

@sobychacko

Copy link
Copy Markdown
Contributor

@jad837 Thanks for the PR. I added some comments, please take a look. Add your name as an @author to all the classes you changed. Also, follow these commit guidelines: https://cbea.ms/git-commit/

@jad837 jad837 marked this pull request as ready for review March 30, 2026 15:58
* Get groupProtocol defined for this {@link StreamsBuilderFactoryBean}.
* @return groupProtocol returns {@link GroupProtocol} value defined for this {@link StreamsBuilderFactoryBean}
*/
public @Nullable GroupProtocol getGroupProtocol() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is real value to get access to this property.
Please, consider to remove the getter, unless you have a strong argument why do we need it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed getter.

Assert.state(this.properties != null,
"streams configuration properties must not be null");
if (this.getGroupProtocol() != null) {
this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to emit the WARN log message in the case when property is provided already in the properties and we are going to override it with this our groupProtocol.
And probably only when they are not equal 😄

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a warning, can you please check if wording is correct?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.
First of all that is too many this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG) call.
Consider to extract it into a local variable.

I think the wording in the warning message should be like:

The 'group.protocol=%s' property is overridden with '%s'."

Nothing more.
Also, consider to use LogMessage.format() instead of string.
That would be unfortunate to calculate string for nothing when warn is not enabled.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too many this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG)

was not addressed.

"streams configuration properties must not be null");
if (this.getGroupProtocol() != null) {
this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
this.getGroupProtocol().name().toLowerCase(Locale.ROOT));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need in the Locale since we turn pre-compiled enum value to string, not opposite to cause some vulnerability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the Locale usage.

.containsEntry(ConsumerConfig.GROUP_PROTOCOL_CONFIG, testGroupProtocol.name().toLowerCase(Locale.ROOT));
// Need to remove group protocol config to ensure other tests not get affected
// due to pattern usage with streams group protocol
kafkaStreamsConfiguration.asProperties().remove(ConsumerConfig.GROUP_PROTOCOL_CONFIG);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is a bug.
We must not mutate shared instance.
And looks like that is exactly what happens here: we provide shared kafkaStreamsConfiguration and then we mutate its properties adding a new one in the StreamsBuilderFactoryBean.
I think KafkaStreamsConfiguration must return a fresh copy of its internal state.
We also have to make a copy in the setStreamsConfiguration(Properties streamsConfig).
And must return a copy from the getStreamsConfiguration().
All of that to avoid bug on mutation of shared objects.

However I believe that has to be done in the separate PR and back-ported.
And then when you rebase this one, all should be good in this test.

Although I still have concerns for the purpose of this test.
I believe we should aim for asserting whatever ConsumerConfig.GROUP_PROTOCOL_CONFIG is in the end for the StreamsBuilder, not streamsBuilderFactoryBean.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we are adding group.protocol to KafkaStreams and not to StreamsBuilder. Might have to add group protocol property through createInstance/createStreamsBuilder in case we need to add it for streamsbuilder. I am not sure though if streamsbuilder can use group.protocol as it seems to mainly handle topology configurations?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just must not modify shared instance.
That's the point.
An internal properties in the StreamsBuilderFactoryBean has to be isolated and modified before it is propagated down to the Kafka Streams API.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamsBuilderFactoryBean creates kafkaStreams in start() so Ideally the test case is just ensuring that the passed down properties when creating KafkaStreams are correct.
As we are not able to directly check KafkaStreams configuration properties, better option would be to drop the test case assuming that KafkaStreams will use properties correctly.
Please share if you have an idea on how can we handle this better.

As for mutable shared instance of properties I will create a separate PR/issue it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, what I see so far:

  1. StreamsBuilderFactoryBean manages a KafkaStreams object after its start.
  2. The KafkaStreams is based on the provided properties (including our custom ConsumerConfig.GROUP_PROTOCOL_CONFIG)
  3. Those props are stored in the KafkaStreams.applicationConfigs .
  4. So, in our test we should check that provided ConsumerConfig.GROUP_PROTOCOL_CONFIG is really propagated down there:
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
StreamsConfig streamsConfig = KafkaTestUtils.getPropertyValue(kafkaStreams, "applicationConfigs", StreamsConfig.class);
streamsConfig.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);

What do I miss?

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, please, stop commenting on everything what you have fixed.
We will see the change in the next commit.

Feel free to comment if you are not agreed with out suggestion.
Saying just for sake saving your and our time around those extra comments 😄

Assert.state(this.properties != null,
"streams configuration properties must not be null");
if (this.getGroupProtocol() != null) {
this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.
First of all that is too many this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG) call.
Consider to extract it into a local variable.

I think the wording in the warning message should be like:

The 'group.protocol=%s' property is overridden with '%s'."

Nothing more.
Also, consider to use LogMessage.format() instead of string.
That would be unfortunate to calculate string for nothing when warn is not enabled.

@jad837 jad837 requested review from artembilan and sobychacko April 1, 2026 15:03

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we are very close to what we want.
However I will hold this PR off from merge until we have a mutability fix around those Properties objects.

Assert.state(this.properties != null,
"streams configuration properties must not be null");
if (this.getGroupProtocol() != null) {
this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

too many this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG)

was not addressed.

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jad837 ,

Now that #4382 is merged, please, consider to rebase this branch to latest main and fix your test respectively, since properties are not mutated anymore.

Thanks

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

17 commits

I think I was clear when I asked to rebase your branch to the latest upstream main: https://stackoverflow.com/questions/804115/when-do-you-use-git-rebase-instead-of-git-merge.

Now this is very hard to review.

@jad837

jad837 commented Apr 7, 2026

Copy link
Copy Markdown
Contributor Author

17 commits

I think I was clear when I asked to rebase your branch to the latest upstream main: https://stackoverflow.com/questions/804115/when-do-you-use-git-rebase-instead-of-git-merge.

Now this is very hard to review.

Hi give me sometime I will rebase and push.

I am trying to use this property via test project once done I will rebase and ask for review.
Sorry for Inconvenience, some tests are flaky when running locally so I have to push to ensure things are working correctly.

jad837 added 8 commits April 7, 2026 14:25
…msBuilderFactoryBean

Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
…ties.

Changed group protocol signature to GroupProtocol enum to ensure that correct group protocol is set for beans.

Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
* Replace ConsumerConfig with StreamsConfig
* Update StreamsBuilderFactoryBean logic
* Fix broken group protocol test cases

Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Comment thread spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc
* Set group protocol to be used by {@link StreamsBuilderFactoryBean}.
* Only allowed values are from {@link GroupProtocol}
* @param groupProtocol groupProtocol value as given in {@link GroupProtocol}
* @since 4.1.1

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not why have you decided to use this version if we asked explicitly for 4.1

streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) {
@Override
protected StreamsBuilder createInstance() {
return spy(super.createInstance());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you spy if you don't verify eventually?

streamsBuilderFactoryBean.setGroupProtocol(testGroupProtocol);
streamsBuilderFactoryBean.afterPropertiesSet();
StreamsBuilder builder = streamsBuilderFactoryBean.getObject();
builder.stream("foo");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, no unprofessional foo/bar language in new code.
I know there is a number of them around to be fixed one day, but that doesn't mean we should continue following such a bad habit.

jad837 added 2 commits April 7, 2026 15:22
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
@jad837 jad837 requested a review from artembilan April 7, 2026 15:43
@artembilan artembilan merged commit 54c9200 into spring-projects:main Apr 7, 2026
3 checks passed
@artembilan

Copy link
Copy Markdown
Member

@jad837 ,

thank you for contribution; looking forward for more!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Expose group.protocol configuration for Kafka Streams server-side rebalance (KIP-1071)

3 participants