The share consumer container (ShareKafkaMessageListenerContainer) has two error handling problems.
1. Poll-level exceptions terminate the consumer thread
The poll loop catches exceptions generically, logs it, sets exitThrowable, and breaks out of the loop:
catch (Exception e) {
if (e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
this.logger.error(e, "Error in share consumer poll loop");
exitThrowable = e;
break;
}
RecordDeserializationException is thrown by poll(), not by the listener. A single record that fails deserialization terminates the entire consumer thread.
Catch RecordDeserializationException before the generic Exception catch. REJECT the affected record via consumer.acknowledge(record, AcknowledgeType.REJECT) using the topic-partition and offset from the exception, log a warning, and continue the poll loop. The important fix here is not killing the thread — the record itself is undeserializable and should be rejected.
2. Listener-level exceptions always REJECT
handleProcessingError unconditionally REJECTs the record in both modes:
- Explicit mode: calls
acknowledgment.reject()
- Implicit mode: calls
consumer.acknowledge(record, AcknowledgeType.REJECT)
There is no equivalent of the traditional container's CommonErrorHandler chain. All exceptions result in permanent rejection regardless of type.
Proposed changes
Poll-level fix:
Add a RecordDeserializationException catch in the poll loop before the generic Exception catch. Extract the topic-partition and offset from the exception, call consumer.acknowledge() with AcknowledgeType.REJECT, log a warning, and continue polling.
Listener-level pluggable error handling:
Introduce a ShareConsumerRecordRecoverer functional interface:
@FunctionalInterface
public interface ShareConsumerRecordRecoverer {
AcknowledgeType recover(ConsumerRecord<?, ?> record, Exception exception);
}
The container calls this from handleProcessingError instead of hardcoding REJECT. The return value determines how the record is acknowledged.
Provide a DefaultShareConsumerRecordRecoverer with defaults:
CorruptRecordException → AcknowledgeType.REJECT (permanent, record is bad)
- Everything else →
AcknowledgeType.REJECT
Users can override this to RELEASE specific exception types where it makes sense for their use case (e.g., transient downstream failures).
Wire it through ContainerProperties so users can plug in their own implementation.
Note: CommonErrorHandler is not reusable here. Its methods take Consumer<?, ?> parameters, and ShareConsumer does not extend Consumer. It also has partition-oriented concepts (seeks, remaining records, partition assignment callbacks) that do not apply to share groups.
The share consumer container (
ShareKafkaMessageListenerContainer) has two error handling problems.1. Poll-level exceptions terminate the consumer thread
The poll loop catches exceptions generically, logs it, sets
exitThrowable, and breaks out of the loop:RecordDeserializationExceptionis thrown bypoll(), not by the listener. A single record that fails deserialization terminates the entire consumer thread.Catch
RecordDeserializationExceptionbefore the genericExceptioncatch.REJECTthe affected record viaconsumer.acknowledge(record, AcknowledgeType.REJECT)using the topic-partition and offset from the exception, log a warning, and continue the poll loop. The important fix here is not killing the thread — the record itself is undeserializable and should be rejected.2. Listener-level exceptions always
REJECThandleProcessingErrorunconditionallyREJECTs the record in both modes:acknowledgment.reject()consumer.acknowledge(record, AcknowledgeType.REJECT)There is no equivalent of the traditional container's
CommonErrorHandlerchain. All exceptions result in permanent rejection regardless of type.Proposed changes
Poll-level fix:
Add a
RecordDeserializationExceptioncatch in the poll loop before the genericExceptioncatch. Extract the topic-partition and offset from the exception, callconsumer.acknowledge()withAcknowledgeType.REJECT, log a warning, and continue polling.Listener-level pluggable error handling:
Introduce a
ShareConsumerRecordRecovererfunctional interface:The container calls this from
handleProcessingErrorinstead of hardcodingREJECT. The return value determines how the record is acknowledged.Provide a
DefaultShareConsumerRecordRecovererwith defaults:CorruptRecordException→AcknowledgeType.REJECT(permanent, record is bad)AcknowledgeType.REJECTUsers can override this to
RELEASEspecific exception types where it makes sense for their use case (e.g., transient downstream failures).Wire it through
ContainerPropertiesso users can plug in their own implementation.Note:
CommonErrorHandleris not reusable here. Its methods takeConsumer<?, ?>parameters, andShareConsumerdoes not extendConsumer. It also has partition-oriented concepts (seeks, remaining records, partition assignment callbacks) that do not apply to share groups.