assertTopic validation breaks meta-annotated @KafkaListener with programmatic topic resolution #4303

@mcebanupgrade

Expected Behavior

@KafkaListener used as a meta-annotation on a custom annotation should allow topics to be resolved programmatically by a custom ConcurrentKafkaListenerContainerFactory — the same pattern that worked in Spring Kafka 3.3.11 and earlier.

Actual Behavior

Since Spring Kafka 3.3.12 (GH-4170 / PR #4172), KafkaListenerAnnotationBeanPostProcessor.assertTopic() requires exactly one of topics/topicPartitions/topicPattern to be set at annotation-processing time:

Assert.state(count == 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");

This fails with count == 0 when a custom annotation is meta-annotated with a @KafkaListener that specifies only containerFactory and no topics. In that setup the topics are resolved programmatically in the container factory's createListenerContainer() method, which runs after the validation.
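To make the failure mode concrete, here is a minimal, Spring-free sketch of the "exactly one" rule described above. StrictTopicCheck and its nested Topics record are hypothetical stand-ins for the merged annotation attributes, not the actual KafkaListenerAnnotationBeanPostProcessor source; only the error message is taken from this report.

```java
import java.util.List;

// Simplified model of the strict check. All names here are illustrative
// assumptions, not Spring Kafka classes.
final class StrictTopicCheck {

    // Stand-in for the three topic attributes of a merged @KafkaListener.
    record Topics(List<String> topics, List<String> topicPartitions, String topicPattern) {}

    // Counts how many of the three topic sources are populated.
    static int countSources(Topics t) {
        int count = 0;
        if (!t.topics().isEmpty()) count++;
        if (!t.topicPartitions().isEmpty()) count++;
        if (t.topicPattern() != null && !t.topicPattern().isEmpty()) count++;
        return count;
    }

    // Models the "exactly one" rule: zero sources is rejected just like two.
    static void assertExactlyOne(Topics t) {
        if (countSources(t) != 1) {
            throw new IllegalStateException(
                    "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
        }
    }

    public static void main(String[] args) {
        // A meta-annotation that sets only containerFactory leaves all three empty.
        Topics metaAnnotated = new Topics(List.of(), List.of(), "");
        System.out.println("sources set: " + countSources(metaAnnotated));
        try {
            assertExactlyOne(metaAnnotated);
        } catch (IllegalStateException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```

In this model the rejection happens before any factory code has a chance to run, which matches the ordering problem reported here.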

BeanCreationException: Error creating bean with name 'myEventHandler'
  Caused by: IllegalStateException: Only one of @Topic or @TopicPartition or @TopicPattern must be provided
    at o.s.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.assertTopic(KafkaListenerAnnotationBeanPostProcessor.java:876)

Steps to Reproduce

  1. Define a custom annotation meta-annotated with @KafkaListener:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener(containerFactory = "myContainerFactory")
public @interface MyEventListener {
    String id();
}
  2. Create a custom ConcurrentKafkaListenerContainerFactory that resolves topics programmatically:
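import org.springframework.kafka.config.AbstractKafkaListenerEndpoint;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

public class MyContainerFactory extends ConcurrentKafkaListenerContainerFactory<String, Object> {
    @Override
    public ConcurrentMessageListenerContainer<String, Object> createListenerContainer(KafkaListenerEndpoint endpoint) {
        if (endpoint instanceof AbstractKafkaListenerEndpoint akle) {
            // Resolve topic from method parameter type, external config, etc.
            // resolveTopicFromEndpoint(...) is an application-specific helper (not shown).
            akle.setTopics(resolveTopicFromEndpoint(endpoint));
        }
        return super.createListenerContainer(endpoint);
    }
}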
  3. Use the annotation on a handler method:
@Service
public class MyHandler {
    @MyEventListener(id = "my.handler")
    public void handle(MyEvent event) { ... }
}
  4. Upgrade to Spring Kafka 3.3.12 or later. The application fails at startup with the IllegalStateException shown above.

Suggested Fix

Change the validation from count == 1 (exactly one) to count <= 1 (at most one):

Assert.state(count <= 1, "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");

This preserves the original intent of GH-4170 (preventing conflicting topic specifications, such as setting both topics and topicPartitions) while still allowing the legitimate case where none of the topic attributes is set in the annotation and the topics are resolved programmatically later.

The existing validation in AbstractKafkaListenerEndpoint.afterPropertiesSet() already covers the case where no topics are set at all — it throws IllegalStateException("At least one of topics, topicPartitions or topicPattern must be provided") after the container factory has had a chance to set them programmatically.
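The two-stage behaviour proposed here can be sketched without Spring as follows. This is a simplified model under stated assumptions: TwoStageTopicValidation, Endpoint, and register are hypothetical names, and the Consumer hook merely stands in for a container factory that sets topics in createListenerContainer(). It is not the framework's actual control flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Spring-free model of "at most one" early, "at least one" late.
// All types here are illustrative assumptions.
final class TwoStageTopicValidation {

    // Hypothetical stand-in for a listener endpoint.
    static final class Endpoint {
        final List<String> topics = new ArrayList<>();
        final List<String> topicPartitions = new ArrayList<>();
        String topicPattern = "";

        int sourcesSet() {
            int count = 0;
            if (!topics.isEmpty()) count++;
            if (!topicPartitions.isEmpty()) count++;
            if (!topicPattern.isEmpty()) count++;
            return count;
        }
    }

    // Stage 1 (annotation processing): reject conflicts, tolerate "none yet".
    static void assertAtMostOne(Endpoint endpoint) {
        if (endpoint.sourcesSet() > 1) {
            throw new IllegalStateException(
                    "Only one of @Topic or @TopicPartition or @TopicPattern must be provided");
        }
    }

    // Stage 2 (endpoint initialization): by now some topic source must exist.
    static void assertAtLeastOne(Endpoint endpoint) {
        if (endpoint.sourcesSet() == 0) {
            throw new IllegalStateException(
                    "At least one of topics, topicPartitions or topicPattern must be provided");
        }
    }

    // Runs stage 1, lets the factory hook set topics, then runs stage 2.
    static Endpoint register(Endpoint endpoint, Consumer<Endpoint> factoryHook) {
        assertAtMostOne(endpoint);
        factoryHook.accept(endpoint);
        assertAtLeastOne(endpoint);
        return endpoint;
    }

    public static void main(String[] args) {
        // Topics are empty at stage 1 and supplied by the hook before stage 2.
        Endpoint registered = register(new Endpoint(), ep -> ep.topics.add("my-events"));
        System.out.println("resolved topics: " + registered.topics);
    }
}
```

With this split, a listener that never receives a topic is still rejected, only later, once the factory has had its chance to supply one.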
