In what version(s) of Spring for Apache Kafka are you seeing this issue?
4.1.0-RC1
Describe the bug
When a processing or production exception is triggered, the key, value, and headers of the record passed to the KafkaStreamsDeadLetterDestinationResolver correspond to the raw key, raw value, and headers of the source record at the input of the sub-topology. They should be the key, value, and headers of the record at the input of the processor that triggered the exception.
This happens because the source raw key, source raw value, and source headers are used to build the ConsumerRecord passed to the KafkaStreamsDeadLetterDestinationResolver: https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/streams/RecoveringProcessingExceptionHandler.java#L66
The issue does not occur with the deserialization exception handler, because there the failing record is the source record itself, so its key, value, and headers are the same as the source's.
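To make the difference between the two views of the record concrete, here is a minimal plain-Java model. It does not use the Kafka Streams or Spring for Apache Kafka APIs; `RecordViewSketch`, `Views`, and `failAtSecondProcessor` are hypothetical names invented for this illustration, and the String-to-Integer mapping step is an assumed example of an upstream processor.

```java
import java.nio.charset.StandardCharsets;

public class RecordViewSketch {

    // Hypothetical holder for the two values a handler could pass to the resolver.
    public record Views(Object sourceRawValue, Object processorInputValue) {}

    // Models a sub-topology: source -> mapping processor -> failing processor.
    public static Views failAtSecondProcessor(byte[] sourceRawValue) {
        // Source node: the raw bytes are deserialized into a String.
        String deserialized = new String(sourceRawValue, StandardCharsets.UTF_8);
        // First processor (assumed for illustration): changes the value type.
        Integer mapped = deserialized.length();
        // The second processor fails here; 'mapped' is the record value at its input,
        // while 'sourceRawValue' is what entered the sub-topology.
        return new Views(sourceRawValue, mapped);
    }

    public static void main(String[] args) {
        Views views = failAtSecondProcessor("hello".getBytes(StandardCharsets.UTF_8));
        // What the resolver currently receives, per this report:
        System.out.println(views.sourceRawValue().getClass().getSimpleName());      // byte[]
        // What this report argues it should receive:
        System.out.println(views.processorInputValue().getClass().getSimpleName()); // Integer
    }
}
```

In this model the two values differ in both type and content as soon as any processor sits between the source and the failing step, which is the mismatch described above.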
To Reproduce
- Create a stream and configure the RecoveringProcessingExceptionHandler.
- Create a topology that raises a processing exception:
@Bean
public KStream<?, ?> topology(StreamsBuilder streamsBuilder) {
    KStream<String, String> stream =
            streamsBuilder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
    stream.filter(
                    (_, _) -> {
                        throw new IllegalArgumentException("error");
                    })
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
    return stream;
}
- Define a minimal KafkaStreamsDeadLetterDestinationResolver and register it in the streams configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(
        KafkaProperties kafkaProperties, KafkaStreamsDeadLetterDestinationResolver resolver) {
    Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
    props.put(RecoveringDeserializationExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    props.put(RecoveringProcessingExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    props.put(RecoveringProductionExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    return new KafkaStreamsConfiguration(props);
}

@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
    return (context, record, exception) -> new TopicPartition("default-dlq-topic", 0);
}
- Trigger a processing exception by producing a message to the input topic.
- When the configured KafkaStreamsDeadLetterDestinationResolver is invoked, the key and value it receives are the serialized raw key and raw value of the source record, rather than the deserialized, typed key and value.
Expected behavior
The key, value, and headers should come from the record at the input of the processor that triggered the exception, rather than from the source record, so users can define the dead letter destination based on the input record type:
@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
    return (context, record, exception) -> {
        if (record.value() instanceof CustomObject customObject) {
            return new TopicPartition("custom-dlq-topic", -1);
        }
        return new TopicPartition("default-dlq-topic", 0);
    };
}
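The effect on type-based routing can be sketched in plain Java without any Kafka dependency. In the sketch below, `DlqRoutingSketch`, `Destination`, and this `CustomObject` record are hypothetical stand-ins for illustration only (`Destination` plays the role of `TopicPartition`); the `resolve` method mirrors the `instanceof` check in the resolver above.

```java
import java.nio.charset.StandardCharsets;

public class DlqRoutingSketch {

    // Hypothetical stand-in for the user's typed value.
    public record CustomObject(String id) {}

    // Hypothetical stand-in for TopicPartition.
    public record Destination(String topic, int partition) {}

    // Same decision logic as the resolver above, applied to a bare value.
    public static Destination resolve(Object value) {
        if (value instanceof CustomObject) {
            return new Destination("custom-dlq-topic", -1);
        }
        return new Destination("default-dlq-topic", 0);
    }

    public static void main(String[] args) {
        // Raw source value: serialized bytes, so the type check cannot match.
        byte[] rawSourceValue = "{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8);
        // Value at the input of the failing processor: the typed object.
        CustomObject processorInputValue = new CustomObject("42");

        System.out.println(resolve(rawSourceValue).topic());      // default-dlq-topic
        System.out.println(resolve(processorInputValue).topic()); // custom-dlq-topic
    }
}
```

With the raw source value the type check never matches, so every failed record would fall through to the default topic. With the processor input value the custom branch becomes reachable.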