Incorrect key, value, and headers passed to KafkaStreamsDeadLetterDestinationResolver #4430

@loicgreffier

In what version(s) of Spring for Apache Kafka are you seeing this issue?

4.1.0-RC1

Describe the bug

When a processing or production exception is triggered, the key, value, and headers of the record passed to the KafkaStreamsDeadLetterDestinationResolver are the raw key, raw value, and headers of the source record at the input of the sub-topology. They should instead be the key, value, and headers of the record at the input of the processor that triggered the exception.

This happens because the source raw key, source raw value, and source headers are used to build the ConsumerRecord passed to the KafkaStreamsDeadLetterDestinationResolver: https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/streams/RecoveringProcessingExceptionHandler.java#L66

The issue does not occur with the deserialization exception handler, because the failing record at that stage is the source record itself, so its key, value, and headers are the same as the source's.
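To illustrate why this matters for a resolver that routes by value type, here is a minimal, dependency-free sketch. It does not use the Spring Kafka or Kafka Streams APIs: `ResolverInputSketch`, `CustomObject`, and `resolveTopic` are hypothetical names that only model a resolver receiving either the serialized source value (`byte[]`) or the typed value seen by the failing processor.

```java
import java.nio.charset.StandardCharsets;

public class ResolverInputSketch {

    // Hypothetical stand-in for a deserialized domain value.
    record CustomObject(String id) {}

    // Hypothetical stand-in for resolver logic that routes on the runtime type of the value.
    static String resolveTopic(Object value) {
        if (value instanceof CustomObject) {
            return "custom-dlq-topic";
        }
        return "default-dlq-topic";
    }

    public static void main(String[] args) {
        // What the failing processor received: a typed value.
        CustomObject processorInput = new CustomObject("42");

        // What the source record carried: serialized bytes (assumed JSON here for illustration).
        byte[] sourceRawValue = "{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8);

        // Raw bytes never match the typed check, so routing falls back to the default topic.
        System.out.println(resolveTopic(sourceRawValue)); // prints "default-dlq-topic"

        // The typed processor input matches, so the custom topic is selected.
        System.out.println(resolveTopic(processorInput)); // prints "custom-dlq-topic"
    }
}
```

With the raw source value, a type-based check can never succeed, which is the symptom described above.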

To Reproduce

  1. Create a stream and configure the RecoveringProcessingExceptionHandler.
  2. Create a topology that raises a processing exception:
@Bean
public KStream<?, ?> topology(StreamsBuilder streamsBuilder) {
    KStream<String, String> stream =
            streamsBuilder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

    stream.filter(
                    (_, _) -> {
                        throw new IllegalArgumentException("error");
                    })
            .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

    return stream;
}
  3. Define a minimal KafkaStreamsDeadLetterDestinationResolver and register it in the streams configuration:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration configs(
        KafkaProperties kafkaProperties, KafkaStreamsDeadLetterDestinationResolver resolver) {
    Map<String, Object> props = new HashMap<>(kafkaProperties.buildStreamsProperties());
    props.put(RecoveringDeserializationExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    props.put(RecoveringProcessingExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    props.put(RecoveringProductionExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver);
    return new KafkaStreamsConfiguration(props);
}

@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
    return (context, record, exception) -> new TopicPartition("default-dlq-topic", 0);
}
  4. Trigger a processing exception by producing a message to the input topic.
  5. Observe that when the configured KafkaStreamsDeadLetterDestinationResolver is invoked, the key and value are the serialized raw key and raw value of the source record, rather than the deserialized, typed key and value.

Expected behavior

The key, value, and headers should come from the record at the input of the processor that triggered the exception, rather than from the source record, so users can define the dead letter destination based on the input record type:

@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
    return (context, record, exception) -> {
        if (record.value() instanceof CustomObject customObject) {
            return new TopicPartition("custom-dlq-topic", -1);
        }

        return new TopicPartition("default-dlq-topic", 0);
    };
}
