GH-4322: Add RENEW acknowledgment type to ShareAcknowledgment#4340
Conversation
…owledgment` Fixes: spring-projects#4322 - Add `renew()` to `ShareAcknowledgment` and document RENEW (KIP-1222) in Javadoc; implement in `ShareConsumerAcknowledgment` with `AcknowledgeType.RENEW`. - Allow RENEW then terminal ack: change `acknowledgeInternal` so RENEW is allowed when state is null or RENEW, and terminal (`ACCEPT/RELEASE/REJECT`) is allowed when state is null or RENEW; only one terminal ack per record. - Call `onRecordAcknowledged` only for terminal acks in `processQueuedAcknowledgments` so RENEW does not remove the record from pending/timestamp tracking. - Treat `isAcknowledged()` as terminally acknowledged (false after RENEW until acknowledge/release/reject). Update timeout warning to mention `renew()` and that a terminal ack is still required. - Add no-op `renew()` to `NoOpShareAck` in `MessagingMessageListenerAdapter`. - Add reference docs and integration tests. Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
artembilan
left a comment
There was a problem hiding this comment.
Very cool!
Just couple nit-picks.
One more note, not related to the change.
Looks like that one could be as a record:
private record PendingAcknowledgment<K, V>(ConsumerRecord<K, V> record, AcknowledgeType type) {
}
|
|
||
| `ShareAcknowledgment` now supports `renew()` to extend the acquisition lock on a record when processing may exceed the broker's lock duration (`group.share.record.lock.duration.ms`, default 30 seconds). | ||
| This follows KIP-1222 (Kafka 4.2): RENEW is non-terminal. | ||
| You may call `renew()` multiple times, but you must still call exactly one of `acknowledge()`, `release()`, or `reject()` when processing completes. |
There was a problem hiding this comment.
I think all of these details would be better to have in the target chapter.
The whats-new is updated for every single major/minor version.
So, it is easy to lose it.
| * | ||
| * @throws IllegalStateException if the record has already been terminally | ||
| * acknowledged (via {@link #acknowledge()}, {@link #release()}, or {@link #reject()}) | ||
| * @since 4.0 |
| ack.acknowledge(); | ||
|
|
||
| // Allow consumer thread to process queued acks (RENEW, RENEW, ACCEPT) | ||
| Thread.sleep(500); |
There was a problem hiding this comment.
i think this is not robust solution.
Let's consider to use await() instead polling until that assertThat(isAcknowledgedInternal(ack)).isTrue() is OK.
See Awaitility feature untilAsserted().
|
|
||
| ack.acknowledge(); | ||
| // Second terminal ack must throw | ||
| assertThatExceptionOfType(IllegalStateException.class) |
There was a problem hiding this comment.
There is a dedicated, convenient assertThatIllegalStateException()
…ledgment` to record - Shorten whats-new `ShareAcknowledgment.renew()` entry; keep details in kafka-queues.adoc - Test improvements and cleanup - Refactor `PendingAcknowledgment` to a Java record Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
artembilan
left a comment
There was a problem hiding this comment.
Great update!
I like those comments in the code explaining the reason behind ack type behavior.
I have just one that minor code clean up question in the test.
Nothing critical: just wonder if we want make our code smarter.
Thanks
| // Wait for consumer thread to process queued acks (RENEW, RENEW, ACCEPT) | ||
| Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { | ||
| assertThat(isAcknowledgedInternal(ack)).isTrue(); | ||
| assertThat(getAcknowledgmentTypeInternal(ack)).isEqualTo(AcknowledgeType.ACCEPT); |
There was a problem hiding this comment.
I don't think we need to put both assertion into await().
We just need to wait for the first one, and second won't be called until this await() has successed.
Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
Fixes: #4322
Add
renew()toShareAcknowledgmentand document RENEW (KIP-1222) in Javadoc; implement inShareConsumerAcknowledgmentwithAcknowledgeType.RENEW.Allow RENEW then terminal ack: change
acknowledgeInternalso RENEW is allowed when state is null or RENEW, and terminal (ACCEPT/RELEASE/REJECT) is allowed when state is null or RENEW; only one terminal ack per record.Call
onRecordAcknowledgedonly for terminal acks inprocessQueuedAcknowledgmentsso RENEW does not remove the record from pending/timestamp tracking.Treat
isAcknowledged()as terminally acknowledged (false after RENEW until acknowledge/release/reject). Update timeout warning to mentionrenew()and that a terminal ack is still required.Add no-op
renew()toNoOpShareAckinMessagingMessageListenerAdapter.Add reference docs and integration tests.