BatchListenerFailedException silently commits offsets for unprocessed records in BatchMessageListener #4436

@rroesch1

Summary

When a BatchMessageListener throws a BatchListenerFailedException, Spring for Apache Kafka's DefaultErrorHandler commits the offsets of all records before the indicated failed record. This behavior is dangerous because it assumes that all preceding records have been fully and successfully processed — an assumption that does not hold in many legitimate BatchMessageListener use cases, leading to silent data loss.

Description

Current Behavior

When a BatchMessageListener throws a BatchListenerFailedException(message, cause, failedRecord), the DefaultErrorHandler:

  1. Identifies the index of the failed record in the batch.
  2. Commits offsets for all records before that index (treating them as successfully processed).
  3. Seeks back to the failed record for retry.
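To make the offset arithmetic in these three steps concrete, here is a simplified plain-Java model. It is not the actual Spring for Apache Kafka code: `Rec`, `PartialCommitModel`, and the partition names are hypothetical stand-ins, and the model only assumes that a committed Kafka offset is the next position to read (last processed offset plus one).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a ConsumerRecord: only what the offset arithmetic needs.
final class Rec {
    final String topicPartition; // e.g. "in-0" (illustrative name)
    final long offset;

    Rec(String topicPartition, long offset) {
        this.topicPartition = topicPartition;
        this.offset = offset;
    }
}

final class PartialCommitModel {
    // Offsets that a partial commit would cover for a failure at failedIndex:
    // per partition, the offset of the last preceding record plus one.
    static Map<String, Long> offsetsToCommit(List<Rec> batch, int failedIndex) {
        Map<String, Long> commits = new LinkedHashMap<>();
        for (Rec r : batch.subList(0, failedIndex)) {
            // Later records of the same partition overwrite earlier ones.
            commits.put(r.topicPartition, r.offset + 1);
        }
        return commits;
    }

    // Position to seek to on the failed record's partition so it is redelivered.
    static long seekPosition(List<Rec> batch, int failedIndex) {
        return batch.get(failedIndex).offset;
    }
}
```

For a batch with offsets `in-0@10, in-0@11, in-1@7, in-0@12` and a failure at index 3, this model commits `in-0 -> 12` and `in-1 -> 8` and seeks `in-0` back to 12, regardless of whether the listener actually finished the first three records.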

Why This Is Dangerous

The BatchMessageListener interface exists specifically for use cases where batch processing does not follow a simple sequential, one-record-at-a-time model. Common patterns include:

  1. Parallel processing: Records in the batch are processed concurrently (e.g., using virtual threads). If record N fails, records N+1, N+2, … may have succeeded, while records before N may still be in-flight or may have produced side effects that need to be rolled back.

  2. Multi-step pipelines: A batch listener may process records in multiple stages — e.g., transform, then produce to an output topic. If an error occurs during the produce step, the transform step for earlier records may have completed, but their output was never actually sent. Committing their offsets means those events are permanently lost.

  3. Reordered processing: The listener may deliberately reorder records (e.g., grouping by key for deduplication). The positional index of the failed ConsumerRecord in the original batch has no meaningful relationship to which records have actually been processed.

  4. Transactional processing: The listener may be doing work inside a transaction. If any record fails, the entire transaction should be rolled back, but the error handler still commits offsets for the records before the failed one, so their rolled-back work is never redelivered.
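The reordering case (pattern 3) can be illustrated with a small plain-Java sketch. It assumes a hypothetical listener that processes records grouped by key (modeled here as a stable sort) and stops at the first failure; `ReorderedBatchDemo` and its method are illustrative names, not part of any library.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ReorderedBatchDemo {
    // Returns the original batch indexes that precede failedIndex but had NOT
    // been processed when the failure occurred, given that the listener
    // processes records sorted by key instead of in batch order.
    static List<Integer> unprocessedBeforeFailure(List<String> keys, int failedIndex) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            order.add(i);
        }
        // Processing order: grouped by key; List.sort is stable.
        order.sort(Comparator.comparing((Integer i) -> keys.get(i)));

        List<Integer> processed = new ArrayList<>();
        for (int idx : order) {
            if (idx == failedIndex) {
                break; // the listener would throw here
            }
            processed.add(idx);
        }

        // Records before failedIndex that a partial commit would wrongly cover.
        List<Integer> lost = new ArrayList<>();
        for (int i = 0; i < failedIndex; i++) {
            if (!processed.contains(i)) {
                lost.add(i);
            }
        }
        return lost;
    }
}
```

With keys `["b", "a", "c", "a"]` and a failure on the record at original index 3, the processing order is 1, 3, 0, 2. Only index 1 was processed before the failure, yet a commit of "everything before index 3" would also cover indexes 0 and 2.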

Concrete Example

Consider a BatchMessageListener that:

  1. Transforms each input record (Input → Stage 1 → Stage 2 → Output).
  2. Produces the transformed records to an output Kafka topic.

public void onMessage(List<ConsumerRecord<byte[], GenericRecord>> data) {
    // Step 1: Transform all records (parallel, using virtual threads)
    var results = transformAll(data);

    // Step 2: Produce all results to the output topic
    var futures = results.stream()
            .map(kafkaTemplate::send)
            .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).join();
}

If transformAll() throws a BatchListenerFailedException pointing to record index 5:

  • Expected: No offsets are committed. The entire batch is retried.
  • Actual: Offsets for records 0–4 are committed, even though kafkaTemplate::send was never called for any of them. Those 5 events are silently and permanently lost.
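The gap between "committed" and "actually sent" can be reproduced without Kafka. The following is a minimal plain-Java sketch under stated assumptions: `IndexedBatchFailure` is a hypothetical stand-in for BatchListenerFailedException, the transform step is sequential rather than parallel, and an in-memory list stands in for the output topic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for BatchListenerFailedException: carries only the failed index.
final class IndexedBatchFailure extends RuntimeException {
    final int index;

    IndexedBatchFailure(int index) {
        super("record " + index + " failed");
        this.index = index;
    }
}

final class TwoStepPipelineDemo {
    final List<String> sent = new ArrayList<>(); // stands in for the output topic

    // Step 1: transform every record; throws at failAt (use -1 for "never fail").
    List<String> transformAll(List<String> batch, int failAt) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (i == failAt) {
                throw new IndexedBatchFailure(i);
            }
            out.add(batch.get(i).toUpperCase());
        }
        return out;
    }

    // Same shape as the listener above: transform first, then send.
    void onMessage(List<String> batch, int failAt) {
        List<String> results = transformAll(batch, failAt);
        sent.addAll(results); // Step 2 is never reached when step 1 throws
    }

    // Number of records a partial commit would cover although none was sent.
    int lostRecords(List<String> batch, int failAt) {
        try {
            onMessage(batch, failAt);
            return 0;
        } catch (IndexedBatchFailure e) {
            int committedCount = e.index; // records 0..index-1 treated as done
            return committedCount - sent.size();
        }
    }
}
```

For a batch of eight records and a failure at index 5, `lostRecords` returns 5 while `sent` stays empty, which matches the "Actual" outcome described above.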

This is not a theoretical concern: we hit this exact bug in one of our production systems, where a batch listener processed records in parallel and threw BatchListenerFailedException on a failure, resulting in data loss for all preceding records.

Why This Should Not Be the Default

While this behavior is documented in the reference guide, it remains highly unexpected and error-prone in practice. The documentation describes the mechanism, but does not warn about the data-loss risk. A developer encountering a batch processing failure will naturally reach for BatchListenerFailedException — it is, after all, the only exception specifically designed for BatchMessageListener — without realizing that it triggers partial offset commits that silently discard unprocessed events.

Critically, the Javadoc of BatchListenerFailedException itself does not mention the offset-commit side effect at all. Developers who discover the exception via IDE auto-completion or Javadoc have no indication of this behavior.

The BatchMessageListener API makes no guarantees about processing order or completion semantics. The error handler should not make assumptions that contradict the flexibility the API provides.

Proposed Solutions

Option A: Change the Default Behavior (Preferred)

BatchListenerFailedException should not cause the DefaultErrorHandler to commit offsets for records before the failed index. Instead, the entire batch should be retried — the same behavior as throwing any other RuntimeException.

If the current "commit preceding offsets" behavior is desired by some users, it should be opt-in via:

  • A flag on DefaultErrorHandler (e.g., setCommitRecoveredOffsets(true)), or
  • A separate, explicitly named exception (e.g., PartialBatchFailedException).
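As a rough illustration of the opt-in idea, the sketch below models the decision in plain Java. `BatchFailurePolicy` and `setCommitPrecedingOffsets` are hypothetical names invented for this example; they are not existing Spring for Apache Kafka API, and the real change would live inside the error handler.

```java
final class BatchFailurePolicy {
    // Hypothetical opt-in flag; the default is the safe "retry the whole batch".
    private boolean commitPrecedingOffsets = false;

    void setCommitPrecedingOffsets(boolean commitPrecedingOffsets) {
        this.commitPrecedingOffsets = commitPrecedingOffsets;
    }

    // Index of the first record to redeliver after a failure at failedIndex.
    // Records before the returned index are treated as committed.
    int firstIndexToRetry(int failedIndex) {
        return commitPrecedingOffsets ? failedIndex : 0;
    }
}
```

With the default, a failure at index 5 redelivers from index 0, so nothing is committed; only after an explicit `setCommitPrecedingOffsets(true)` does redelivery start at index 5.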

Option B: Document the Behavior and Clarify Intent

At minimum, the Javadoc of BatchListenerFailedException must explicitly describe the offset-commit side effect. Currently, the Javadoc does not mention this behavior at all.

Proposed Javadoc for BatchListenerFailedException:

/**
 * Exception to be thrown by a {@link BatchMessageListener} to indicate that a
 * specific record within the batch has failed processing.
 *
 * <p><strong>Important:</strong> When this exception is handled by the
 * {@link org.springframework.kafka.listener.DefaultErrorHandler}, it will
 * <strong>commit the offsets of all records preceding the failed record</strong>
 * in the batch, treating them as successfully processed. The consumer then
 * seeks back to the failed record, so that it and all subsequent records are
 * redelivered for retry or recovery.
 *
 * <p>This means that throwing this exception <strong>implicitly asserts</strong>
 * that all records before the failed record have been <strong>fully and
 * irreversibly processed</strong>, including any side effects such as producing
 * to output topics, writing to databases, or calling external services.
 *
 * <p>If your {@link BatchMessageListener} processes records in parallel,
 * in a non-sequential order, in multiple steps, or within a transaction,
 * do <strong>not</strong> use this exception — throw a standard
 * {@link RuntimeException} instead to ensure the entire batch is retried
 * without committing any offsets.
 *
 * @see org.springframework.kafka.listener.DefaultErrorHandler
 * @see BatchMessageListener
 */
