In what version(s) of Spring for Apache Kafka are you seeing this issue?
4.0.2+, 3.3.12+
Describe the bug
When configuring a @KafkaListener with a RecordFilterStrategy via filter attribute, the observation is leaked in the metric spring.kafka.listener.active. This quickly fills available memory. Additionally, the message is not counted, but this may be correct though.
This happened after updating spring-boot 3.5.9 to spring-boot 3.5.10. So we need a backport to 3.3.x for any fix.
The issue seems to be related to #4230. The FilteringMessageListenerAdapter instance in the ListenerConsumer.listener field is unwrapped to see the RecordMessagingMessageListenerAdapter delegate. Since the filtering adapter does not always call onMessage() on the delegate (it is filtering), no invoke() call errors or stops the current observation.
To Reproduce
Get https://github.com/andi5/spring-kafka-observation-leak and run tests with different versions of spring-boot (build.gradle). They all fail.
Expected behavior
No memory leak, the active listeners should get back to zero without load.
Sample
https://github.com/andi5/spring-kafka-observation-leak
Reports that include a sample will take priority over reports that do not.
At times, we may require a sample, so it is good to try and include a sample up front.
In what version(s) of Spring for Apache Kafka are you seeing this issue?
4.0.2+, 3.3.12+
Describe the bug
When configuring a
@KafkaListenerwith aRecordFilterStrategyviafilterattribute, the observation is leaked in the metricspring.kafka.listener.active. This quickly fills available memory. Additionally, the message is not counted, but this may be correct though.This happened after updating spring-boot 3.5.9 to spring-boot 3.5.10. So we need a backport to 3.3.x for any fix.
The issue seems to be related to #4230. The
FilteringMessageListenerAdapterinstance in theListenerConsumer.listenerfield is unwrapped to see theRecordMessagingMessageListenerAdapterdelegate. Since the filtering adapter does not always callonMessage()on the delegate (it is filtering), noinvoke()call errors or stops the current observation.To Reproduce
Get https://github.com/andi5/spring-kafka-observation-leak and run tests with different versions of spring-boot (build.gradle). They all fail.
Expected behavior
No memory leak, the active listeners should get back to zero without load.
Sample
https://github.com/andi5/spring-kafka-observation-leak
Reports that include a sample will take priority over reports that do not.
At times, we may require a sample, so it is good to try and include a sample up front.