Skip to content

GH-4198: Fix async retry bookkeeping for Mono listener failures#4320

Merged
artembilan merged 8 commits into
spring-projects:mainfrom
MinChul-Son:GH-4198
Mar 9, 2026
Merged

GH-4198: Fix async retry bookkeeping for Mono listener failures#4320
artembilan merged 8 commits into
spring-projects:mainfrom
MinChul-Son:GH-4198

Conversation

@MinChul-Son

@MinChul-Son MinChul-Son commented Mar 4, 2026

Copy link
Copy Markdown
Contributor

Fixes #4198

Summary

This PR fixes retry behavior when a @KafkaListener method has a Mono return type and throws an exception directly (before creating the async result).

Previously, with async acks enabled (default behavior for async return types), retry could stop after the first attempt because async-ack bookkeeping state could become inconsistent during RecordInRetryException handling.

This change keeps retry progressing while preserving unrelated async-ack state.

What was happening

In async failure paths, RecordInRetryException indicates the record should remain in retry flow.
However, container state handling around that path could lead to retry/pause issues.

Changes

  • Keep async failed records queued when retry is still in progress.
  • On RecordInRetryException, clean async-ack bookkeeping only for retry-target records (instead of clearing all tracked async offsets).
  • Apply this behavior consistently for:
    • single-record async failure handling
    • record listener error handling path
    • batch error handling path
  • Add regression test:
    • AsyncMonoDefaultErrorHandlerRetryTests
    • verifies Mono listener direct-throw + DefaultErrorHandler retry behavior

Why this approach

A full async-ack state reset can affect unrelated in-flight records.
This PR removes only retry-target offsets to avoid collateral side effects while fixing the retry stall.

Tests

Validated with:

  • AsyncMonoDefaultErrorHandlerRetryTests
  • AsyncListenerTests
  • ObservationTests.asyncRetryScopePropagation
  • AsyncAckAfterHandleTests
  • PauseContainerWhileErrorHandlerIsRetryingTests

Scope

This PR addresses the issue path where the listener directly throws in a Mono return method.
It does not change semantics for Mono.error(...) completion-path handling.

Thanks for reviewing. I’m happy to adjust anything based on feedback.

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good!
Please, add your name to the @author list of the affected classes.

Thank you!

@MinChul-Son

MinChul-Son commented Mar 6, 2026

Copy link
Copy Markdown
Contributor Author

@artembilan Thanks for the review 😊
I’ve applied the requested changes and updated the implementation.
Could you please re-review when you have time?

… failures

- Keep async failure records queued when retry is in progress
- Remove only retry-target offsets from async-ack bookkeeping
- Add regression test for Mono listener direct-throw retry behavior

Signed-off-by: MinChul-Son <smc5236@naver.com>
…e processing

Signed-off-by: MinChul-Son <smc5236@naver.com>
Signed-off-by: MinChul-Son <smc5236@naver.com>
* Drop unnecessary synchronized from async offset cleanup helper
* Reuse retry-record lists and remove redundant list copies
* Align removeOffsetsInBatch signature with handleRemaining wildcard record type
* Simplify retry listener assertion in AsyncMonoDefaultErrorHandlerRetryTests

Signed-off-by: MinChul-Son <smc5236@naver.com>
* Use common RecordInRetryException try/catch in invokeBatchErrorHandler and invokeErrorHandler
* Keep single-record retry list creation with List.of(cRecord)
* Add removeOffsetsInBatchForRetryRecords to verify offsetsInThisBatch/deferredOffsets cleanup
* Remove IDE-specific SpringJavaInjectionPointsAutowiringInspection suppressions from async mono retry test

Signed-off-by: MinChul-Son <smc5236@naver.com>
@MinChul-Son

Copy link
Copy Markdown
Contributor Author

@artembilan Please take another look when you have a moment.
I applied the requested updates and would appreciate a re-review. Many Thanks 👍

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just couple questions in the test.
Thanks

Rename placeholder values, switch to typed mock() usage, and remove unnecessary poll sleep in removeOffsetsInBatchForRetryRecords().

Signed-off-by: MinChul-Son <smc5236@naver.com>
@MinChul-Son

Copy link
Copy Markdown
Contributor Author

@artembilan I updated and applied request changes, Please re-review when you have time!
Thanks

@MinChul-Son MinChul-Son closed this Mar 7, 2026
@MinChul-Son MinChul-Son reopened this Mar 7, 2026
@MinChul-Son

Copy link
Copy Markdown
Contributor Author

Sry, I miss clicked close with comment instead comment button..

- Use a snapshot-based pass for failed async records
- simplify retry record initialization in invokeErrorHandler to avoid duplicate setup.
- restore blank lines for readability

Signed-off-by: MinChul-Son <smc5236@naver.com>
Signed-off-by: MinChul-Son <smc5236@naver.com>

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good now.
Will merge when build is green.

Thanks

@artembilan artembilan merged commit 19070c3 into spring-projects:main Mar 9, 2026
3 checks passed
@artembilan

Copy link
Copy Markdown
Member

Merged and back-ported.

Thank you for contribution; looking forward for more!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

With a @KafkaListener that returns a Mono, the error handler does not retry

2 participants