GH-4328: Expose native Kafka Streams DLQ configuration#4360
Conversation
6c94761 to
a0db242
Compare
|
@loicgreffier Many thanks for the draft PR. I added some initial feedback that I wanted to convey. Please take a look and see what you think. Overall, the PR looks great. Some general things to keep in mind - we need to add a reference docs section for this (and some javadoc comments - see my inline comments). We also need a new entry in the |
8411197 to
6ec0036
Compare
|
@loicgreffier Some more minor nitpicks. Once you address those, feel free to remove the draft status on the PR. |
|
@sobychacko I've removed the draft status. Regarding the documentation:
If any sentence or wording isn't clear and need to be revised, please don't hesitate to let me know. EDIT: I've added a word about the headers that are being attached to the dead-letter record. |
e4ecab1 to
4e1dc01
Compare
|
@loicgreffier Thanks for all the updates. I added a final round of PR review comments. Hopefully, once that is addressed, we can take it for a final spin of review and merge. Btw - can you update the commit message with some more details about the rationale for the changes. We usually follow these guidelines for commit messages: https://cbea.ms/git-commit/ Thanks! |
|
Could you take care of the commit message? (See my comment above). Also, please sign your commits following DCO - https://spring.io/blog/2025/01/06/hello-dco-goodbye-cla-simplifying-contributions-to-spring |
62aa2f5 to
7f933c1
Compare
Extract the DLT record header-building logic into a new DeadLetterRecordManager class so it can be used outside of DeadLetterPublishingRecoverer. Add two new handlers, RecoveringProcessingExceptionHandler and RecoveringProductionExceptionHandler, to handle processing and production errors, respectively. Add a common AbstractRecoveringExceptionHandler that centralizes the shared error-handling logic for all provided recovering exception handlers. Add a KafkaStreamsDeadLetterDestinationResolver to allow users to define dead-letter routing logic used by the Kafka Streams native DLQ in the provided exception handler implementations. Update StreamsBuilderFactoryBean to expose a dead-letter queue topic name as a possible destination for all provided recovering exception handler implementations. Fixes: spring-projects#4328 Signed-off-by: Loïc Greffier <loic.greffier@michelin.com>
|
Commit message has been changed, accordingly to the original issue title |
|
@loicgreffier Thanks for the updates. I will merge the PR after some more testing and review. |
|
@loicgreffier Thank you once again for the PR contribution! It is now merged upstream to |
|
@sobychacko Many thanks, if any feedback post-merge, feel free to let me know |
The PR addresses the 4 points mentioned in issue #4328:
RecoveringDeserializationExceptionHandlerleverage the new Kafka Streams native DLQ introduced by KIP-1034.NativeDeadLetterDestinationResolveras an alternative to resolve the Kafka Streams native DLQ destination.RecoveringProcessingExceptionHandlerandRecoveringProductionExceptionHandler.To answer these points:
New DeadLetterRecordManager class. I've introduced a new
DeadLetterRecordManagerclass as a common place to build DLT headers, so the utilities for building headers can be leveraged by both theDeadLetterPublishingRecovererand exception handlers. I preferred this over a static utility class because of the number of parameters. One instance per handler and for theDeadLetterPublishingRecoverer.New
RecoveringProcessingExceptionHandlerandRecoveringProductionExceptionHandlerimplementations with the same recovery logic asRecoveringDeserializationExceptionHandler. I've introduced a newAbstractRecoveringExceptionHandlerthat holds the commons for all handlers.New NativeDeadLetterDestinationResolver interface. Allow users to define a native DLQ routing logic based on 3 given parameters: the
ErrorHandlerContext, the source record asConsumerRecordand the exception. All handlers leverage it. For now, it is loaded in the same way as theDeadLetterPublishingRecovererwas.I’ve updated the tests to cover the 3 recovering handler implementations. I introduced an
AbstractRecoveringExceptionHandlerTestsclass that contains commons tests. Child classes override some methods to create the corresponding handler and assert the related handling response.Concerns:
HeaderNamesclass is still located within theDeadLetterPublishingRecovererclass. It might be better to move it toDeadLetterRecordManager, but this would be a breaking change.Closes #4328