Skip to content

Observations for filtered messages are leaked, filling memory #4306

@andi5

Description

@andi5

In what version(s) of Spring for Apache Kafka are you seeing this issue?

4.0.2+, 3.3.12+

Describe the bug

When configuring a @KafkaListener with a RecordFilterStrategy via filter attribute, the observation is leaked in the metric spring.kafka.listener.active. This quickly fills available memory. Additionally, the message is not counted, but this may be correct though.

Image

This happened after updating spring-boot 3.5.9 to spring-boot 3.5.10. So we need a backport to 3.3.x for any fix.

The issue seems to be related to #4230. The FilteringMessageListenerAdapter instance in the ListenerConsumer.listener field is unwrapped to see the RecordMessagingMessageListenerAdapter delegate. Since the filtering adapter does not always call onMessage() on the delegate (it is filtering), no invoke() call errors or stops the current observation.

To Reproduce

Get https://github.com/andi5/spring-kafka-observation-leak and run tests with different versions of spring-boot (build.gradle). They all fail.

Expected behavior

No memory leak, the active listeners should get back to zero without load.

Sample

https://github.com/andi5/spring-kafka-observation-leak

Reports that include a sample will take priority over reports that do not.
At times, we may require a sample, so it is good to try and include a sample up front.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions