Add error handling strategy (poll-level and listener-level) to Share consumer container #4321

@sobychacko

The share consumer container (ShareKafkaMessageListenerContainer) has two error handling problems.

1. Poll-level exceptions terminate the consumer thread

The poll loop catches every Exception, logs it, sets exitThrowable, and breaks out of the loop:

catch (Exception e) {
    if (e.getCause() instanceof InterruptedException) {
        Thread.currentThread().interrupt();
    }
    this.logger.error(e, "Error in share consumer poll loop");
    exitThrowable = e;
    break;
}

RecordDeserializationException is thrown by poll(), not by the listener. A single record that fails deserialization terminates the entire consumer thread.

Catch RecordDeserializationException before the generic Exception catch. REJECT the affected record via consumer.acknowledge(record, AcknowledgeType.REJECT), identifying it by the topic-partition and offset carried by the exception, log a warning, and continue the poll loop. The important part of the fix is keeping the thread alive; the record itself is undeserializable and should be rejected.
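To make the intended control flow concrete, here is a minimal, dependency-free sketch of the catch ordering. Everything in it is a hypothetical stand-in: DeserializationFailure models RecordDeserializationException, PollSource models the share consumer, and reject(...) models the REJECT acknowledgment. It is not the container's actual code, only a model of "reject the poison record and keep polling".

```java
import java.util.List;

// Illustrative model only: the nested types are hypothetical stand-ins,
// not Kafka or Spring classes.
class PollLoopSketch {

    /** Stand-in for RecordDeserializationException; carries the failed record's position. */
    static class DeserializationFailure extends RuntimeException {
        final String topic;
        final int partition;
        final long offset;

        DeserializationFailure(String topic, int partition, long offset) {
            super("Cannot deserialize " + topic + "-" + partition + "@" + offset);
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Stand-in for the share consumer: poll() may throw, reject() acknowledges with REJECT. */
    interface PollSource {
        List<String> poll();
        void reject(String topic, int partition, long offset);
    }

    /** Runs at most maxPolls iterations; returns the throwable that ended the loop, or null. */
    static Throwable run(PollSource source, int maxPolls, List<String> delivered) {
        Throwable exitThrowable = null;
        for (int i = 0; i < maxPolls; i++) {
            try {
                delivered.addAll(source.poll());
            }
            catch (DeserializationFailure e) {
                // Poison record: reject it by position and fall through to the next poll.
                source.reject(e.topic, e.partition, e.offset);
            }
            catch (Exception e) {
                // Anything else keeps the existing behavior: remember the cause and stop.
                exitThrowable = e;
                break;
            }
        }
        return exitThrowable;
    }
}
```

The specific catch has to come first; with only the generic catch, the deserialization failure would take the exit path and end the loop.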

2. Listener-level exceptions always REJECT

handleProcessingError unconditionally REJECTs the record in both modes:

  • Explicit mode: calls acknowledgment.reject()
  • Implicit mode: calls consumer.acknowledge(record, AcknowledgeType.REJECT)

There is no equivalent of the traditional container's CommonErrorHandler chain. All exceptions result in permanent rejection regardless of type.

Proposed changes

Poll-level fix:

Add a RecordDeserializationException catch in the poll loop before the generic Exception catch. Extract the topic-partition and offset from the exception, call consumer.acknowledge() with AcknowledgeType.REJECT, log a warning, and continue polling.

Listener-level pluggable error handling:

Introduce a ShareConsumerRecordRecoverer functional interface:

@FunctionalInterface
public interface ShareConsumerRecordRecoverer {
    AcknowledgeType recover(ConsumerRecord<?, ?> record, Exception exception);
}

The container calls this from handleProcessingError instead of hardcoding REJECT. The return value determines how the record is acknowledged.
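A rough sketch of how the delegation could look, using stand-in types so it compiles on its own. AckType stands in for Kafka's AcknowledgeType, Recoverer mirrors the proposed interface with String in place of ConsumerRecord, and Container is a hypothetical fragment rather than the real ShareKafkaMessageListenerContainer. Falling back to REJECT when the recoverer itself throws is an assumption of this sketch, chosen because it matches today's behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: every nested type here is a hypothetical stand-in.
class RecovererWiringSketch {

    /** Stand-in for Kafka's AcknowledgeType. */
    enum AckType { ACCEPT, RELEASE, REJECT }

    /** Same shape as the proposed interface, with String standing in for ConsumerRecord. */
    @FunctionalInterface
    interface Recoverer {
        AckType recover(String record, Exception exception);
    }

    /** Hypothetical container fragment; collects acknowledgments instead of sending them. */
    static class Container {
        private final Recoverer recoverer;
        final List<String> acknowledgments = new ArrayList<>();

        Container(Recoverer recoverer) {
            this.recoverer = recoverer;
        }

        void handleProcessingError(String record, Exception exception) {
            AckType type;
            try {
                // The recoverer decides the outcome instead of a hardcoded REJECT.
                type = this.recoverer.recover(record, exception);
            }
            catch (RuntimeException recovererFailure) {
                // Assumption of this sketch: a failing recoverer falls back to REJECT.
                type = AckType.REJECT;
            }
            this.acknowledgments.add(record + ":" + type);
        }
    }
}
```

In the real container, the last step would branch on the acknowledgment mode (explicit versus implicit), as handleProcessingError already does today.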

Provide a DefaultShareConsumerRecordRecoverer with defaults:

  • CorruptRecordException → AcknowledgeType.REJECT (permanent, record is bad)
  • Everything else → AcknowledgeType.REJECT

Users can override this to RELEASE specific exception types where it makes sense for their use case (e.g., transient downstream failures).
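As an illustration of such an override, the sketch below classifies exceptions by type and RELEASEs only the ones registered as transient, keeping REJECT as the default. The names ClassifyingRecoverer and releaseOn are hypothetical, and AckType and Recoverer are local stand-ins for AcknowledgeType and the proposed interface so the example is self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only: every nested type here is a hypothetical stand-in.
class ClassifyingRecovererSketch {

    /** Stand-in for Kafka's AcknowledgeType. */
    enum AckType { ACCEPT, RELEASE, REJECT }

    /** Same shape as the proposed interface, with String standing in for ConsumerRecord. */
    @FunctionalInterface
    interface Recoverer {
        AckType recover(String record, Exception exception);
    }

    /** Rejects by default; releases only the exception types registered as transient. */
    static class ClassifyingRecoverer implements Recoverer {
        private final Map<Class<? extends Exception>, AckType> overrides = new LinkedHashMap<>();

        ClassifyingRecoverer releaseOn(Class<? extends Exception> type) {
            this.overrides.put(type, AckType.RELEASE);
            return this;
        }

        @Override
        public AckType recover(String record, Exception exception) {
            for (Map.Entry<Class<? extends Exception>, AckType> entry : this.overrides.entrySet()) {
                // isInstance also matches subclasses of the registered type.
                if (entry.getKey().isInstance(exception)) {
                    return entry.getValue();
                }
            }
            return AckType.REJECT;
        }
    }
}
```

A caller might configure it as new ClassifyingRecoverer().releaseOn(TimeoutException.class). Since RELEASE makes the record available for another delivery attempt, it only makes sense for failures that can plausibly succeed on retry.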

Wire it through ContainerProperties so users can plug in their own implementation.

Note: CommonErrorHandler is not reusable here. Its methods take Consumer<?, ?> parameters, and ShareConsumer does not extend Consumer. It also has partition-oriented concepts (seeks, remaining records, partition assignment callbacks) that do not apply to share groups.
