Skip to content

GH-4322: Add RENEW acknowledgment type to ShareAcknowledgment#4340

Merged
artembilan merged 3 commits into
spring-projects:mainfrom
sobychacko:gh-4322
Mar 13, 2026
Merged

GH-4322: Add RENEW acknowledgment type to ShareAcknowledgment#4340
artembilan merged 3 commits into
spring-projects:mainfrom
sobychacko:gh-4322

Conversation

@sobychacko

Copy link
Copy Markdown
Contributor

Fixes: #4322

  • Add renew() to ShareAcknowledgment and document RENEW (KIP-1222) in Javadoc; implement in ShareConsumerAcknowledgment with AcknowledgeType.RENEW.

  • Allow RENEW then terminal ack: change acknowledgeInternal so RENEW is allowed when state is null or RENEW, and terminal (ACCEPT/RELEASE/REJECT) is allowed when state is null or RENEW; only one terminal ack per record.

  • Call onRecordAcknowledged only for terminal acks in processQueuedAcknowledgments so RENEW does not remove the record from pending/timestamp tracking.

  • Treat isAcknowledged() as terminally acknowledged (false after RENEW until acknowledge/release/reject). Update timeout warning to mention renew() and that a terminal ack is still required.

  • Add no-op renew() to NoOpShareAck in MessagingMessageListenerAdapter.

  • Add reference docs and integration tests.

…owledgment`

Fixes: spring-projects#4322

- Add `renew()` to `ShareAcknowledgment` and document RENEW (KIP-1222) in
  Javadoc; implement in `ShareConsumerAcknowledgment` with
  `AcknowledgeType.RENEW`.

- Allow RENEW then terminal ack: change `acknowledgeInternal` so RENEW
  is allowed when state is null or RENEW, and terminal (`ACCEPT/RELEASE/REJECT`)
  is allowed when state is null or RENEW; only one terminal ack per record.

- Call `onRecordAcknowledged` only for terminal acks in
  `processQueuedAcknowledgments` so RENEW does not remove the record from
  pending/timestamp tracking.

- Treat `isAcknowledged()` as terminally acknowledged (false after RENEW
  until acknowledge/release/reject). Update timeout warning to mention
  `renew()` and that a terminal ack is still required.

- Add no-op `renew()` to `NoOpShareAck` in `MessagingMessageListenerAdapter`.

- Add reference docs and integration tests.

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool!

Just couple nit-picks.

One more note, not related to the change.
Looks like that one could be as a record:

private record PendingAcknowledgment<K, V>(ConsumerRecord<K, V> record, AcknowledgeType type) {

	}


`ShareAcknowledgment` now supports `renew()` to extend the acquisition lock on a record when processing may exceed the broker's lock duration (`group.share.record.lock.duration.ms`, default 30 seconds).
This follows KIP-1222 (Kafka 4.2): RENEW is non-terminal.
You may call `renew()` multiple times, but you must still call exactly one of `acknowledge()`, `release()`, or `reject()` when processing completes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all of these details would be better to have in the target chapter.
The whats-new is updated for every single major/minor version.
So, it is easy to lose it.

*
* @throws IllegalStateException if the record has already been terminally
* acknowledged (via {@link #acknowledge()}, {@link #release()}, or {@link #reject()})
* @since 4.0

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4.1 ?

ack.acknowledge();

// Allow consumer thread to process queued acks (RENEW, RENEW, ACCEPT)
Thread.sleep(500);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is not robust solution.
Let's consider to use await() instead polling until that assertThat(isAcknowledgedInternal(ack)).isTrue() is OK.
See Awaitility feature untilAsserted().


ack.acknowledge();
// Second terminal ack must throw
assertThatExceptionOfType(IllegalStateException.class)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a dedicated, convenient assertThatIllegalStateException()

…ledgment` to record

- Shorten whats-new `ShareAcknowledgment.renew()` entry; keep details in kafka-queues.adoc
- Test improvements and cleanup
- Refactor `PendingAcknowledgment` to a Java record

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

@artembilan artembilan left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great update!
I like those comments in the code explaining the reason behind ack type behavior.

I have just one that minor code clean up question in the test.
Nothing critical: just wonder if we want make our code smarter.

Thanks

// Wait for consumer thread to process queued acks (RENEW, RENEW, ACCEPT)
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(isAcknowledgedInternal(ack)).isTrue();
assertThat(getAcknowledgmentTypeInternal(ack)).isEqualTo(AcknowledgeType.ACCEPT);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to put both assertion into await().
We just need to wait for the first one, and second won't be called until this await() has successed.

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
@artembilan artembilan enabled auto-merge (squash) March 13, 2026 18:05
@artembilan artembilan merged commit b0bfbe4 into spring-projects:main Mar 13, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add RENEW acknowledgment type to ShareAcknowledgment

2 participants