Listener with async acks pauses indefinitely #4332

@asardaes

I have a Spring Boot 4.0.3 application. I am using a @KafkaListener method with a RecordFilterStrategy, setAckDiscarded(true) set on its container factory, and a CompletableFuture as its return value. The listener pauses almost immediately after consuming the first batch, even though all messages appear to be either filtered or processed correctly. I looked through the code and, unless I'm missing something, this function has a problem:

private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {

Let's say my batch has 10 records from a single partition with offsets 0 through 9:

  1. Records with offsets 0 through 4 are processed in order, so offsetsInThisBatch should contain [5, 6, 7, 8, 9]
  2. Records with offsets 8 and 9 finish next and are added to deferredOffsets
  3. Record with offset 5 completes next and it's removed from offsetsInThisBatch and this loop runs:
if (!CollectionUtils.isEmpty(deferred)) {
	deferred.sort(Comparator.comparingLong(ConsumerRecord::offset));
	while (!ObjectUtils.isEmpty(deferred)) {
		recordToAck = deferred.remove(0);
		offs.remove(0);
	}
}
  4. The loop removes 6 and 7 from offsetsInThisBatch as it empties deferredOffsets, so we end up with [8, 9] as pending offsets in offsetsInThisBatch
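
To make the steps above easier to check, here is a minimal sketch that replays them against a simplified model. It is not the Spring Kafka source: the class `AckInOrderSketch`, its plain `List<Long>` fields, and the `replayScenario` helper are hypothetical stand-ins, and the `offset < head` branch is an assumption about where the IllegalStateException would come from. Only the unconditional drain loop mirrors the snippet quoted in this issue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, hypothetical model of the per-partition bookkeeping.
// NOT the Spring Kafka implementation; it only mirrors the loop as quoted in this issue.
class AckInOrderSketch {

    final List<Long> offsetsInThisBatch = new ArrayList<>();
    final List<Long> deferredOffsets = new ArrayList<>();

    AckInOrderSketch(long firstOffset, long lastOffset) {
        for (long o = firstOffset; o <= lastOffset; o++) {
            offsetsInThisBatch.add(o);
        }
    }

    synchronized void ackInOrder(long offset) {
        if (offsetsInThisBatch.isEmpty()) {
            return;
        }
        long head = offsetsInThisBatch.get(0);
        if (head == offset) {
            offsetsInThisBatch.remove(0);
            deferredOffsets.sort(Comparator.naturalOrder());
            // As quoted: every deferred offset is drained, without checking
            // that it is contiguous with the offset that was just acked.
            while (!deferredOffsets.isEmpty()) {
                deferredOffsets.remove(0);
                offsetsInThisBatch.remove(0);
            }
        }
        else if (offset < head) {
            // Assumption: an offset below the head is treated as an illegal state.
            throw new IllegalStateException(
                    "First pending offset is " + head + "; cannot ack " + offset);
        }
        else {
            deferredOffsets.add(offset);
        }
    }

    // Replays the scenario from the list: 0..4 in order, then 8, 9, then 5.
    static AckInOrderSketch replayScenario() {
        AckInOrderSketch sketch = new AckInOrderSketch(0, 9);
        for (long o = 0; o <= 4; o++) {
            sketch.ackInOrder(o);
        }
        sketch.ackInOrder(8);
        sketch.ackInOrder(9);
        sketch.ackInOrder(5);
        return sketch;
    }
}
```

In this model, `replayScenario()` leaves `offsetsInThisBatch` as [8, 9] with `deferredOffsets` empty, and a later `ackInOrder(6)` or `ackInOrder(7)` hits the exception branch instead of clearing the pending offsets.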

After this, regardless of whether the record with offset 6 or 7 finishes next, its offset will never satisfy the check if (offs.get(0) == cRecord.offset()). In fact, an IllegalStateException should be thrown, but since this happens in the lambda passed to completableFutureResult.whenComplete and the future returned by that call isn't tracked anywhere, maybe it's swallowed? And since offsetsInThisBatch isn't empty, the consumer stays paused.
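
On the "maybe it's swallowed" question, plain java.util.concurrent behaves that way: an exception thrown inside a whenComplete callback is captured by the stage that whenComplete returns, and it goes unnoticed if nobody inspects that stage. The sketch below shows only this JDK behaviour; the class name `SwallowedCallbackSketch` and the message text are made up for illustration, and it makes no claim about what the container actually does with the returned future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo of JDK behaviour only; no Spring Kafka classes involved.
class SwallowedCallbackSketch {

    static CompletableFuture<String> run() {
        CompletableFuture<String> result = CompletableFuture.completedFuture("processed");
        // The callback throws, but this call itself returns normally.
        // The exception ends up inside the returned (dependent) future.
        return result.whenComplete((value, error) -> {
            throw new IllegalStateException("out-of-order ack");
        });
    }

    // Returns the cause carried by the dependent future, or null if it completed normally.
    static Throwable causeOf(CompletableFuture<String> dependent) {
        try {
            dependent.join();
            return null;
        }
        catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

If the future returned by `run()` is discarded, nothing is logged or rethrown. The IllegalStateException only becomes visible when something calls `join()`, `get()`, or attaches another callback such as `exceptionally` to that dependent future.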
