Skip to content

GH-4377: Fix asyncReplies calculation for container#4401

Merged
sobychacko merged 2 commits into
spring-projects:mainfrom
artembilan:GH-4377
Apr 18, 2026
Merged

GH-4377: Fix asyncReplies calculation for container#4401
sobychacko merged 2 commits into
spring-projects:mainfrom
artembilan:GH-4377

Conversation

@artembilan

Copy link
Copy Markdown
Member

Fixes: #4377

The KafkaMessageListenerContainer.ListenerConsumer determines its asyncReplies against listener instanceof AsyncRepliesAware.
However, in case of retryable topics configuration, or just factory.setRecordFilterStrategy(), the provided listener is going to be wrapped with a DelegatingMessageListener or even stack of them, e.g., in case of retryable topics and record filtering combination

  • Implement AsyncRepliesAware contract on the DelegatingMessageListener with simple delegation to the getDelegate()
  • There is not need in new tests, and modification of existing configuration, based on CompletableFuture and Mono, is enough to prove that new feature works as expected. Previously those tests enforced AckMode.MANUAL and asyncAcks through configuration to work out asyncReplies behavior. With the fix we got them through fallback according to the isAsyncReplies() state of the ultimate listener

The AsyncRepliesAware contract goes down in the history to the 3.3.x version. Therefore, applying this fix into that branch feels legit. Moreover, it is not a breaking change due to the default methods feature in Java internfaces.

NOTE: The asyncAcks still needed to be

Auto-cherry-pick to 4.0.x & 3.3.x

Fixes: spring-projects#4377

The `KafkaMessageListenerContainer.ListenerConsumer` determines its `asyncReplies`
against `listener instanceof AsyncRepliesAware`.
However, in case of retryable topics configuration, or just `factory.setRecordFilterStrategy()`,
the provided `listener` is going to be wrapped with a `DelegatingMessageListener`
or even stack of them, e.g., in case of retryable topics and record filtering combination

* Implement `AsyncRepliesAware` contract on the `DelegatingMessageListener`
with simple delegation to the `getDelegate()`
* There is not need in new tests, and modification of existing configuration,
 based on `CompletableFuture` and `Mono`, is enough to prove that new feature works as expected.
Previously those tests enforced `AckMode.MANUAL` and `asyncAcks` through configuration to work out `asyncReplies` behavior.
With the fix we got them through fallback according to the `isAsyncReplies()` state of the
ultimate listener

The `AsyncRepliesAware` contract goes down in the history to the `3.3.x` version.
Therefore, applying this fix into that branch feels legit.
Moreover, it is not a breaking change due to the `default` methods feature in Java internfaces.

NOTE: The `asyncAcks` still needed to be

**Auto-cherry-pick to `4.0.x` & `3.3.x`**
@artembilan artembilan requested a review from sobychacko April 17, 2026 17:33
@sobychacko sobychacko merged commit c0d3c61 into spring-projects:main Apr 18, 2026
3 checks passed
spring-builds pushed a commit that referenced this pull request Apr 18, 2026
* GH-4377: Fix `asyncReplies` calculation for container

Fixes: #4377

The `KafkaMessageListenerContainer.ListenerConsumer` determines its `asyncReplies`
against `listener instanceof AsyncRepliesAware`.
However, in case of retryable topics configuration, or just `factory.setRecordFilterStrategy()`,
the provided `listener` is going to be wrapped with a `DelegatingMessageListener`
or even stack of them, e.g., in case of retryable topics and record filtering combination

* Implement `AsyncRepliesAware` contract on the `DelegatingMessageListener`
with simple delegation to the `getDelegate()`
* There is not need in new tests, and modification of existing configuration,
 based on `CompletableFuture` and `Mono`, is enough to prove that new feature works as expected.
Previously those tests enforced `AckMode.MANUAL` and `asyncAcks` through configuration to work out `asyncReplies` behavior.
With the fix we got them through fallback according to the `isAsyncReplies()` state of the
ultimate listener

The `AsyncRepliesAware` contract goes down in the history to the `3.3.x` version.
Therefore, applying this fix into that branch feels legit.
Moreover, it is not a breaking change due to the `default` methods feature in Java interfaces.

* Remove unused import from the AsyncMonoRetryTopicScenarioTests

(cherry picked from commit c0d3c61)
spring-builds pushed a commit that referenced this pull request Apr 18, 2026
* GH-4377: Fix `asyncReplies` calculation for container

Fixes: #4377

The `KafkaMessageListenerContainer.ListenerConsumer` determines its `asyncReplies`
against `listener instanceof AsyncRepliesAware`.
However, in case of retryable topics configuration, or just `factory.setRecordFilterStrategy()`,
the provided `listener` is going to be wrapped with a `DelegatingMessageListener`
or even stack of them, e.g., in case of retryable topics and record filtering combination

* Implement `AsyncRepliesAware` contract on the `DelegatingMessageListener`
with simple delegation to the `getDelegate()`
* There is not need in new tests, and modification of existing configuration,
 based on `CompletableFuture` and `Mono`, is enough to prove that new feature works as expected.
Previously those tests enforced `AckMode.MANUAL` and `asyncAcks` through configuration to work out `asyncReplies` behavior.
With the fix we got them through fallback according to the `isAsyncReplies()` state of the
ultimate listener

The `AsyncRepliesAware` contract goes down in the history to the `3.3.x` version.
Therefore, applying this fix into that branch feels legit.
Moreover, it is not a breaking change due to the `default` methods feature in Java interfaces.

* Remove unused import from the AsyncMonoRetryTopicScenarioTests

(cherry picked from commit c0d3c61)
artembilan added a commit that referenced this pull request Apr 20, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Async Processing (and out of order commits) with FilteringMessageListenerAdapter

2 participants