You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
When using a RecordFilterStrategy with a @KafkaListener that returns asynchronously (e.g., CompletableFuture, Mono, etc.), filtered messages should still trigger acknowledgment so that consumer offsets advance correctly. Valid messages should be processed asynchronously, and completion should be able to happen out of order.
The listener container should defer offset commits until after asynchronous completion, even when RecordFilterStrategy is applied
Current Behavior
Currently, asynchronous @KafkaListener methods do not work correctly when a RecordFilterStrategy is used. This appears to be because the listener gets wrapped in a FilterMessageListenerAdapter, while the async handling logic only checks for MessagingMessageListenerAdapter.
Context
We use Spring Kafka to process messages between services over a shared topic.
Each message includes an eventType header, and we use that header to route messages to the appropriate @KafkaListener.
Each listener is configured through the ConcurrentKafkaListenerContainerFactory with a listener-specific argument that determines the expected event type for that consumer.
To avoid duplicating filtering logic, we use a RecordInterceptorAdapter to apply the RecordFilterStrategy before the listener processes the record.
Within each listener, the incoming message is submitted to an asynchronous thread pool for processing.
The goal of using async return types is to avoid acknowledging and committing the offset too early, and to allow retry-topic handling to work automatically.
Issue 1: Async acknowledgment is not enabled automatically:
If we do not explicitly set asyncAcks = true and configure the container with AckMode.MANUAL or AckMode.MANUAL_IMMEDIATE, Spring does not switch to manual acknowledgment internally when the listener returns asynchronously.
As a result, the message is acknowledged immediately after being submitted to the thread pool, rather than after async processing completes.
Without explicitly enabling async acks, the offset is committed too early.
Issue 2: Filtered records are not acknowledged if we do try to use RecordInterceptor
When we use a RecordInterceptorAdapter to apply the RecordFilterStrategy, records that are filtered out are not acknowledged.
This causes consumer lag to build up, because the container appears to treat those records as still in progress and waits indefinitely for them to be committed.
Expected Behavior
When using a
RecordFilterStrategywith a@KafkaListenerthat returns asynchronously (e.g.,CompletableFuture,Mono, etc.), filtered messages should still trigger acknowledgment so that consumer offsets advance correctly. Valid messages should be processed asynchronously, and completion should be able to happen out of order.The listener container should defer offset commits until after asynchronous completion, even when RecordFilterStrategy is applied
Current Behavior
Currently, asynchronous
@KafkaListenermethods do not work correctly when aRecordFilterStrategyis used. This appears to be because the listener gets wrapped in aFilterMessageListenerAdapter, while the async handling logic only checks forMessagingMessageListenerAdapter.Context
We use Spring Kafka to process messages between services over a shared topic.
Each message includes an
eventTypeheader, and we use that header to route messages to the appropriate@KafkaListener.Each listener is configured through the
ConcurrentKafkaListenerContainerFactorywith a listener-specific argument that determines the expected event type for that consumer.To avoid duplicating filtering logic, we use a
RecordInterceptorAdapterto apply theRecordFilterStrategybefore the listener processes the record.Within each listener, the incoming message is submitted to an asynchronous thread pool for processing.
The goal of using async return types is to avoid acknowledging and committing the offset too early, and to allow retry-topic handling to work automatically.
Issue 1: Async acknowledgment is not enabled automatically:
If we do not explicitly set
asyncAcks = trueand configure the container withAckMode.MANUALorAckMode.MANUAL_IMMEDIATE, Spring does not switch to manual acknowledgment internally when the listener returns asynchronously.As a result, the message is acknowledged immediately after being submitted to the thread pool, rather than after async processing completes.
This is reflected in the logs and appears to be tied to the async-reply handling path in
MessagingMessageListenerAdapter:Without explicitly enabling async acks, the offset is committed too early.
Issue 2: Filtered records are not acknowledged if we do try to use RecordInterceptor
When we use a
RecordInterceptorAdapterto apply theRecordFilterStrategy, records that are filtered out are not acknowledged.This causes consumer lag to build up, because the container appears to treat those records as still in progress and waits indefinitely for them to be committed.
Because the filtered records are not acked, the offset does not advance and the consumer lags behind over time.
Important
Spring Kafka version: 3.3.7