Expected Behavior
@KafkaListener used as a meta-annotation on a custom annotation should allow topics to be resolved programmatically by a custom ConcurrentKafkaListenerContainerFactory — the same pattern that worked in Spring Kafka 3.3.11 and earlier.
Actual Behavior
Since Spring Kafka 3.3.12 (GH-4170 / PR #4172), KafkaListenerAnnotationBeanPostProcessor.assertTopic() requires exactly one of topics/topicPartitions/topicPattern to be set at annotation-processing time:
Assert.state(count == 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
This fails with count == 0 when a custom annotation is meta-annotated with a @KafkaListener that specifies only containerFactory and no topics. In that setup the topics are resolved programmatically in the container factory's createListenerContainer() method, which runs after the validation.
BeanCreationException: Error creating bean with name 'myEventHandler'
Caused by: IllegalStateException: Only one of @Topic or @TopicPartition or @TopicPattern must be provided
    at o.s.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.assertTopic(KafkaListenerAnnotationBeanPostProcessor.java:876)
Steps to Reproduce
- Define a custom annotation meta-annotated with @KafkaListener:
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @KafkaListener(containerFactory = "myContainerFactory")
    public @interface MyEventListener {
        String id();
    }
- Create a custom ConcurrentKafkaListenerContainerFactory that resolves topics programmatically:
    public class MyContainerFactory extends ConcurrentKafkaListenerContainerFactory<String, Object> {
        @Override
        public ConcurrentMessageListenerContainer<String, Object> createListenerContainer(KafkaListenerEndpoint endpoint) {
            if (endpoint instanceof AbstractKafkaListenerEndpoint akle) {
                // Resolve topic from method parameter type, external config, etc.
                akle.setTopics(resolveTopicFromEndpoint(endpoint));
            }
            return super.createListenerContainer(endpoint);
        }
    }
- Use the annotation on a handler method:
    @Service
    public class MyHandler {
        @MyEventListener(id = "my.handler")
        public void handle(MyEvent event) { ... }
    }
- Upgrade to Spring Kafka 3.3.12 or later. The application now fails at startup with the IllegalStateException shown above.
Suggested Fix
Change the validation from count == 1 (exactly one) to count <= 1 (at most one):
Assert.state(count <= 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
This preserves the original intent of GH-4170 (rejecting conflicting topic specifications, such as setting both topics and topicPartitions) while still allowing the legitimate case where the annotation sets no topics and they are resolved programmatically later.
The existing validation in AbstractKafkaListenerEndpoint.afterPropertiesSet() already covers the case where no topics are set at all: it throws IllegalStateException("At least one of topics, topicPartitions or topicPattern must be provided") after the container factory has had a chance to set them programmatically.
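To make the proposed split concrete, here is a minimal, framework-free sketch of the two checks. It is not the Spring Kafka source: the class TopicValidationSketch, its method names, and the topic name "my.events" are all hypothetical and only model the "at most one early, at least one late" idea described above.

```java
public class TopicValidationSketch {

    /** Counts how many of the three topic sources are non-empty. */
    public static int countSources(String[] topics, String[] topicPartitions, String topicPattern) {
        int count = 0;
        if (topics.length > 0) {
            count++;
        }
        if (topicPartitions.length > 0) {
            count++;
        }
        if (!topicPattern.isEmpty()) {
            count++;
        }
        return count;
    }

    /** Early check (models annotation-processing time): rejects conflicts only, allows zero sources. */
    public static void assertAtMostOne(String[] topics, String[] topicPartitions, String topicPattern) {
        if (countSources(topics, topicPartitions, topicPattern) > 1) {
            throw new IllegalStateException(
                    "Only one of topics, topicPartitions or topicPattern may be provided");
        }
    }

    /** Late check (models endpoint initialization): by now some source must have been set. */
    public static void assertAtLeastOne(String[] topics, String[] topicPartitions, String topicPattern) {
        if (countSources(topics, topicPartitions, topicPattern) == 0) {
            throw new IllegalStateException(
                    "At least one of topics, topicPartitions or topicPattern must be provided");
        }
    }

    public static void main(String[] args) {
        String[] none = {};

        // Meta-annotation case: nothing is set on the annotation, so the early check must pass.
        assertAtMostOne(none, none, "");

        // The container factory later resolves a topic (hypothetical name), so the late check passes.
        String[] resolved = {"my.events"};
        assertAtLeastOne(resolved, none, "");

        // A genuine conflict is still rejected by the early check.
        try {
            assertAtMostOne(resolved, new String[] {"partition-spec"}, "");
        }
        catch (IllegalStateException ex) {
            System.out.println("Conflict rejected: " + ex.getMessage());
        }
    }
}
```

In this model, a listener that never receives a topic is still caught, just later and by the second check instead of the first.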
Version