Async Processing (and out of order commits) with FilteringMessageListenerAdapter #4377

@druvdub

Expected Behavior

When using a RecordFilterStrategy with a @KafkaListener that returns an asynchronous type (e.g., CompletableFuture or Mono), filtered messages should still trigger acknowledgment so that consumer offsets advance correctly. Valid messages should be processed asynchronously, and they should be allowed to complete out of order.

The listener container should defer offset commits until after asynchronous completion, even when a RecordFilterStrategy is applied.
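To make the expected commit behavior concrete, here is a minimal sketch of out-of-order acknowledgment tracking for a single partition. `OutOfOrderOffsetTracker` is a hypothetical helper written for illustration; it is not a Spring Kafka class and does not reflect how the container implements this internally. It assumes the committed offset may only advance across a contiguous run of acknowledged offsets.

```java
import java.util.TreeSet;

// Hypothetical helper (not part of Spring Kafka): tracks acknowledgments that
// arrive out of order and reports the offset that is safe to commit.
final class OutOfOrderOffsetTracker {
    // Acknowledged offsets that are still waiting for an earlier gap to close.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest offset that has not been acknowledged yet.
    private long nextToCommit;

    OutOfOrderOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Records that an offset finished (processed or filtered) and returns the
    // offset to commit, following Kafka's "last completed + 1" convention.
    synchronized long ack(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance over every contiguous acknowledged offset.
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

In this model, acknowledging offsets 2 and 1 leaves the commit position at 0; it only moves to 3 once offset 0 is acknowledged. A record that is never acknowledged, such as a filtered one, therefore blocks every later offset.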

Current Behavior

Currently, asynchronous @KafkaListener methods do not work correctly when a RecordFilterStrategy is used. This appears to be because the listener gets wrapped in a FilteringMessageListenerAdapter, while the async handling logic only checks for MessagingMessageListenerAdapter.
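The suspected cause can be illustrated with a small stand-alone model. All types below (`Listener`, `AsyncListener`, `FilteringWrapper`, `AsyncDetection`) are hypothetical stand-ins, not Spring Kafka classes, and the sketch only assumes that the async check looks at the outermost listener's type.

```java
// Hypothetical stand-ins for listener types; they are not Spring Kafka classes.
interface Listener { }

// Represents a listener whose handler returns an asynchronous type.
final class AsyncListener implements Listener { }

// A wrapper that hides the wrapped listener behind its own type,
// much like a filtering adapter wraps a messaging adapter.
final class FilteringWrapper implements Listener {
    private final Listener delegate;

    FilteringWrapper(Listener delegate) {
        this.delegate = delegate;
    }

    Listener getDelegate() {
        return delegate;
    }
}

final class AsyncDetection {
    // Checks only the outermost listener, so a wrapped async listener is missed.
    static boolean naiveIsAsync(Listener listener) {
        return listener instanceof AsyncListener;
    }

    // Walks through wrappers until the innermost listener is reached.
    static boolean unwrappingIsAsync(Listener listener) {
        Listener current = listener;
        while (current instanceof FilteringWrapper) {
            current = ((FilteringWrapper) current).getDelegate();
        }
        return current instanceof AsyncListener;
    }
}
```

For `new FilteringWrapper(new AsyncListener())`, the naive check reports a synchronous listener while the unwrapping check finds the async one. Whether unwrapping the delegate is the right fix in the container is an assumption on our side.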

Context

We use Spring Kafka to process messages between services over a shared topic.
Each message includes an eventType header, and we use that header to route messages to the appropriate @KafkaListener.

Each listener is configured through the ConcurrentKafkaListenerContainerFactory with a listener-specific argument that determines the expected event type for that consumer.

To avoid duplicating filtering logic, we use a RecordInterceptorAdapter to apply the RecordFilterStrategy before the listener processes the record.

Within each listener, the incoming message is submitted to an asynchronous thread pool for processing.

@Autowired
private AsyncThreadPoolExecutor kafkaExecutor;

@RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2.0)
)
@KafkaListener(
        topics = "my-topic",
        groupId = "my-group",
        containerFactory = "kafkaListenerContainerFactory"
)
public CompletableFuture<Void> listen(ConsumerRecord<String, String> record) {

    return CompletableFuture.runAsync(() -> process(record), kafkaExecutor);
}

The goal of using async return types is to avoid acknowledging and committing the offset too early, and to allow retry-topic handling to work automatically.


Issue 1: Async acknowledgment is not enabled automatically
Unless we explicitly set asyncAcks = true and configure the container with AckMode.MANUAL or AckMode.MANUAL_IMMEDIATE, Spring does not switch to manual acknowledgment internally when the listener returns an asynchronous type.

As a result, the message is acknowledged immediately after being submitted to the thread pool, rather than after async processing completes.

ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();

factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAsyncAcks(true);

This is reflected in the logs and appears to be tied to the async-reply handling path in MessagingMessageListenerAdapter.

Without explicitly enabling async acks, the offset is committed too early.
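The timing difference can be reproduced without Kafka. The sketch below is a plain-Java model, not Spring Kafka code: `AckTimingDemo` and its event names are hypothetical, the string "acked" stands in for an offset acknowledgment, and a gate future is used only to keep the ordering deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical model: compares acknowledging when the listener method returns
// with acknowledging when the returned future completes.
final class AckTimingDemo {
    static List<String> run(boolean deferAck) {
        List<String> events = new CopyOnWriteArrayList<>();
        // Holds the worker back until the "listener method" has returned.
        CompletableFuture<Void> gate = new CompletableFuture<>();

        CompletableFuture<Void> work = CompletableFuture.runAsync(() -> {
            gate.join();
            events.add("processed");
        });

        CompletableFuture<Void> done;
        if (deferAck) {
            // Acknowledge only after successful asynchronous completion.
            done = work.whenComplete((result, error) -> {
                if (error == null) {
                    events.add("acked");
                }
            });
        } else {
            // Acknowledge as soon as the work has been handed off.
            events.add("acked");
            done = work;
        }

        gate.complete(null);
        done.join();
        return events;
    }
}
```

`run(false)` records the acknowledgment before processing, which matches the early commit we see; `run(true)` records it afterwards, which is the behavior we expect from async return types.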


Issue 2: Filtered records are not acknowledged when we use a RecordInterceptor instead

When we use a RecordInterceptorAdapter to apply the RecordFilterStrategy, records that are filtered out are not acknowledged.

This causes consumer lag to build up, because the container appears to treat those records as still in progress and waits indefinitely for them to be committed.

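factory.setRecordInterceptor(new RecordInterceptorAdapter<>(record -> {
    // Guard against records that carry no eventType header.
    org.apache.kafka.common.header.Header header = record.headers().lastHeader("eventType");
    if (header == null || header.value() == null) {
        return null;
    }
    String eventType = new String(header.value(), java.nio.charset.StandardCharsets.UTF_8);

    // Returning null skips the record, so the listener is never invoked for it.
    if (!expectedEventType.equals(eventType)) {
        return null;
    }

    return record;
}));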

Because the filtered records are not acked, the offset does not advance and the consumer lags behind over time.
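What we would expect instead is that a discarded record is acknowledged on the spot. The following is a minimal plain-Java model of that behavior; `AckingFilter` and its fields are hypothetical and exist only for illustration. Spring Kafka's filtering adapter has an ackDiscarded option with a similar intent, but this sketch does not call that API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of a filtering listener wrapper (not a Spring Kafka class).
final class AckingFilter {
    final List<Long> acked = new ArrayList<>();
    final List<Long> delivered = new ArrayList<>();

    private final Predicate<String> discard;
    private final boolean ackDiscarded;

    AckingFilter(Predicate<String> discard, boolean ackDiscarded) {
        this.discard = discard;
        this.ackDiscarded = ackDiscarded;
    }

    void onRecord(long offset, String eventType) {
        if (discard.test(eventType)) {
            // A discarded record never reaches the listener, so nothing else
            // will acknowledge it unless the filter does so itself.
            if (ackDiscarded) {
                acked.add(offset);
            }
            return;
        }
        delivered.add(offset);
        // Stands in for the acknowledgment issued once the listener completes.
        acked.add(offset);
    }
}
```

With `ackDiscarded` set to false, feeding offsets 0 ("order"), 1 ("payment"), and 2 ("order") through a filter that discards everything except "order" leaves offset 1 unacknowledged, which is the gap that keeps the committed offset from advancing.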

Important

Spring Kafka version: 3.3.7
