
Introduce ShareAckMode enum for share consumers #4376

Merged
artembilan merged 3 commits into
spring-projects:main from
sobychacko:share-ack-mode
Apr 1, 2026

Conversation

@sobychacko sobychacko commented Mar 29, 2026

Contributor

The 4.0 boolean explicitShareAcknowledgment only controlled whether the listener received a non-null acknowledgment object. It had no effect on Kafka's share.acknowledgement.mode, so acknowledge() would throw IllegalStateException when the factory used the Kafka default (implicit mode).

ContainerProperties.ShareAckMode (EXPLICIT, MANUAL, IMPLICIT) maps each use case to a clear name, following the same pattern as regular containers disabling auto.commit. For EXPLICIT and MANUAL the container now enforces share.acknowledgement.mode=explicit via consumer override properties, regardless of factory configuration.

Other related changes:

  • RecordDeserializationException handler now guards acknowledge() against IMPLICIT mode
  • createShareConsumer variant added to ShareConsumerFactory for consumer-level overrides
  • 4.0 dead code and naming issues cleaned up

Deprecated methods keep the 4.0 API intact.
Reference docs and What's New include a migration guide. Test changes.

@sobychacko sobychacko added this to the 4.1.0-RC1 milestone Mar 29, 2026
@sobychacko

Contributor Author

The 4.0 explicitShareAcknowledgment boolean never touched Kafka's share.acknowledgement.mode. Kafka defaults that to implicit, which means consumer.acknowledge() throws IllegalStateException. The container called it unconditionally, so the feature only worked if the user had manually configured share.acknowledgement.mode=explicit in the factory.

ShareAckMode (EXPLICIT, MANUAL, IMPLICIT) replaces the boolean, following the same pattern as regular containers disabling auto.commit. For EXPLICIT and MANUAL, the container now enforces share.acknowledgement.mode=explicit via a new createShareConsumer override, so the correct Kafka protocol mode is always active regardless of factory config.

share.acknowledgement.mode=implicit in factory config was non-functional in 4.0. It's now detected, warned about, and overridden. Users who want true implicit mode should set ShareAckMode.IMPLICIT on ContainerProperties.
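The detect-and-override behavior described above can be sketched roughly as follows. This is a self-contained model, not the container's actual code: `ShareAckModeSketch` and its nested `Mode` enum are hypothetical stand-ins for `ContainerProperties.ShareAckMode`, and the assumption that IMPLICIT adds no override at all is mine, not something the PR states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the override logic; none of these names are Spring Kafka API.
final class ShareAckModeSketch {

    // Stand-in for ContainerProperties.ShareAckMode.
    enum Mode { EXPLICIT, MANUAL, IMPLICIT }

    // The Kafka consumer property named in the PR description.
    static final String MODE_KEY = "share.acknowledgement.mode";

    private ShareAckModeSketch() {
    }

    // True when the factory asks for implicit mode but the container mode needs explicit,
    // which is the case the PR says is now detected and warned about.
    static boolean conflictsWithFactory(Mode mode, Map<String, Object> factoryConfig) {
        Object configured = factoryConfig.get(MODE_KEY);
        return mode != Mode.IMPLICIT
                && configured != null
                && "implicit".equalsIgnoreCase(configured.toString());
    }

    // Consumer-level overrides: EXPLICIT and MANUAL force explicit mode.
    // Assumption: IMPLICIT contributes no override.
    static Map<String, Object> overridesFor(Mode mode) {
        Map<String, Object> overrides = new HashMap<>();
        if (mode != Mode.IMPLICIT) {
            overrides.put(MODE_KEY, "explicit");
        }
        return overrides;
    }
}
```

In this model, a factory configured with `share.acknowledgement.mode=implicit` combined with `Mode.EXPLICIT` reports a conflict, and the returned overrides still carry `explicit`.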

The 4.0 boolean `explicitShareAcknowledgment` only controlled whether
the listener received a non-null acknowledgment object. It had no
effect on Kafka's `share.acknowledgement.mode`, so `acknowledge()`
would throw `IllegalStateException` when the factory used the Kafka
default (implicit mode).

`ContainerProperties.ShareAckMode` (EXPLICIT, MANUAL, IMPLICIT) makes
the modes explicit, with EXPLICIT as the default — the same philosophy
as regular containers disabling auto.commit and owning the commit
lifecycle themselves. For EXPLICIT and MANUAL the container now
enforces `share.acknowledgement.mode=explicit` via consumer override
properties, regardless of factory configuration.

Other related fixes:

 - `RecordDeserializationException` handler now guards `acknowledge()`
   against IMPLICIT mode
 - `createShareConsumer` variant added to `ShareConsumerFactory` for
   consumer-level overrides
 - 4.0 dead code and naming issues cleaned up

Deprecated methods keep the 4.0 API intact.
Reference docs and What's New include a migration guide.
Adding tests to verify the changes.

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

@artembilan artembilan left a comment

Member

I think that's enough for now.
Let's work out a common ground and come together on the same page first!

Thank you for doing this, Soby!

[[x41-share-ack-mode]]
=== Share Consumer Acknowledgment Modes

The boolean `setExplicitShareAcknowledgment(boolean)` property on `ContainerProperties` has been replaced by the `ShareAckMode` enum, which clearly names the three distinct acknowledgment modes:

Member

That is too much info for whats-new.adoc.
Why change pattern and provide all of that here, but not in the target chapter?

Contributor Author

Ok, will move to the chapter.

* @since 4.1
* @see #setShareAckMode(ShareAckMode)
*/
public enum ShareAckMode {

Member

Feels like the decision to reuse ContainerProperties for ShareConsumer was the wrong direction.
Now we confuse end users with the existing AckMode, which apparently is not used by ShareKafkaMessageListenerContainer.
And there are probably many other conflicting options that are not used in one container or the other.
Not saying that it should be revised now, but it is something to keep in mind if we'd like to keep our API end-user friendly.

Contributor Author

Ya, we have time to refine it before RC1.

Contributor Author

The problem is that we need to distinguish two kinds of containers if we introduce a properties class just for share consumers.

Contributor Author

Artem - as we discussed yesterday, we will try to tackle this as a follow up item as this involves a lot of moving parts.

* <li><strong>EXPLICIT</strong> (default): Container-managed. The container sends ACCEPT after
* successful processing and delegates error handling to the {@link ShareConsumerRecordRecoverer}
* (default: REJECT). Equivalent to disabling {@code auto.commit} on a regular consumer.</li>
* <li><strong>MANUAL</strong>: Listener-managed. The listener must acknowledge each record via

Member

I'm not saying there is something wrong with the logic described here, but I wonder if it has crossed your mind that technically there is no difference between EXPLICIT and MANUAL.
Let's imagine the environment if we change the logic to have only two modes, EXPLICIT and IMPLICIT!
EXPLICIT is then MANUAL at the same time.
So, we always provide the ack callback to the end-user listener in EXPLICIT mode.
And if that callback was not called from end-user code, the container calls it, for both the success and error cases respectively.

What am I missing, please?
What is the real driver for MANUAL if it is still per record?

@sobychacko sobychacko Mar 30, 2026

Contributor Author

Hey Artem, thanks for the thoughtful feedback on this particular issue. I hear your arguments there, but I'd like to make the case for keeping both modes. The distinction is actually the same one we already draw for regular consumers — AckMode.BATCH (container owns everything, listener is unaware) vs AckMode.MANUAL (listener owns it, must call acknowledge()). There's no auto-fallback in regular consumer MANUAL mode either, and I think that strictness is intentional and valuable.

For share consumers:
• EXPLICIT-- listener is completely unaware of acknowledgment. The acknowledgment parameter is null, no API in scope, no decision to make. Container handles success and error outcomes.
• MANUAL-- listener owns every outcome. Non-null acknowledgment, and if you don't call it, the next poll() throws — the Kafka client enforces the contract, not just the docs.

The auto-fallback model silently auto-accepts a record when a listener forgets to call acknowledge(). In the strict MANUAL model that's a fast-fail -- the blocked poll makes the bug immediately obvious. Collapsing the two for share consumers while keeping them separate for regular consumers would be an inconsistency in the framework, no?

I think keeping both gives users a clear, unambiguous contract that aligns with how Spring Kafka already thinks about acknowledgment.
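The two contracts argued for above can be modeled in a few lines. This is only an illustration with hypothetical names (`AckContractSketch`, `Ack`, `dispatch`), not the container's dispatch code. In particular, the real fast-fail is described as coming from the Kafka client on the next `poll()`; the sketch stands in for that with an immediate `IllegalStateException`.

```java
import java.util.function.BiConsumer;

// Hypothetical model of the EXPLICIT vs MANUAL listener contract; not Spring Kafka API.
final class AckContractSketch {

    enum Mode { EXPLICIT, MANUAL }

    // Stand-in acknowledgment handle that records whether the listener called it.
    static final class Ack {

        private boolean acknowledged;

        void acknowledge() {
            this.acknowledged = true;
        }

        boolean isAcknowledged() {
            return this.acknowledged;
        }
    }

    private AckContractSketch() {
    }

    // Returns who settled the record: "container" in EXPLICIT, "listener" in MANUAL.
    static String dispatch(Mode mode, String record, BiConsumer<String, Ack> listener) {
        if (mode == Mode.EXPLICIT) {
            // Listener is unaware of acknowledgment: it receives null and the container settles.
            listener.accept(record, null);
            return "container";
        }
        Ack ack = new Ack();
        listener.accept(record, ack);
        if (!ack.isAcknowledged()) {
            // Models the strict contract: a forgotten acknowledge() fails fast, no auto-accept.
            throw new IllegalStateException("Record not acknowledged by listener: " + record);
        }
        return "listener";
    }
}
```

A MANUAL listener that forgets to acknowledge fails immediately in this model, while the same listener under an auto-fallback design would have its record silently accepted.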

+ "ShareAckMode.IMPLICIT on the container properties instead.");
}
}
catch (UnsupportedOperationException ex) {

Member

Why do we need to catch it here?
If that is not implemented in the provided ShareConsumerFactory, then it is better to fail for that reason.
This adds unnecessary noise to our code and even makes things worse: the catch-and-log hides the problem with the invoked method.

Contributor Author

good idea. will fix.

- Simplify whats-new.adoc and move detailed migration guide to ref docs
- Remove try-catch around `getConfigurationProperties()` - let it fail if not implemented

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

@artembilan artembilan left a comment

Member

Looks cool, but hard to digest.

Let's hope my review is not very destructive!

Thank you for doing this!

assertThat(redeliveryLatch.await(5, TimeUnit.SECONDS))
.as("Record should be archived (REJECT) and not redelivered")
.isFalse();
assertThat(deliveryCount.get()).isEqualTo(1);

Member

The last two conditions are suspicious: the redelivery may fail to happen for reasons other than the REJECT.
The record could be accepted in the error handler or in some other way.
Or the record could be delivered to another consumer on the same share topic.
Therefore we are blocked here for 5 seconds without any clarity.
If there is no way to track the rejection somehow on the Kafka side, then I don't see any value in this test at all.
Feels like we are trying to test the Kafka client itself here: nothing is under our container's control.

try {
assertThat(bothSeenLatch.await(15, TimeUnit.SECONDS)).isTrue();
// No redelivery: if a third invocation happened the latch would have reached 0
assertThat(redeliveryLatch.await(5, TimeUnit.SECONDS))

Member

I don't think using timing barriers to ensure that something has not happened is the right way to model test behavior.
The event might not have happened for many reasons, or our barrier might not be long enough to catch it.

It looks again like you are trying to prove the Kafka client's (or even the broker's) promises with unreliable tools.
I don't know the Kafka internals well enough to give good advice on how to verify that a record has been rejected because we threw an error on it.
But again probably out of our project scope...
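One way to stay inside what the container controls is to assert on a positive signal instead of waiting for an absence. The sketch below is an assumption-laden illustration: `RecordingRecovererSketch` and its `recover` method are hypothetical and do not mirror the real `ShareConsumerRecordRecoverer` signature. It only shows the pattern of recording the decision and awaiting a latch that counts down when the expected event occurs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test double; records each rejection so a test can wait for it positively.
final class RecordingRecovererSketch {

    enum Outcome { ACCEPT, RELEASE, REJECT }

    private final List<String> rejectedKeys = new CopyOnWriteArrayList<>();

    private final CountDownLatch firstRejection = new CountDownLatch(1);

    // Invoked where a container would consult its recoverer after a listener failure.
    Outcome recover(String recordKey, Exception failure) {
        this.rejectedKeys.add(recordKey);
        this.firstRejection.countDown();
        return Outcome.REJECT;
    }

    // Returns true as soon as a rejection has been recorded, false on timeout or interrupt.
    boolean awaitRejection(long timeout, TimeUnit unit) {
        try {
            return this.firstRejection.await(timeout, unit);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> rejectedKeys() {
        return List.copyOf(this.rejectedKeys);
    }
}
```

With this shape the test returns as soon as the rejection decision is observed, and a timeout means a real failure rather than the expected outcome. It verifies the container's decision only, not what the broker does with the record afterwards.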

* `ACCEPT` — Record processed successfully; mark as completed.
* `RELEASE` — Temporary failure; make the record available for redelivery to this or another consumer.
* `REJECT` — Permanent failure; archive the record and do not redeliver.
* `RENEW` — Extend the acquisition lock (non-terminal).
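As an illustration of how a listener or recoverer might pick between the terminal outcomes listed above, here is a minimal sketch. The `Outcome` enum is a stand-in for the four names in the list, not Kafka's own type, and the choice of which exception classes count as temporary is purely an assumption made for the example.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical mapping from a processing result to an acknowledgment outcome.
final class OutcomeChooserSketch {

    // Stand-in for the outcomes listed above.
    enum Outcome { ACCEPT, RELEASE, REJECT, RENEW }

    private OutcomeChooserSketch() {
    }

    static Outcome outcomeFor(Throwable failure) {
        if (failure == null) {
            // No failure: processing succeeded.
            return Outcome.ACCEPT;
        }
        if (failure instanceof TimeoutException || failure instanceof IOException) {
            // Assumed to be temporary: let this or another consumer retry.
            return Outcome.RELEASE;
        }
        // Everything else is treated as permanent in this sketch.
        return Outcome.REJECT;
    }
}
```

`RENEW` never comes out of this mapping because it is non-terminal: it extends the lock while processing continues instead of settling the record.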

Member

Not related to the change in this PR (which is great), but raises the question as natural evolution afterwards.
How to deal with RENEW if we chose EXPLICIT?
Is there a possibility that we have to extend the lock somehow from the container perspective?

Contributor Author

Hmm, that's a very interesting scenario that I didn't think about. Before this PR we already had this problem with the old non-"explicit" mode, where we did not hand an Acknowledgment object to the listener. Listeners could not renew at all there, which was a real gap, and that gap is still present with the new EXPLICIT mode. I guess we have a couple of options:

1. Expose a renew-only acknowledgment in EXPLICIT mode for listeners to use.
2. Document strictly that a renew use case requires MANUAL mode.

We could also auto-renew periodically from the container, based on some property. I think we can probably tackle this as a follow-up item?

Contributor Author

Issue created to track: #4381


// Validate listener type for explicit acknowledgment mode
if (containerProperties.isExplicitShareAcknowledgment()) {
if (containerProperties.getShareAckMode() == ContainerProperties.ShareAckMode.MANUAL) {

Member

I think we need to use equals(), with the constant first, instead of the == operator, which does not compare objects but rather their memory pointers:

if (ContainerProperties.ShareAckMode.MANUAL.equals(containerProperties.getShareAckMode())) {
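A minimal sketch of the constant-first comparison suggested here, using a hypothetical stand-in enum rather than `ContainerProperties.ShareAckMode`. The constant-first `equals()` call tolerates a null argument. Because each enum constant is a single instance, the identity check returns the same result for these inputs.

```java
// Hypothetical stand-in enum used only to compare the two styles.
final class EnumComparisonSketch {

    enum Mode { EXPLICIT, MANUAL, IMPLICIT }

    private EnumComparisonSketch() {
    }

    // Constant-first equals(): never dereferences the possibly-null argument.
    static boolean isManualByEquals(Mode mode) {
        return Mode.MANUAL.equals(mode);
    }

    // Identity comparison: also safe for null, since no method is invoked on the argument.
    static boolean isManualByIdentity(Mode mode) {
        return mode == Mode.MANUAL;
    }
}
```

The form to avoid is `mode.equals(Mode.MANUAL)`, which throws a `NullPointerException` when `mode` is null.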

// Apply explicit mode configuration to consumer
// Note: This should ideally be done during consumer creation in the factory
this.logger.info("Share consumer configured for explicit acknowledgment mode");
if (shareAckMode == ContainerProperties.ShareAckMode.IMPLICIT) {

Member

DITTO

if (shareAckMode == ContainerProperties.ShareAckMode.IMPLICIT) {
ShareConsumerRecordRecoverer recoverer =
ShareKafkaMessageListenerContainer.this.getShareConsumerRecordRecoverer();
if (recoverer != ShareConsumerRecordRecoverer.REJECTING) {

Member

Here identity is good, but has to be flipped to avoid NPE reports.

Object configured = ShareKafkaMessageListenerContainer.this.shareConsumerFactory
.getConfigurationProperties()
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
if (configured != null && "implicit".equalsIgnoreCase(configured.toString())) {

Member

Why can't we use ShareAcknowledgementMode instead of a hard-coded string?

this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
ShareKafkaMessageListenerContainer.this.getGroupId(),
consumerClientId,
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"));

Member

DITTO.
See ConsumerConfig defaults:

                .define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
                                        Type.STRING,
                                        ShareAcknowledgementMode.IMPLICIT.name(),
                                        new ShareAcknowledgementMode.Validator(),
                                        Importance.MEDIUM,
                                        ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC)

+ "ShareAckMode.IMPLICIT on the container properties instead.");
}
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
ShareKafkaMessageListenerContainer.this.getGroupId(),

Member

I think we call ShareKafkaMessageListenerContainer.this.getGroupId() too many times in this method.
Can't it be extracted to a local variable?

- Use `equals()` with constant first for enum comparisons
- Flip identity check for recoverer NPE protection
- Use `ShareAcknowledgementMode` enum instead of hardcoded strings
- Extract `getGroupId()` to local variable
- Remove flaky integration tests that use timing barriers to verify negative conditions

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
@artembilan artembilan merged commit 813b9af into spring-projects:main Apr 1, 2026
3 checks passed
@artembilan

Member

Now that this is merged, @sobychacko, please check whether PR #4374 duplicates it in some way, and whether this merged PR fixes the respective issue: #4369

Thanks

@sobychacko

Contributor Author

Thanks, @artembilan. Will do.
