GH-4377: Fix asyncReplies calculation for container#4401
Merged
Conversation
Fixes: spring-projects#4377 The `KafkaMessageListenerContainer.ListenerConsumer` determines its `asyncReplies` against `listener instanceof AsyncRepliesAware`. However, in case of retryable topics configuration, or just `factory.setRecordFilterStrategy()`, the provided `listener` is going to be wrapped with a `DelegatingMessageListener` or even stack of them, e.g., in case of retryable topics and record filtering combination * Implement `AsyncRepliesAware` contract on the `DelegatingMessageListener` with simple delegation to the `getDelegate()` * There is not need in new tests, and modification of existing configuration, based on `CompletableFuture` and `Mono`, is enough to prove that new feature works as expected. Previously those tests enforced `AckMode.MANUAL` and `asyncAcks` through configuration to work out `asyncReplies` behavior. With the fix we got them through fallback according to the `isAsyncReplies()` state of the ultimate listener The `AsyncRepliesAware` contract goes down in the history to the `3.3.x` version. Therefore, applying this fix into that branch feels legit. Moreover, it is not a breaking change due to the `default` methods feature in Java internfaces. NOTE: The `asyncAcks` still needed to be **Auto-cherry-pick to `4.0.x` & `3.3.x`**
spring-builds
pushed a commit
that referenced
this pull request
Apr 18, 2026
* GH-4377: Fix `asyncReplies` calculation for container Fixes: #4377 The `KafkaMessageListenerContainer.ListenerConsumer` determines its `asyncReplies` against `listener instanceof AsyncRepliesAware`. However, in case of retryable topics configuration, or just `factory.setRecordFilterStrategy()`, the provided `listener` is going to be wrapped with a `DelegatingMessageListener` or even stack of them, e.g., in case of retryable topics and record filtering combination * Implement `AsyncRepliesAware` contract on the `DelegatingMessageListener` with simple delegation to the `getDelegate()` * There is not need in new tests, and modification of existing configuration, based on `CompletableFuture` and `Mono`, is enough to prove that new feature works as expected. Previously those tests enforced `AckMode.MANUAL` and `asyncAcks` through configuration to work out `asyncReplies` behavior. With the fix we got them through fallback according to the `isAsyncReplies()` state of the ultimate listener The `AsyncRepliesAware` contract goes down in the history to the `3.3.x` version. Therefore, applying this fix into that branch feels legit. Moreover, it is not a breaking change due to the `default` methods feature in Java interfaces. * Remove unused import from the AsyncMonoRetryTopicScenarioTests (cherry picked from commit c0d3c61)
spring-builds
pushed a commit
that referenced
this pull request
Apr 18, 2026
* GH-4377: Fix `asyncReplies` calculation for container Fixes: #4377 The `KafkaMessageListenerContainer.ListenerConsumer` determines its `asyncReplies` against `listener instanceof AsyncRepliesAware`. However, in case of retryable topics configuration, or just `factory.setRecordFilterStrategy()`, the provided `listener` is going to be wrapped with a `DelegatingMessageListener` or even stack of them, e.g., in case of retryable topics and record filtering combination * Implement `AsyncRepliesAware` contract on the `DelegatingMessageListener` with simple delegation to the `getDelegate()` * There is not need in new tests, and modification of existing configuration, based on `CompletableFuture` and `Mono`, is enough to prove that new feature works as expected. Previously those tests enforced `AckMode.MANUAL` and `asyncAcks` through configuration to work out `asyncReplies` behavior. With the fix we got them through fallback according to the `isAsyncReplies()` state of the ultimate listener The `AsyncRepliesAware` contract goes down in the history to the `3.3.x` version. Therefore, applying this fix into that branch feels legit. Moreover, it is not a breaking change due to the `default` methods feature in Java interfaces. * Remove unused import from the AsyncMonoRetryTopicScenarioTests (cherry picked from commit c0d3c61)
artembilan
added a commit
that referenced
this pull request
Apr 20, 2026
This reverts commit 24b026d.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes: #4377
The
KafkaMessageListenerContainer.ListenerConsumerdetermines itsasyncRepliesagainstlistener instanceof AsyncRepliesAware.However, in case of retryable topics configuration, or just
factory.setRecordFilterStrategy(), the providedlisteneris going to be wrapped with aDelegatingMessageListeneror even stack of them, e.g., in case of retryable topics and record filtering combinationAsyncRepliesAwarecontract on theDelegatingMessageListenerwith simple delegation to thegetDelegate()CompletableFutureandMono, is enough to prove that new feature works as expected. Previously those tests enforcedAckMode.MANUALandasyncAcksthrough configuration to work outasyncRepliesbehavior. With the fix we got them through fallback according to theisAsyncReplies()state of the ultimate listenerThe
AsyncRepliesAwarecontract goes down in the history to the3.3.xversion. Therefore, applying this fix into that branch feels legit. Moreover, it is not a breaking change due to thedefaultmethods feature in Java internfaces.NOTE: The
asyncAcksstill needed to beAuto-cherry-pick to
4.0.x&3.3.x