GH-4321: Add error handling to share consumer container#4338
Conversation
Fixes: spring-projects#4321 - A single record that failed deserialization or had a corrupt batch was stopping the whole consumer thread. Catch `RecordDeserializationException` and `CorruptRecordException` from `poll()`, `REJECT` or log and continue, so the thread keeps running and later records are still processed. - Every listener exception was always REJECTing the record, with no way to RELEASE for transient failures. Introduce `ShareConsumerRecordRecoverer` (default: `DefaultShareConsumerRecordRecoverer`, `REJECT`) so callers can plug in `ACCEPT/RELEASE/REJECT` per failure; wire it on `AbstractShareKafkaMessageListenerContainer` and `ShareKafkaListenerContainerFactory`. - In explicit mode, failed records were only removed from pending acks and not from `acknowledgmentTimestamps`, which could leak. Clear both on error. Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
artembilan
left a comment
There was a problem hiding this comment.
Looks cool!
Nothing critical, just some cosmetic suggestions.
Thanks
| Share consumer containers now provide configurable error handling: | ||
|
|
||
| * **Poll-level**: `RecordDeserializationException` and `CorruptRecordException` from `poll()` are caught so the consumer thread continues; undeserializable records are REJECTed and the next poll proceeds. | ||
| * **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws. The default implementation REJECTs all failures; you can set a custom recoverer on the factory or container to RELEASE transient errors for redelivery. |
| * | ||
| * @author Soby Chacko | ||
| * @since 4.1 | ||
| * @see DefaultShareConsumerRecordRecoverer |
There was a problem hiding this comment.
Why all of these are stuck together?
Cannot it be made more readable?
Like each block over blank line?
| public class DefaultShareConsumerRecordRecoverer implements ShareConsumerRecordRecoverer { | ||
|
|
||
| private static final LogAccessor logger = new LogAccessor( | ||
| LogFactory.getLog(DefaultShareConsumerRecordRecoverer.class)); |
There was a problem hiding this comment.
We don't need LogFactory to use here.
See LogAccessor(Class<?>) ctor.
Do you foresee any evolution of this class (DefaultShareConsumerRecordRecoverer) at all?
I wonder if it could just make it into a property in the ShareConsumerRecordRecoverer interface.
Something like:
@FunctionalInterface
public interface ShareConsumerRecordRecoverer {
private LogAccessor logger = new LogAccessor(ShareConsumerRecordRecoverer .class);
ShareConsumerRecordRecoverer REJECTING = (record, ex) -> {
logger.error(exception, () -> "Share consumer record processing failed; rejecting record from "
+ record.topic() + "-" + record.partition() + "@" + record.offset());
return AcknowledgeType.REJECT;
}
And with that you may consider to add a RELEASING one, too...
| * @see ShareConsumerRecordRecoverer | ||
| */ | ||
| public void setShareConsumerRecordRecoverer(ShareConsumerRecordRecoverer recoverer) { | ||
| Assert.notNull(recoverer, "'recoverer' must not be null"); |
There was a problem hiding this comment.
I think we can just stop from now checking for null on non-null arguments.
And even don't mention that in Javadocs.
It becomes obvious from signature and from Nullability tools.
| * @return the recoverer | ||
| * @since 4.1 | ||
| */ | ||
| public ShareConsumerRecordRecoverer getShareConsumerRecordRecoverer() { |
There was a problem hiding this comment.
Why do we need public getter?
The more methods are visible from the class for me in IDE while typing, the more I get frustrated.
| private int concurrency = 1; | ||
|
|
||
| @Nullable | ||
| private ShareConsumerRecordRecoverer recordRecoverer; |
There was a problem hiding this comment.
Put annotation next to type.
Not saying this doesn't work, but for visual reasoning behind the annotation.
| instance.setApplicationContext(this.applicationContext); | ||
| instance.setApplicationEventPublisher(this.applicationEventPublisher); | ||
|
|
||
| if (this.recordRecoverer != null) { |
There was a problem hiding this comment.
Why does this go to the separate if, but not as a part of the JavaUtils.INSTANCE chain below?
| containerProps.setMessageListener((MessageListener<String, String>) record -> { | ||
| if (goodRecordValue.equals(record.value())) { | ||
| if (goodRecordsReceived.incrementAndGet() == 2) { | ||
| twoGoodRecordsLatch.countDown(); |
There was a problem hiding this comment.
Looks like you don't verify anything else afterwards, so we probably don't need to counters.
You just can make CountDownLatch twoGoodRecordsLatch = new CountDownLatch(2);.
Wait exactly for two counts. Kinda the same what you do with the increment, but more straightforward and without extra code.
| new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProperties); | ||
|
|
||
| ShareConsumerRecordRecoverer recoverer = container.getShareConsumerRecordRecoverer(); | ||
| assertThat(recoverer).isNotNull(); |
There was a problem hiding this comment.
Redundant. The isInstanceOf() covers us.
…view - Wire record recoverer in the `JavaUtils` chain with other optional config for consistent style. - Drop null check from the setter; rely on signature and nullability. - Make recoverer getter protected so subclasses in other packages can use it without expanding the public API. - Put default behavior on the interface as `REJECTING` and `RELEASING` so one extension point, no extra class, and a built-in for log-and-RELEASE; use `LogAccessor(Class)` instead of `LogFactory`. - Use a plain string for the `CorruptRecordException` log; the message is fixed so a supplier adds nothing. - Test changes. Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
artembilan
left a comment
There was a problem hiding this comment.
Just couple nit-picks.
Thanks
| * **Poll-level**: `RecordDeserializationException` and `CorruptRecordException` from `poll()` are caught so the consumer thread continues; undeserializable records are REJECTed and the next poll proceeds. | ||
| * **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws. The default implementation REJECTs all failures; you can set a custom recoverer on the factory or container to RELEASE transient errors for redelivery. | ||
| * **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws. | ||
| The default is `ShareConsumerRecordRecoverer.REJECTING`; `RELEASING` is also available. You can set a custom recoverer on the factory or container. |
There was a problem hiding this comment.
Still: one sentence per line.
| /** | ||
| * Logger used by built-in implementations. | ||
| */ | ||
| LogAccessor LOGGER = new LogAccessor(ShareConsumerRecordRecoverer.class); |
There was a problem hiding this comment.
its an interface and i don't think we can have private instance fields in interfaces. All fields declared in an interface implicitly become public, static and final. No?
There was a problem hiding this comment.
Right. I see now.
So, if you are OK having this to be available for target implementations of this interface, then we can live it as is.
Otherwise we would need to push this property down to every lambda.
WDYT?
There was a problem hiding this comment.
Keeping it as is. I think its ok to have a logger living on the interface; hiding it would mean adding extra components for little benefit.
Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
Fixes: #4321
A single record that failed deserialization or had a corrupt batch was stopping the whole consumer thread. Catch
RecordDeserializationExceptionandCorruptRecordExceptionfrompoll(),REJECTor log and continue, so the thread keeps running and later records are still processed.Every listener exception was always REJECTing the record, with no way to RELEASE for transient failures. Introduce
ShareConsumerRecordRecoverer(default:DefaultShareConsumerRecordRecoverer,REJECT) so callers can plug inACCEPT/RELEASE/REJECTper failure; wire it onAbstractShareKafkaMessageListenerContainerandShareKafkaListenerContainerFactory.In explicit mode, failed records were only removed from pending acks and not from
acknowledgmentTimestamps, which could leak. Clear both on error.