GH-4329 Added new property group protocol in StreamsBuilderFactoryBean#4373
Conversation
|
Hi @sobychacko can you please provide feedback on the PR? |
| Assert.notNull(streamsConfig, STREAMS_CONFIG_MUST_NOT_BE_NULL); | ||
| Assert.notNull(cleanupConfig, CLEANUP_CONFIG_MUST_NOT_BE_NULL); | ||
| this.properties = streamsConfig.asProperties(); | ||
| this.applyGroupProtocol(); |
There was a problem hiding this comment.
The call to applyGroupProtocol() in the constructor is always a no-op — this.groupProtocol is always null at construction time. See my review on that method.
There was a problem hiding this comment.
Thanks for raising this. I have removed the method and added the property check in start() method.
I see that there is also a createInstance() method which returns a streamsBuilder, Do we need to add this in that method as well?
| /** | ||
| * Retrieves and sets group protocol for properties of {@link StreamsBuilderFactoryBean}. | ||
| */ | ||
| public void applyGroupProtocol() { |
There was a problem hiding this comment.
if (this.groupProtocol != null) guard never passes since it is called from the ctor. The setter sets the field later, but there is no path from the setter back to applying it to this.properties. This means the feature silently has no effect. Also, applyGroupProtocol() should not be a public method — it is an internal implementation detail and should not be part of the API surface. However, I think that the right fix is to remove applyGroupProtocol() entirely and apply the value inline in start(), just before initializing KafkaStreams:
if (this.groupProtocol != null) {
this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
this.groupProtocol.name().toLowerCase(Locale.ROOT));
}
this.kafkaStreams = this.kafkaStreamsCustomizer.initKafkaStreams(
this.topology, this.properties, this.clientSupplier
);
There was a problem hiding this comment.
Updated this as per the recommendation.
| streamsBuilderFactoryBean.start(); | ||
| StreamsBuilder streamsBuilder = streamsBuilderFactoryBean.getObject(); | ||
| verify(streamsBuilder).build(kafkaStreamsConfiguration.asProperties()); | ||
| assertThat(streamsBuilderFactoryBean.getGroupProtocol()).isEqualTo(GroupProtocol.valueOf(testGroupProtocol.toUpperCase(Locale.ROOT))); |
There was a problem hiding this comment.
The test only verifies that getGroupProtocol() returns the correct enum value (the setter/getter round-trip), but never asserts that group.protocol actually ends up in the properties passed to KafkaStreams at start() time. A meaningful assertion would be:
assertThat(streamsBuilderFactoryBean.getStreamsConfiguration())
.containsEntry(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
testGroupProtocol.toLowerCase(Locale.ROOT));
There was a problem hiding this comment.
Test case is updated, Can you please check comment on same ? #4373 (comment)
Please suggest if we need to update implementation for createInstance/createStreamsBuilder.
| * Only allowed values are from {@link GroupProtocol} | ||
| * @param groupProtocol groupProtocol value as given in {@link org.apache.kafka.clients.consumer.GroupProtocol} | ||
| */ | ||
| public void setGroupProtocol(String groupProtocol) { |
There was a problem hiding this comment.
The setter should accept GroupProtocol directly rather than a String:
public void setGroupProtocol(GroupProtocol groupProtocol) {
Assert.notNull(groupProtocol, "'groupProtocol' must not be null");
this.groupProtocol = groupProtocol;
}
A typo like fb.setGroupProtocol("strems") compiles fine today but blows up at runtime with an IllegalArgumentException. With the typed setter, fb.setGroupProtocol(GroupProtocol.STREMS)
simply does not compile. The valid options are also immediately discoverable via IDE auto-complete.
The typical usage would then be:
@Bean
public StreamsBuilderFactoryBeanConfigurer groupProtocolConfigurer() {
return fb -> fb.setGroupProtocol(GroupProtocol.STREAMS);
}
There was a problem hiding this comment.
I have updated implementation to use GroupProtcol enum for Streams. For all the methods.
| /** | ||
| * Set group protocol to be used by {@link StreamsBuilderFactoryBean}. | ||
| * Only allowed values are from {@link GroupProtocol} | ||
| * @param groupProtocol groupProtocol value as given in {@link org.apache.kafka.clients.consumer.GroupProtocol} |
There was a problem hiding this comment.
wrong package for GroupProtocol.
There was a problem hiding this comment.
I have updated the comment.
| Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither. | ||
| Starting with version 2.7, the default is to never clean up local state. | ||
|
|
||
| === Group Protocol Configuration |
There was a problem hiding this comment.
Might want to consider adding an extra anchor - something like [[streams-group-protocol] and then use it to link from whats-new entry.
There was a problem hiding this comment.
Updated whats-new entry with an anchor to stream doc
|
@jad837 Thanks for the PR. I added some comments, please take a look. Add your name as an |
| * Get groupProtocol defined for this {@link StreamsBuilderFactoryBean}. | ||
| * @return groupProtocol returns {@link GroupProtocol} value defined for this {@link StreamsBuilderFactoryBean} | ||
| */ | ||
| public @Nullable GroupProtocol getGroupProtocol() { |
There was a problem hiding this comment.
There is real value to get access to this property.
Please, consider to remove the getter, unless you have a strong argument why do we need it.
| Assert.state(this.properties != null, | ||
| "streams configuration properties must not be null"); | ||
| if (this.getGroupProtocol() != null) { | ||
| this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, |
There was a problem hiding this comment.
I think we need to emit the WARN log message in the case when property is provided already in the properties and we are going to override it with this our groupProtocol.
And probably only when they are not equal 😄
There was a problem hiding this comment.
I have added a warning, can you please check if wording is correct?
There was a problem hiding this comment.
Thanks.
First of all that is too many this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG) call.
Consider to extract it into a local variable.
I think the wording in the warning message should be like:
The 'group.protocol=%s' property is overridden with '%s'."
Nothing more.
Also, consider to use LogMessage.format() instead of string.
That would be unfortunate to calculate string for nothing when warn is not enabled.
There was a problem hiding this comment.
too many
this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG)
was not addressed.
| "streams configuration properties must not be null"); | ||
| if (this.getGroupProtocol() != null) { | ||
| this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, | ||
| this.getGroupProtocol().name().toLowerCase(Locale.ROOT)); |
There was a problem hiding this comment.
There is no need in the Locale since we turn pre-compiled enum value to string, not opposite to cause some vulnerability.
There was a problem hiding this comment.
Removed the Locale usage.
| .containsEntry(ConsumerConfig.GROUP_PROTOCOL_CONFIG, testGroupProtocol.name().toLowerCase(Locale.ROOT)); | ||
| // Need to remove group protocol config to ensure other tests not get affected | ||
| // due to pattern usage with streams group protocol | ||
| kafkaStreamsConfiguration.asProperties().remove(ConsumerConfig.GROUP_PROTOCOL_CONFIG); |
There was a problem hiding this comment.
Well, this is a bug.
We must not mutate shared instance.
And looks like that is exactly what happens here: we provide shared kafkaStreamsConfiguration and then we mutate its properties adding a new one in the StreamsBuilderFactoryBean.
I think KafkaStreamsConfiguration must return a fresh copy of its internal state.
We also have to make a copy in the setStreamsConfiguration(Properties streamsConfig).
And must return a copy from the getStreamsConfiguration().
All of that to avoid bug on mutation of shared objects.
However I believe that has to be done in the separate PR and back-ported.
And then when you rebase this one, all should be good in this test.
Although I still have concerns for the purpose of this test.
I believe we should aim for asserting whatever ConsumerConfig.GROUP_PROTOCOL_CONFIG is in the end for the StreamsBuilder, not streamsBuilderFactoryBean.
There was a problem hiding this comment.
Right now we are adding group.protocol to KafkaStreams and not to StreamsBuilder. Might have to add group protocol property through createInstance/createStreamsBuilder in case we need to add it for streamsbuilder. I am not sure though if streamsbuilder can use group.protocol as it seems to mainly handle topology configurations?
There was a problem hiding this comment.
We just must not modify shared instance.
That's the point.
An internal properties in the StreamsBuilderFactoryBean has to be isolated and modified before it is propagated down to the Kafka Streams API.
There was a problem hiding this comment.
StreamsBuilderFactoryBean creates kafkaStreams in start() so Ideally the test case is just ensuring that the passed down properties when creating KafkaStreams are correct.
As we are not able to directly check KafkaStreams configuration properties, better option would be to drop the test case assuming that KafkaStreams will use properties correctly.
Please share if you have an idea on how can we handle this better.
As for mutable shared instance of properties I will create a separate PR/issue it.
There was a problem hiding this comment.
Well, what I see so far:
StreamsBuilderFactoryBeanmanages aKafkaStreamsobject after its start.- The
KafkaStreamsis based on the provided properties (including our customConsumerConfig.GROUP_PROTOCOL_CONFIG) - Those props are stored in the
KafkaStreams.applicationConfigs. - So, in our test we should check that provided
ConsumerConfig.GROUP_PROTOCOL_CONFIGis really propagated down there:
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
StreamsConfig streamsConfig = KafkaTestUtils.getPropertyValue(kafkaStreams, "applicationConfigs", StreamsConfig.class);
streamsConfig.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);
What do I miss?
artembilan
left a comment
There was a problem hiding this comment.
And, please, stop commenting on everything what you have fixed.
We will see the change in the next commit.
Feel free to comment if you are not agreed with out suggestion.
Saying just for sake saving your and our time around those extra comments 😄
| Assert.state(this.properties != null, | ||
| "streams configuration properties must not be null"); | ||
| if (this.getGroupProtocol() != null) { | ||
| this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, |
There was a problem hiding this comment.
Thanks.
First of all that is too many this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG) call.
Consider to extract it into a local variable.
I think the wording in the warning message should be like:
The 'group.protocol=%s' property is overridden with '%s'."
Nothing more.
Also, consider to use LogMessage.format() instead of string.
That would be unfortunate to calculate string for nothing when warn is not enabled.
artembilan
left a comment
There was a problem hiding this comment.
Now we are very close to what we want.
However I will hold this PR off from merge until we have a mutability fix around those Properties objects.
| Assert.state(this.properties != null, | ||
| "streams configuration properties must not be null"); | ||
| if (this.getGroupProtocol() != null) { | ||
| this.properties.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, |
There was a problem hiding this comment.
too many
this.properties.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG)
was not addressed.
artembilan
left a comment
There was a problem hiding this comment.
17 commits
I think I was clear when I asked to rebase your branch to the latest upstream main: https://stackoverflow.com/questions/804115/when-do-you-use-git-rebase-instead-of-git-merge.
Now this is very hard to review.
Hi give me sometime I will rebase and push. I am trying to use this property via test project once done I will rebase and ask for review. |
…msBuilderFactoryBean Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
…ties. Changed group protocol signature to GroupProtocol enum to ensure that correct group protocol is set for beans. Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
* Replace ConsumerConfig with StreamsConfig * Update StreamsBuilderFactoryBean logic * Fix broken group protocol test cases Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
| * Set group protocol to be used by {@link StreamsBuilderFactoryBean}. | ||
| * Only allowed values are from {@link GroupProtocol} | ||
| * @param groupProtocol groupProtocol value as given in {@link GroupProtocol} | ||
| * @since 4.1.1 |
There was a problem hiding this comment.
I'm not why have you decided to use this version if we asked explicitly for 4.1
| streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) { | ||
| @Override | ||
| protected StreamsBuilder createInstance() { | ||
| return spy(super.createInstance()); |
There was a problem hiding this comment.
Why do you spy if you don't verify eventually?
| streamsBuilderFactoryBean.setGroupProtocol(testGroupProtocol); | ||
| streamsBuilderFactoryBean.afterPropertiesSet(); | ||
| StreamsBuilder builder = streamsBuilderFactoryBean.getObject(); | ||
| builder.stream("foo"); |
There was a problem hiding this comment.
Please, no unprofessional foo/bar language in new code.
I know there is a number of them around to be fixed one day, but that doesn't mean we should continue following such a bad habit.
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
Signed-off-by: jad837 <jadhavsaurabh037@gmail.com>
|
@jad837 , thank you for contribution; looking forward for more! |
This PR addresses the issue #4329. It exposes group protocol configuration for streams directly through
StreamsBuilderFactoryBeanThis PR does the following
Expose configuration for Group Protocols in
StreamsBuilderFactoryBean.Adds test for
StreamsBuilderFactoryBeanto test the new configuration.Questions
Do we want to keep raw property exposure for group protocol? This means that configuration inside factory bean can be null & if both are set then instance variable will override the raw properties configuration. We can add warning in such conditions.
Currently lowercase/uppercase values can be provided as "classic"/"streams", Do we want to add an assertion to check if values fall inside GroupProtocol enum? Currently values outside of enum will throw exception.
Closes #4329