I have a Spring Boot 4.0.3 application. I am using a @KafkaListener method with a RecordFilterStrategy, setAckDiscarded(true) set on its container factory, and a CompletableFuture as the return value. The listener pauses almost immediately after consuming the first batch, even though all messages appear to be either filtered or processed correctly. I looked through the code and, unless I'm missing something, this function has a problem:
private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
Let's say my batch has 10 records from a single partition with offsets 0 through 9:
- Records with offsets 0 through 4 are processed in order, so offsetsInThisBatch should contain [5, 6, 7, 8, 9]
- Records with offsets 8 and 9 finish next and are added to deferredOffsets
- Record with offset 5 completes next and it's removed from offsetsInThisBatch and this loop runs:

      if (!CollectionUtils.isEmpty(deferred)) {
          deferred.sort(Comparator.comparingLong(ConsumerRecord::offset));
          while (!ObjectUtils.isEmpty(deferred)) {
              recordToAck = deferred.remove(0);
              offs.remove(0);
          }
      }

- The loop removes 6 and 7 from offsetsInThisBatch as it empties deferredOffsets, so we end up with [8, 9] as pending offsets in offsetsInThisBatch
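
To make the steps above easier to check, here is a minimal, dependency-free model of the scenario. It is only a sketch: AckInOrderSketch, pending() and the field names are hypothetical, bare Long offsets stand in for ConsumerRecord, and the loop is copied as quoted above rather than taken from the full method.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the quoted logic for a single partition.
class AckInOrderSketch {
    // Stand-in for offsetsInThisBatch after offsets 0 through 4 were acked in order.
    private final List<Long> offs = new ArrayList<>(List.of(5L, 6L, 7L, 8L, 9L));
    // Stand-in for deferredOffsets (offsets only, no ConsumerRecord).
    private final List<Long> deferred = new ArrayList<>();

    void ackInOrder(long offset) {
        if (offs.isEmpty()) {
            return;
        }
        if (offs.get(0) == offset) {
            offs.remove(0);
            Collections.sort(deferred);
            // As quoted: drains every deferred entry and drops the head of
            // offs each time, without checking that the offsets match.
            while (!deferred.isEmpty()) {
                deferred.remove(0);
                offs.remove(0);
            }
        }
        else if (offset < offs.get(0)) {
            throw new IllegalStateException(
                    "Offset " + offset + " is behind the pending head " + offs.get(0));
        }
        else {
            deferred.add(offset);
        }
    }

    List<Long> pending() {
        return offs;
    }

    public static void main(String[] args) {
        AckInOrderSketch sketch = new AckInOrderSketch();
        sketch.ackInOrder(8);
        sketch.ackInOrder(9);
        sketch.ackInOrder(5);
        System.out.println(sketch.pending()); // prints [8, 9]
        try {
            sketch.ackInOrder(6);
        }
        catch (IllegalStateException ex) {
            System.out.println("Rejected: " + ex.getMessage());
        }
    }
}
```

In this model, offsets 6 and 7 are dropped from the pending list while 8 and 9 stay behind, which matches the [8, 9] state described above.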
After this, regardless of whether the record with offset 6 or 7 finishes next, its offset will never satisfy if (offs.get(0) == cRecord.offset()), because the head of offsetsInThisBatch is now 8. In fact an IllegalStateException should be thrown, but since this happens in the lambda passed to completableFutureResult.whenComplete and that future isn't tracked anywhere, maybe it's swallowed? And since offsetsInThisBatch isn't empty, the consumer is paused.
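
On the "maybe it's swallowed" part, the plain java.util.concurrent behaviour can be checked in isolation. The sketch below uses a hypothetical class name (SwallowedCallbackSketch) and does not involve the container at all; it only shows that an exception thrown inside a whenComplete callback ends up in the stage returned by whenComplete, so it goes unnoticed if nothing keeps or inspects that stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo of plain CompletableFuture semantics, not spring-kafka code.
class SwallowedCallbackSketch {

    // Returns the stage produced by whenComplete; the callback always throws.
    static CompletableFuture<String> derivedStage(CompletableFuture<String> result) {
        return result.whenComplete((value, error) -> {
            throw new IllegalStateException("thrown inside the callback");
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = CompletableFuture.completedFuture("done");
        CompletableFuture<String> derived = derivedStage(result);

        // The original future is unaffected by the failing callback.
        System.out.println(result.join());

        // The failure is only visible on the derived stage.
        System.out.println(derived.isCompletedExceptionally());
        try {
            derived.join();
        }
        catch (CompletionException ex) {
            System.out.println(ex.getCause());
        }
    }
}
```

If the stage returned by whenComplete is discarded, nothing in this sketch would log or rethrow the IllegalStateException, which is consistent with the suspicion above but does not by itself confirm what the container does.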
spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
Line 2174 in e9ff63e