-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][io] Improve Kafka Connect source offset flushing logic #24654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][io] Improve Kafka Connect source offset flushing logic #24654
Conversation
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, great work @harangozop! thanks for the contribution
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #24654 +/- ##
============================================
- Coverage 74.32% 74.16% -0.17%
- Complexity 33180 33592 +412
============================================
Files 1882 1900 +18
Lines 146855 148357 +1502
Branches 16867 17202 +335
============================================
+ Hits 109152 110028 +876
- Misses 29038 29548 +510
- Partials 8665 8781 +116
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
...ect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
Show resolved
Hide resolved
...ect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the review comments
…thread safety Added more test cases to cover: - partially acked -> shouldn't flush - all acked -> should flush - no ack -> shouldn't flush
|
hey @lhotari I've pushed the changes you suggested, mind giving it another look when you get a chance? Thanks! |
...ect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @harangozop! I added one review comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes a hanging issue in Kafka Connect source connectors when all records in a polled batch are filtered out by transforms (e.g., Debezium SMT Filter). The fix ensures that offset flushing continues even when no records are emitted, preventing the source from blocking indefinitely.
- Refactored offset flushing logic to trigger flushes for filtered-out records
- Added comprehensive test coverage for various flush scenarios
- Centralized flush handling with atomic operations to prevent race conditions
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| AbstractKafkaConnectSource.java | Refactored offset flushing with centralized logic and atomic operations |
| KafkaConnectSourceTest.java | Added comprehensive tests for flush scenarios including filtered records |
| pom.xml | Added awaitility test dependency for async verification |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
...ect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
Show resolved
Hide resolved
...connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
Outdated
Show resolved
Hide resolved
...connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just an opportunity left to reduce duplication in test code
|
Thanks @lhotari I've added the helper method to the tests! |
…r into a helper method for better readability
9d8facc to
d9517c7
Compare
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, 🎯 perfect work @harangozop
(cherry picked from commit a824f04)
(cherry picked from commit a824f04)
(cherry picked from commit a824f04)
(cherry picked from commit a824f04)
Fixes #24646
Motivation
When using Kafka Connect source connectors with transform filters that drop records (e.g., Debezium SMT Filter or Kafka Connect Filter + Predicate), the Pulsar Kafka Connect source can hang if a polled batch contains only records that are filtered out.
In this scenario, no records are emitted and the source waits indefinitely for an offset flush that never occurs.
This change ensures that batches are finalized even when all records are filtered: when the last record in a batch is processed and none were emitted/acked, we still trigger and complete the offset flush (treating “nothing to flush” as success). This prevents read() from blocking and allows the source to continue polling subsequent data.
Modifications
Verifying this change
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: harangozop#2