GH-4321: Add error handling to share consumer container#4338

Merged
artembilan merged 4 commits into spring-projects:main from sobychacko:gh-4321
Mar 11, 2026

Conversation

@sobychacko

Contributor

Fixes: spring-projects#4321

- A single record that failed deserialization or had a corrupt batch was
  stopping the whole consumer thread. Catch `RecordDeserializationException`
  and `CorruptRecordException` from `poll()`, `REJECT` or log and continue, so
  the thread keeps running and later records are still processed.

- Every listener exception was always REJECTing the record, with no way
  to RELEASE for transient failures. Introduce `ShareConsumerRecordRecoverer`
  (default: `DefaultShareConsumerRecordRecoverer`, `REJECT`) so callers can
  plug in `ACCEPT/RELEASE/REJECT` per failure; wire it on
  `AbstractShareKafkaMessageListenerContainer` and `ShareKafkaListenerContainerFactory`.

- In explicit mode, failed records were only removed from pending acks and
  not from `acknowledgmentTimestamps`, which could leak. Clear both on error.
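
As a rough illustration of the three changes listed above, here is a minimal, self-contained sketch. It does not use the real Kafka or Spring Kafka types: `AckType`, `Rec`, `BadRecordException`, `Recoverer`, `MiniShareLoop` and every method name are hypothetical stand-ins, and the container code in this PR may be structured quite differently. For brevity, the sketch only notes a poll-level failure instead of acknowledging the bad record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration; these are not the Kafka or Spring Kafka types.
enum AckType { ACCEPT, RELEASE, REJECT }

final class Rec {
    final long offset;
    final String value;

    Rec(long offset, String value) {
        this.offset = offset;
        this.value = value;
    }
}

// Stand-in for a poll-level failure such as a record that cannot be deserialized.
class BadRecordException extends RuntimeException {
    BadRecordException(String message) {
        super(message);
    }
}

// Stand-in for a pluggable listener-level recovery strategy.
interface Recoverer {
    AckType recover(Rec rec, Exception ex);
}

class MiniShareLoop {

    // Explicit-mode bookkeeping: a failed record must be removed from both maps.
    final Map<Long, Rec> pendingAcks = new HashMap<>();
    final Map<Long, Long> ackTimestamps = new HashMap<>();

    // Records what the loop decided, so the behaviour can be inspected.
    final List<String> decisions = new ArrayList<>();

    private final Recoverer recoverer;

    MiniShareLoop(Recoverer recoverer) {
        this.recoverer = recoverer;
    }

    // One iteration of the consumer loop.
    void pollOnce(Supplier<List<Rec>> poll, Consumer<Rec> listener) {
        List<Rec> records;
        try {
            records = poll.get();
        }
        catch (BadRecordException ex) {
            // Poll-level failure: note it and return so that the next poll can proceed.
            this.decisions.add("poll-error:" + ex.getMessage());
            return;
        }
        for (Rec rec : records) {
            this.pendingAcks.put(rec.offset, rec);
            this.ackTimestamps.put(rec.offset, System.nanoTime());
            try {
                listener.accept(rec);
            }
            catch (RuntimeException ex) {
                // Listener-level failure: the recoverer picks the acknowledgement type.
                this.decisions.add(rec.offset + ":" + this.recoverer.recover(rec, ex));
                this.pendingAcks.remove(rec.offset);
                this.ackTimestamps.remove(rec.offset);
            }
        }
    }
}
```

In this sketch a failing poll leaves the loop usable, and a failing record disappears from both maps, while a successfully handled record stays pending until it is acknowledged.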

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

@artembilan artembilan left a comment

Member

Looks cool!
Nothing critical, just some cosmetic suggestions.

Thanks

Share consumer containers now provide configurable error handling:

* **Poll-level**: `RecordDeserializationException` and `CorruptRecordException` from `poll()` are caught so the consumer thread continues; undeserializable records are REJECTed and the next poll proceeds.
* **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws. The default implementation REJECTs all failures; you can set a custom recoverer on the factory or container to RELEASE transient errors for redelivery.

Member

One sentence per line.

*
* @author Soby Chacko
* @since 4.1
* @see DefaultShareConsumerRecordRecoverer

Member

Why are all of these stuck together?
Can't this be made more readable?
For example, each block separated by a blank line?

public class DefaultShareConsumerRecordRecoverer implements ShareConsumerRecordRecoverer {

private static final LogAccessor logger = new LogAccessor(
LogFactory.getLog(DefaultShareConsumerRecordRecoverer.class));

Member

We don't need to use LogFactory here.
See the LogAccessor(Class<?>) ctor.

Do you foresee any evolution of this class (DefaultShareConsumerRecordRecoverer) at all?
I wonder if we could just turn it into a constant on the ShareConsumerRecordRecoverer interface.
Something like:

@FunctionalInterface
public interface ShareConsumerRecordRecoverer {

    // Interface fields are implicitly public static final, so no 'private' modifier here.
    LogAccessor LOGGER = new LogAccessor(ShareConsumerRecordRecoverer.class);

    ShareConsumerRecordRecoverer REJECTING = (record, ex) -> {
        LOGGER.error(ex, () -> "Share consumer record processing failed; rejecting record from "
                + record.topic() + "-" + record.partition() + "@" + record.offset());
        return AcknowledgeType.REJECT;
    };

    // ... the interface's abstract method is omitted from this sketch

}

And with that, you may want to consider adding a RELEASING one, too...
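
For readers following along, the idea of keeping both built-in strategies as constants on the interface could look roughly like the sketch below. Everything in it is a stand-in: `AckDecision`, `FailedRecord`, `RecordRecovererSketch`, the `recover` method name and `ReleaseTransientRecoverer` are hypothetical, and the JDK's `System.Logger` is used in place of Spring's `LogAccessor` so that the example compiles without any dependencies.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins; real code would use Kafka's AcknowledgeType and ConsumerRecord.
enum AckDecision { ACCEPT, RELEASE, REJECT }

final class FailedRecord {
    final String topic;
    final int partition;
    final long offset;

    FailedRecord(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        return this.topic + "-" + this.partition + "@" + this.offset;
    }
}

@FunctionalInterface
interface RecordRecovererSketch {

    // Interface fields are implicitly public static final.
    System.Logger LOGGER = System.getLogger(RecordRecovererSketch.class.getName());

    // Built-in strategy: log the failure and reject the record.
    RecordRecovererSketch REJECTING = (rec, ex) -> {
        LOGGER.log(System.Logger.Level.ERROR, "Processing failed; rejecting record from " + rec, ex);
        return AckDecision.REJECT;
    };

    // Built-in strategy: log the failure and release the record for redelivery.
    RecordRecovererSketch RELEASING = (rec, ex) -> {
        LOGGER.log(System.Logger.Level.WARNING, "Processing failed; releasing record from " + rec, ex);
        return AckDecision.RELEASE;
    };

    // Hypothetical method name and signature, chosen for this sketch only.
    AckDecision recover(FailedRecord rec, Exception ex);
}

// A custom strategy: release failures that look transient, reject everything else.
class ReleaseTransientRecoverer implements RecordRecovererSketch {

    @Override
    public AckDecision recover(FailedRecord rec, Exception ex) {
        return ex instanceof TimeoutException
                ? RELEASING.recover(rec, ex)
                : REJECTING.recover(rec, ex);
    }
}
```

With this shape there is a single extension point, and a custom strategy can delegate to the built-in constants instead of duplicating the logging.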

* @see ShareConsumerRecordRecoverer
*/
public void setShareConsumerRecordRecoverer(ShareConsumerRecordRecoverer recoverer) {
Assert.notNull(recoverer, "'recoverer' must not be null");

Member

I think from now on we can just stop checking for null on non-null arguments.
And not even mention it in Javadocs.
It is obvious from the signature and from nullability tools.

* @return the recoverer
* @since 4.1
*/
public ShareConsumerRecordRecoverer getShareConsumerRecordRecoverer() {

Member

Why do we need a public getter?
The more methods a class exposes in IDE auto-completion while I type, the more frustrated I get.

private int concurrency = 1;

@Nullable
private ShareConsumerRecordRecoverer recordRecoverer;

Member

Put the annotation next to the type.
Not that this doesn't work, but it makes it visually clear what the annotation applies to.

instance.setApplicationContext(this.applicationContext);
instance.setApplicationEventPublisher(this.applicationEventPublisher);

if (this.recordRecoverer != null) {

Member

Why does this go into a separate if instead of being part of the JavaUtils.INSTANCE chain below?
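
To make the suggestion concrete, the sketch below shows the general shape of a fluent "apply if not null" chain replacing a standalone `if`. `FluentSetters` is a simplified stand-in written for this example and is not the Spring Kafka `JavaUtils` class; `ContainerSketch`, `FactorySketch` and their fields are likewise hypothetical.

```java
import java.util.function.Consumer;

// Simplified stand-in for a fluent "set if present" helper.
final class FluentSetters {

    static final FluentSetters INSTANCE = new FluentSetters();

    private FluentSetters() {
    }

    // Invokes the consumer only when the value is non-null, then returns this for chaining.
    <T> FluentSetters acceptIfNotNull(T value, Consumer<T> consumer) {
        if (value != null) {
            consumer.accept(value);
        }
        return this;
    }
}

// Hypothetical container with defaults that optional factory settings may override.
class ContainerSketch {

    int concurrency = 1;
    String recoverer = "REJECTING";

    void setConcurrency(int concurrency) {
        this.concurrency = concurrency;
    }

    void setRecoverer(String recoverer) {
        this.recoverer = recoverer;
    }
}

// Hypothetical factory whose optional settings are null when not configured.
class FactorySketch {

    Integer concurrency;
    String recoverer;

    ContainerSketch create() {
        ContainerSketch instance = new ContainerSketch();
        // All optional settings are applied in one chain; no separate if block is needed.
        FluentSetters.INSTANCE
                .acceptIfNotNull(this.concurrency, instance::setConcurrency)
                .acceptIfNotNull(this.recoverer, instance::setRecoverer);
        return instance;
    }
}
```

An unset option simply leaves the container default in place, which is the same outcome as the explicit null check but in a uniform style.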

containerProps.setMessageListener((MessageListener<String, String>) record -> {
if (goodRecordValue.equals(record.value())) {
if (goodRecordsReceived.incrementAndGet() == 2) {
twoGoodRecordsLatch.countDown();

Member

Looks like you don't verify anything else afterwards, so we probably don't need the counter.
You can just make CountDownLatch twoGoodRecordsLatch = new CountDownLatch(2);.
Then wait for exactly two counts. It is the same as what you do with the increment, but more straightforward and without the extra code.
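
A minimal sketch of that simplification, using only JDK classes, is shown below. The `LatchSketch` class, its method and the way records are delivered from a plain thread are assumptions made for illustration; the test in the PR drives a real listener container instead.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchSketch {

    // Returns true if two good records reach the listener before the timeout expires.
    static boolean awaitTwoGoodRecords(List<String> values, String goodRecordValue, long timeoutMillis) {
        // The latch itself does the counting, so no separate AtomicInteger is needed.
        CountDownLatch twoGoodRecordsLatch = new CountDownLatch(2);
        Consumer<String> listener = value -> {
            if (goodRecordValue.equals(value)) {
                twoGoodRecordsLatch.countDown();
            }
        };
        // Deliver the records from another thread, as a container would.
        Thread deliveryThread = new Thread(() -> values.forEach(listener));
        deliveryThread.start();
        try {
            return twoGoodRecordsLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```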

new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties);

ShareConsumerRecordRecoverer recoverer = container.getShareConsumerRecordRecoverer();
assertThat(recoverer).isNotNull();

Member

Redundant. The isInstanceOf() covers us.

…view

- Wire record recoverer in the `JavaUtils` chain with other optional config for consistent style.
- Drop null check from the setter; rely on signature and nullability.
- Make recoverer getter protected so subclasses in other packages can use it without expanding the public API.
- Put default behavior on the interface as `REJECTING` and `RELEASING` so one extension point, no extra class,
  and a built-in for log-and-RELEASE; use `LogAccessor(Class)` instead of `LogFactory`.
- Use a plain string for the `CorruptRecordException` log; the message is fixed so a supplier adds nothing.
- Test changes.

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

@artembilan artembilan left a comment

Member

Just a couple of nit-picks.

Thanks

* **Poll-level**: `RecordDeserializationException` and `CorruptRecordException` from `poll()` are caught so the consumer thread continues; undeserializable records are REJECTed and the next poll proceeds.
* **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws. The default implementation REJECTs all failures; you can set a custom recoverer on the factory or container to RELEASE transient errors for redelivery.
* **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws.
The default is `ShareConsumerRecordRecoverer.REJECTING`; `RELEASING` is also available. You can set a custom recoverer on the factory or container.

Member

Still: one sentence per line.

/**
* Logger used by built-in implementations.
*/
LogAccessor LOGGER = new LogAccessor(ShareConsumerRecordRecoverer.class);

Member

Cannot this be private?

Contributor Author

It's an interface, and I don't think we can have private instance fields in interfaces. All fields declared in an interface implicitly become public, static, and final. No?

Member

Right. I see now.
So, if you are OK with this being available to implementations of this interface, then we can leave it as is.
Otherwise we would need to push this property down to every lambda.
WDYT?

Contributor Author

Keeping it as is. I think it's OK to have a logger living on the interface; hiding it would mean adding extra components for little benefit.
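
The language rule discussed in this thread, that a field declared in an interface is implicitly `public static final`, can be checked with a few lines of reflection. The names `WithConstant`, `ConstantUser` and `InterfaceFieldCheck` below are made up for the example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

interface WithConstant {

    // No modifiers are written here, yet the compiler treats the field as public static final.
    String LOGGER_NAME = "sketch";
}

// Implementations see the constant by its simple name, which is the visibility accepted in the thread.
class ConstantUser implements WithConstant {

    String describe() {
        return "logging as " + LOGGER_NAME;
    }
}

class InterfaceFieldCheck {

    // Returns the declared modifiers of a public field as text.
    static String modifiersOf(Class<?> type, String fieldName) {
        try {
            Field field = type.getField(fieldName);
            return Modifier.toString(field.getModifiers());
        }
        catch (NoSuchFieldException ex) {
            throw new IllegalArgumentException(ex);
        }
    }
}
```

Calling `InterfaceFieldCheck.modifiersOf(WithConstant.class, "LOGGER_NAME")` reports `public static final`, which is why the logger cannot be hidden without moving it out of the interface.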

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
@artembilan artembilan merged commit 8ee8d0f into spring-projects:main Mar 11, 2026
3 checks passed
Linked issue: Add error handling strategy (poll-level and listener-level) to Share consumer container