GH-4324: Add async commit option to share consumer container#4379
GH-4324: Add async commit option to share consumer container#4379maxwell-balla wants to merge 4 commits into
Conversation
sobychacko
left a comment
There was a problem hiding this comment.
@maxwell-balla We need to add tests for this feature, preferably both unit and integration tests. Need to verify commitSync, commitAsync etc. are called as expected by using some kind of mocks. Also, need an end-to-end integration test verifying async commit. You can follow the existing patterns in ShareKafkaMessageListenerContainerUnitTests and ShareKafkaMessageListenerContainerIntegrationTests.
You need to add @author tag on all the classes that you are modifying.
Need to add ref docs, whats-new docs etc. But that can wait until after all the cod review concerns are addressed.
Thanks!
sobychacko
left a comment
There was a problem hiding this comment.
@maxwell-balla I added some more feedback. Please take a look. Also, make sure that you add the @author tag to all the classes you changed. You may want to start thinking about adding docs and whats-new changes. They need to go in with this PR.
…sumers after rebase After rebasing on top of the ShareAckMode enum refactor, the original syncShareCommits feature was lost due to conflicts. Reintroduce the syncShareCommits flag in ContainerProperties and ShareKafkaMessageListenerContainer to allow users to opt into commitAsync() for share consumer acknowledgments. Changes: - ContainerProperties: added syncShareCommits field, getter and setter - ShareKafkaMessageListenerContainer: read the flag in ShareListenerConsumer and switch between commitSync()/commitAsync() in commitAcknowledgments() - Added unit tests for default (sync) and custom (async) configuration - Fixed integration test that referenced the removed setter Fixes spring-projectsgh-4324 Signed-off-by: Maxwell Balla <ballamaxwell7@gmail.com>
…areCommits The previous tests only asserted getter/setter values without exercising the actual commit behavior. Unit tests now mock a ShareConsumer, process a record through the container, and verify that commitSync() or commitAsync() is actually invoked based on the syncShareCommits flag. Integration test now verifies async commit durability by consuming records, stopping the container, restarting with the same groupId, and asserting no redelivery occurs. Signed-off-by: Maxwell Balla <ballamaxwell7@gmail.com>
sobychacko
left a comment
There was a problem hiding this comment.
kafka-queues.adoc and whats-new.adoc need to be updated with the doc changes. Once you address my latest comments and the docs changes, we can move in the direction of a final review and then merge the feature. Thank you!
…feedback - Add syncShareCommits property to kafka-queues.adoc and whats-new.adoc - Include syncShareCommits in ContainerProperties.toString() - Add @author tag to ShareKafkaMessageListenerContainer - Replace Thread.sleep() with Awaitility.during() in async commit test Signed-off-by: Maxwell Balla <ballamaxwell7@gmail.com>
|
@maxwell-balla The PR has been merged upstream via 12286b1. Many thanks for your contribution! More PR's are welcomed!! |
|
Thanks @sobychacko ! Great experience contributing to the project. |
Add syncShareCommits flag (default true) to ContainerProperties to allow users to opt into async commits for share consumer acknowledgments.
Previously, ShareKafkaMessageListenerContainer called commitSync() unconditionally in commitAcknowledgments().
This change reads the new flag and switches between commitSync() and commitAsync() accordingly.
Changes:
Fixes gh-4324