Introduce ShareAckMode enum for share consumers#4376
Conversation
|
The 4.0
|
The 4.0 boolean `explicitShareAcknowledgment` only controlled whether the listener received a non-null acknowledgment object. It had no effect on Kafka's `share.acknowledgement.mode`, so `acknowledge()` would throw `IllegalStateException` when the factory used the Kafka default (implicit mode). `ContainerProperties.ShareAckMode` (EXPLICIT, MANUAL, IMPLICIT) makes the modes explicit, with EXPLICIT as the default — the same philosophy as regular containers disabling auto.commit and owning the commit lifecycle themselves. For EXPLICIT and MANUAL the container now enforces `share.acknowledgement.mode=explicit` via consumer override properties, regardless of factory configuration. Other related fixes: - `RecordDeserializationException` handler now guards `acknowledge()` against IMPLICIT mode - `createShareConsumer` variant added to `ShareConsumerFactory` for consumer-level overrides - 4.0 dead code and naming issues cleaned up Deprecated methods keep the 4.0 API intact. Reference docs and What's New include a migration guide. Adding tests to verify the changes. Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
5027ee2 to
aae9b8f
Compare
artembilan
left a comment
There was a problem hiding this comment.
I think that's enough for now.
Let's work out a common ground and come together on the same page first!
Thank you for doing this, Soby!
| [[x41-share-ack-mode]] | ||
| === Share Consumer Acknowledgment Modes | ||
|
|
||
| The boolean `setExplicitShareAcknowledgment(boolean)` property on `ContainerProperties` has been replaced by the `ShareAckMode` enum, which clearly names the three distinct acknowledgment modes: |
There was a problem hiding this comment.
That is too much info for whats-new.adoc.
Why change pattern and provide all of that here, but not in the target chapter?
There was a problem hiding this comment.
Ok, will move to the chapter.
| * @since 4.1 | ||
| * @see #setShareAckMode(ShareAckMode) | ||
| */ | ||
| public enum ShareAckMode { |
There was a problem hiding this comment.
Feels like a decision to reuse ContainerProperties for ShareConsumer was wrong direction.
Now we give end-user confusion with existing AckMode, which apparently is not used ShareKafkaMessageListenerContainer.
And there are probably many other conflicting options not used here or there.
Not saying that it should be revised now, but something to keep in mind if we'd like to still keep our API end-user friendly.
There was a problem hiding this comment.
Ya, we have time to refine it before RC1.
There was a problem hiding this comment.
The problem is that we need to distinguish two kinds of containers if we introduce a properties class just for share consumers.
There was a problem hiding this comment.
Artem - as we discussed yesterday, we will try to tackle this as a follow up item as this involves a lot of moving parts.
| * <li><strong>EXPLICIT</strong> (default): Container-managed. The container sends ACCEPT after | ||
| * successful processing and delegates error handling to the {@link ShareConsumerRecordRecoverer} | ||
| * (default: REJECT). Equivalent to disabling {@code auto.commit} on a regular consumer.</li> | ||
| * <li><strong>MANUAL</strong>: Listener-managed. The listener must acknowledge each record via |
There was a problem hiding this comment.
I don't say that there is something wrong with the logic described here, but I wonder if that has crossed your mind that technically there is no difference between EXPLICIT and MANUAL.
Let's imaging the environment if we change the logic only for two EXPLICIT and IMPLICIT!
The EXPLICIT is MANUAL at the same time.
So, we always provide ack callback to end-user listener in EXPLICIT mode.
And if that callback was not called from end-user code, the container does that, respectively, for both success and error cases.
What do I miss, please?
What is the real driver for the MANUAL if it still per record?
There was a problem hiding this comment.
Hey Artem, thanks for the thoughtful feedback on this particular issue. I hear your arguments there, but I'd like to make the case for keeping both modes. The distinction is actually the same one we already draw for regular consumers — AckMode.BATCH (container owns everything, listener is unaware) vs AckMode.MANUAL (listener owns it, must call acknowledge()). There's no auto-fallback in regular consumer MANUAL mode either, and I think that strictness is intentional and valuable.
For share consumers:
• EXPLICIT-- listener is completely unaware of acknowledgment. The acknowledgment parameter is null, no API in scope, no decision to make. Container handles success and error outcomes.
• MANUAL-- listener owns every outcome. Non-null acknowledgment, and if you don't call it, the next poll() throws — the Kafka client enforces the contract, not just the docs.
The auto-fallback model silently auto-accepts a record when a listener forgets to call acknowledge(). In the strict MANUAL model that's a fast-fail -- the blocked poll makes the bug immediately obvious. Collapsing the two for share consumers while keeping them separate for regular consumers would be an inconsistency in the framework, no?
I think keeping both gives users a clear, unambiguous contract that aligns with how Spring Kafka already thinks about acknowledgment.
| + "ShareAckMode.IMPLICIT on the container properties instead."); | ||
| } | ||
| } | ||
| catch (UnsupportedOperationException ex) { |
There was a problem hiding this comment.
Why do we need to catch it here?
If that is not implemented in the provided ShareConsumerFactory, then better to fail for that reason.
This adds extra unnecessary noise into our code and even make it worse: this catch-n-log hides the problem with invoked method.
There was a problem hiding this comment.
good idea. will fix.
- Simplify whats-new.adoc and move detailed migration guide to ref docs - Remove try-catch around `getConfigurationProperties()` - let it fail if not implemented Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
artembilan
left a comment
There was a problem hiding this comment.
Looks cool, but hard to digest.
Let's hope my review is not very destructive!
Thank you for doing this!
| assertThat(redeliveryLatch.await(5, TimeUnit.SECONDS)) | ||
| .as("Record should be archived (REJECT) and not redelivered") | ||
| .isFalse(); | ||
| assertThat(deliveryCount.get()).isEqualTo(1); |
There was a problem hiding this comment.
The last two conditions are suspicious: the redelivery may not happen not only fort the REJECT reason.
It can be accepted in the error handler or some other way.
Or... Record could be delivered to other consumer on the same share topic.
Therefore we are blocked here for 5 seconds without any clarity.
If there is no way to track rejection somehow on Kafka side, then I don't see any value in this test at all.
Feel like we are trying to test here Kafka client itself: nothing is under our container control.
| try { | ||
| assertThat(bothSeenLatch.await(15, TimeUnit.SECONDS)).isTrue(); | ||
| // No redelivery: if a third invocation happened the latch would have reached 0 | ||
| assertThat(redeliveryLatch.await(5, TimeUnit.SECONDS)) |
There was a problem hiding this comment.
I don't think using timing barriers to ensure that something has not happened is the right way to model test behavior.
The event might not happened for may reasons, or our barrier is not long enough to catch that event.
It looks again that you are trying to prove the Kafka client (or even broker) promises with unreliable tools.
I don't know Kafka internal details to give a good advice how to verify that record has been rejected because we had thrown error on it.
But again probably out of our project scope...
| * `ACCEPT` — Record processed successfully; mark as completed. | ||
| * `RELEASE` — Temporary failure; make the record available for redelivery to this or another consumer. | ||
| * `REJECT` — Permanent failure; archive the record and do not redeliver. | ||
| * `RENEW` — Extend the acquisition lock (non-terminal). |
There was a problem hiding this comment.
Not related to the change in this PR (which is great), but raises the question as natural evolution afterwards.
How to deal with RENEW if we chose EXPLICIT?
Is there a possibility that we have to extend the lock somehow from the container perspective?
There was a problem hiding this comment.
hmm - thats a very interesting scenario that I didn't think about. Before this PR, we still had this problem with the old non-"explict" mode where we were not handing an Acknowledgment object to the listener. There we were not allowing listeners to do any renew, which was a true gap and that gap is still present with the new EXPLICIT mode. I guess, we have couple of options - 1. expose a renew-only acknowledgment in explicit mode for listeners to use 2. or document strictly that if they have a renew use case, then they have to use manual mode. We can also auto-renew periodically from the container based on some property maybe. I think we can probably tackle this as a follow-up item maybe?
|
|
||
| // Validate listener type for explicit acknowledgment mode | ||
| if (containerProperties.isExplicitShareAcknowledgment()) { | ||
| if (containerProperties.getShareAckMode() == ContainerProperties.ShareAckMode.MANUAL) { |
There was a problem hiding this comment.
I think we need to use equals() and have it flipped instead of == operator which does not compare objects, but rather their memory pointers:
if (ContainerProperties.ShareAckMode.MANUAL.equals(containerProperties.getShareAckMode())) {
| // Apply explicit mode configuration to consumer | ||
| // Note: This should ideally be done during consumer creation in the factory | ||
| this.logger.info("Share consumer configured for explicit acknowledgment mode"); | ||
| if (shareAckMode == ContainerProperties.ShareAckMode.IMPLICIT) { |
| if (shareAckMode == ContainerProperties.ShareAckMode.IMPLICIT) { | ||
| ShareConsumerRecordRecoverer recoverer = | ||
| ShareKafkaMessageListenerContainer.this.getShareConsumerRecordRecoverer(); | ||
| if (recoverer != ShareConsumerRecordRecoverer.REJECTING) { |
There was a problem hiding this comment.
Here identity is good, but has to be flipped to avoid NPE reports.
| Object configured = ShareKafkaMessageListenerContainer.this.shareConsumerFactory | ||
| .getConfigurationProperties() | ||
| .get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG); | ||
| if (configured != null && "implicit".equalsIgnoreCase(configured.toString())) { |
There was a problem hiding this comment.
Why cannot we use ShareAcknowledgementMode instead of hard-coded string?
| this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( | ||
| ShareKafkaMessageListenerContainer.this.getGroupId(), | ||
| consumerClientId, | ||
| Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit")); |
There was a problem hiding this comment.
DITTO.
See ConsumerConfig defaults:
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
Type.STRING,
ShareAcknowledgementMode.IMPLICIT.name(),
new ShareAcknowledgementMode.Validator(),
Importance.MEDIUM,
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC)
| + "ShareAckMode.IMPLICIT on the container properties instead."); | ||
| } | ||
| this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( | ||
| ShareKafkaMessageListenerContainer.this.getGroupId(), |
There was a problem hiding this comment.
I think we use too many ShareKafkaMessageListenerContainer.this.getGroupId() in this method.
Cannot it be extracted to a local variable?
- Use `equals()` with constant first for enum comparisons - Flip identity check for recoverer NPE protection - Use `ShareAcknowledgementMode` enum instead of hardcoded strings - Extract `getGroupId()` to local variable - Remove flaky integration tests that use timing barriers to verify negative conditions Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
|
Now that this is merged, @sobychacko , please, take a look if PR #4374 is not a duplication someway, and if this merged PR is fixing respective issue: #4369 Thanks |
|
Thanks, @artembilan. Will do. |
The 4.0 boolean
explicitShareAcknowledgmentonly controlled whether the listener received a non-null acknowledgment object. It had no effect on Kafka'sshare.acknowledgement.mode, soacknowledge()would throwIllegalStateExceptionwhen the factory used the Kafka default (implicit mode).ContainerProperties.ShareAckMode(EXPLICIT, MANUAL, IMPLICIT) maps each use case to a clear name, following the same pattern as regular containers disabling auto.commit. For EXPLICIT and MANUAL the container now enforcesshare.acknowledgement.mode=explicitvia consumer override properties, regardless of factory configuration.Other related changes:
RecordDeserializationExceptionhandler now guardsacknowledge()against IMPLICIT modecreateShareConsumervariant added toShareConsumerFactoryfor consumer-level overridesDeprecated methods keep the 4.0 API intact.
Reference docs and What's New include a migration guide. Test changes.