GH-4198: Fix async retry bookkeeping for Mono listener failures #4320
artembilan
left a comment
Very good!
Please add your name to the @author list of the affected classes.
Thank you!
@artembilan Thanks for the review 😊
… failures
- Keep async failure records queued when retry is in progress
- Remove only retry-target offsets from async-ack bookkeeping
- Add regression test for Mono listener direct-throw retry behavior
Signed-off-by: MinChul-Son <smc5236@naver.com>
…e processing
Signed-off-by: MinChul-Son <smc5236@naver.com>
Signed-off-by: MinChul-Son <smc5236@naver.com>
* Drop unnecessary synchronized from async offset cleanup helper
* Reuse retry-record lists and remove redundant list copies
* Align removeOffsetsInBatch signature with handleRemaining wildcard record type
* Simplify retry listener assertion in AsyncMonoDefaultErrorHandlerRetryTests
Signed-off-by: MinChul-Son <smc5236@naver.com>
* Use common RecordInRetryException try/catch in invokeBatchErrorHandler and invokeErrorHandler
* Keep single-record retry list creation with List.of(cRecord)
* Add removeOffsetsInBatchForRetryRecords to verify offsetsInThisBatch/deferredOffsets cleanup
* Remove IDE-specific SpringJavaInjectionPointsAutowiringInspection suppressions from async mono retry test
Signed-off-by: MinChul-Son <smc5236@naver.com>
@artembilan Please take another look when you have a moment.
artembilan
left a comment
Just a couple of questions in the test.
Thanks
Rename placeholder values, switch to typed mock() usage, and remove unnecessary poll sleep in removeOffsetsInBatchForRetryRecords(). Signed-off-by: MinChul-Son <smc5236@naver.com>
@artembilan I have applied the requested changes. Please re-review when you have time!
Sorry, I misclicked.
- Use a snapshot-based pass for failed async records
- simplify retry record initialization in invokeErrorHandler to avoid duplicate setup.
- restore blank lines for readability
Signed-off-by: MinChul-Son <smc5236@naver.com>
Signed-off-by: MinChul-Son <smc5236@naver.com>
artembilan
left a comment
This is good now.
Will merge when build is green.
Thanks
Merged and back-ported. Thank you for the contribution; looking forward to more!
Fixes #4198
Summary
This PR fixes retry behavior when a @KafkaListener method has a Mono return type and throws an exception directly (before creating the async result).
Previously, with async acks enabled (default behavior for async return types), retry could stop after the first attempt because async-ack bookkeeping state could become inconsistent during RecordInRetryException handling.
This change keeps retry progressing while preserving unrelated async-ack state.
What was happening
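In async failure paths, RecordInRetryException indicates that the record should remain in the retry flow.
However, the container's state handling around that path could leave async-ack bookkeeping inconsistent, causing retry/pause issues such as retry stopping after the first attempt.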
Changes
Why this approach
A full async-ack state reset can affect unrelated in-flight records.
This PR removes only retry-target offsets to avoid collateral side effects while fixing the retry stall.
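To illustrate the idea of removing only retry-target offsets, here is a minimal, dependency-free sketch. It is not the container code from this PR: the class name, the `removeRetryOffsets` helper, and the use of plain partition-name strings as map keys are all hypothetical stand-ins for the real bookkeeping structures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectiveOffsetCleanup {

    // Hypothetical stand-in for per-partition async-ack bookkeeping:
    // partition name -> offsets that are still awaiting acknowledgment.
    // Returns a new map with only the retry-target offsets removed.
    static Map<String, List<Long>> removeRetryOffsets(
            Map<String, List<Long>> pending, Map<String, List<Long>> retryTargets) {

        Map<String, List<Long>> result = new HashMap<>();
        pending.forEach((partition, offsets) -> {
            List<Long> remaining = new ArrayList<>(offsets);
            // Drop only the offsets that are about to be retried.
            remaining.removeAll(retryTargets.getOrDefault(partition, List.of()));
            if (!remaining.isEmpty()) {
                result.put(partition, remaining);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> pending = Map.of(
                "topic-0", List.of(5L, 6L, 7L),
                "topic-1", List.of(10L));
        // Offset 6 on topic-0 is in retry; everything else stays tracked.
        Map<String, List<Long>> cleaned =
                removeRetryOffsets(pending, Map.of("topic-0", List.of(6L)));
        System.out.println(cleaned);
    }
}
```

A full reset would correspond to clearing the whole map, which would also discard offsets 5, 7, and 10 even though those records are unrelated to the retry.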
Tests
Validated with:
Scope
This PR addresses the issue path where the listener directly throws in a Mono return method.
It does not change semantics for Mono.error(...) completion-path handling.
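The two failure paths can be sketched as follows. To keep the example free of dependencies, it uses the JDK's `CompletableFuture` as a stand-in for `Mono`, with `CompletableFuture.failedFuture(...)` playing the role of `Mono.error(...)`. The class, the listener methods, and the `classify` helper are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class DirectThrowVsFailedResult {

    // Throws before any async result exists: the path this PR addresses.
    static CompletableFuture<String> directThrow(String value) {
        throw new IllegalStateException("failed before async result: " + value);
    }

    // Returns an async result that is already failed: analogous to
    // returning Mono.error(...), which this PR leaves unchanged.
    static CompletableFuture<String> failedResult(String value) {
        return CompletableFuture.failedFuture(
                new IllegalStateException("failed in async result: " + value));
    }

    // Hypothetical caller: reports which path the failure arrived on.
    static String classify(Function<String, CompletableFuture<String>> listener) {
        CompletableFuture<String> future;
        try {
            future = listener.apply("record");
        } catch (RuntimeException ex) {
            // The exception surfaced synchronously from the listener call.
            return "direct-throw";
        }
        // The call returned normally; any failure is carried by the result.
        return future.isCompletedExceptionally() ? "failed-result" : "no-immediate-failure";
    }

    public static void main(String[] args) {
        System.out.println(classify(DirectThrowVsFailedResult::directThrow));
        System.out.println(classify(DirectThrowVsFailedResult::failedResult));
    }
}
```

The distinction matters because a direct throw surfaces synchronously in the code that invokes the listener, whereas a failed result is only observed through the returned async object.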
Thanks for reviewing. I’m happy to adjust anything based on feedback.