Skip to content

GH-4170 : Add KafkaListener Validation (Allow @Topic or @TopicPartition)#4172

Merged
sobychacko merged 2 commits into
spring-projects:mainfrom
moonyoungCHAE:issue-4170
Dec 3, 2025
Merged

GH-4170 : Add KafkaListener Validation (Allow @Topic or @TopicPartition)#4172
sobychacko merged 2 commits into
spring-projects:mainfrom
moonyoungCHAE:issue-4170

Conversation

@moonyoungCHAE

Copy link
Copy Markdown
Contributor
  • As-is
    • When a @KafkaListener is created with both @Topic and @topicpartition, the listener consumes messages based on @topicpartition.
    • When @RetryableTopic is added, retry topics are managed based on @Topic.
  • To-be
    • Creating a @KafkaListener with both @Topic and @topicpartition is not allowed.

fixes #4170

Signed-off-by: moonyoungCHAE <xpf_fl@naver.com>
@sobychacko

Copy link
Copy Markdown
Contributor

@moonyoungCHAE There are build failures on the PR - Are you able to see this? https://github.com/spring-projects/spring-kafka/actions/runs/19523082207/job/55890231401?pr=4172.

Can you also trace, what happens to the retryable topic when we only provide TopicPartition on the KafkaListener? Since there are no topics given any more on KafkaListener (based on this PR), will it still create destinations for retry?

Thanks!

Signed-off-by: moonyoungCHAE <xpf_fl@naver.com>
@moonyoungCHAE

Copy link
Copy Markdown
Contributor Author

@sobychacko
Hi, I updated the PR to allow using one of @Topic, @topicpartition, or @TopicPattern. Previously, only @Topic and @topicpartition were allowed.

I confirmed that when only @topicpartition is provided, it still creates a destination for retry. The retry topic is determined here, which has not changed.

I traced the behavior, and the results are below.

    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2), topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
    @KafkaListener(id = "foo",
            clientIdPrefix = "myClientId",
            topicPartitions =
                    {
                            @TopicPartition(topic = "topic1-2", partitions = {"0"}),
                            @TopicPartition(topic = "topic2-2", partitions = {"0"})
                    }
                    )
    public void listen(String data) {

    }

2025-12-02T14:54:57.229+09:00  INFO 4341 --- [o-retry-1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-1-0, groupId=foo-retry-1] Resetting offset for partition topic1-2-retry-1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.229+09:00  INFO 4341 --- [o-retry-1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-1-0, groupId=foo-retry-1] Resetting offset for partition topic2-2-retry-1-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.231+09:00  INFO 4341 --- [o-retry-0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-0-0, groupId=foo-retry-0] Resetting offset for partition topic2-2-retry-0-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.232+09:00  INFO 4341 --- [o-retry-0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-retry-0-0, groupId=foo-retry-0] Resetting offset for partition topic1-2-retry-0-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}


2025-12-02T14:54:57.232+09:00  INFO 4341 --- [  foo-dlt-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-dlt-0, groupId=foo-dlt] Resetting offset for partition topic2-2-dlt-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
2025-12-02T14:54:57.232+09:00  INFO 4341 --- [  foo-dlt-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=myClientId-dlt-0, groupId=foo-dlt] Resetting offset for partition topic1-2-dlt-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.

Thanks!

@sobychacko sobychacko merged commit a0b5b70 into spring-projects:main Dec 3, 2025
3 checks passed
spring-builds pushed a commit that referenced this pull request Dec 3, 2025
…) (#4172)

Fixes #4170

* add topic validation of kafka listener

* refactor: allow topic pattern for validtion

Signed-off-by: moonyoungCHAE <xpf_fl@naver.com>
(cherry picked from commit a0b5b70)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

@KafkaListener with Both topics and topicPartitions Breaks @RetryableTopic

2 participants