-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][io] Fix data loss issue in Kinesis source connector #24501
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #24501 +/- ##
============================================
+ Coverage 73.57% 74.36% +0.78%
- Complexity 32624 32884 +260
============================================
Files 1877 1868 -9
Lines 139502 145925 +6423
Branches 15299 16732 +1433
============================================
+ Hits 102638 108511 +5873
+ Misses 28908 28818 -90
- Partials 7956 8596 +640
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR ensures Kinesis checkpointing is driven by Pulsar acknowledgments to uphold at-least-once delivery guarantees.
- Refactored
KinesisRecordProcessorto checkpoint onack(), track sequence numbers, and handle failures viaSourceContext.fatal(). - Updated
KinesisRecordto implementack()andfail(), passing the processor reference through constructors. - Propagated
SourceContextfromKinesisSourcethrough the factory into the processor and expanded unit tests for the new behavior.
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| KinesisRecordTest.java | Updated constructor calls to include the new recordProcessor parameter. |
| KinesisRecordProcessorTest.java | Added tests for ack-driven checkpointing and fail() handling. |
| KinesisSource.java | Passed SourceContext into KinesisRecordProcessorFactory. |
| KinesisRecordProcessorFactory.java | Extended factory to accept and forward SourceContext. |
| KinesisRecordProcessor.java | Overhauled checkpoint logic, introduced sequence tracking, and error handling with SourceContext. |
| KinesisRecord.java | Added sequenceNumber and recordProcessor, implemented ack()/fail(). |
Comments suppressed due to low confidence (2)
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java:100
- [nitpick] The method is named
failed(), but theRecordinterface expectsfail(); renaming it tofail()would improve consistency.
public void failed() {
pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java:80
- Add an import for
software.amazon.kinesis.exceptions.ThrottlingExceptionto avoid a compilation error.
} catch (ThrottlingException | KinesisClientLibDependencyException e) {
pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessorTest.java
Show resolved
Hide resolved
pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisRecordTest.java
Outdated
Show resolved
Hide resolved
|
Could you rebase to master so the checkstyle for tests will be applied? |
(cherry picked from commit 0bdce23)
…ache#24501)" This reverts commit 0bdce23.
|
Thanks for the reminder, I think we can remove the label for 3.0 and 3.3 |
Motivation
The current Kinesis source connector violates the
at-least-oncedelivery guarantee, which can lead to data loss during failures or restarts. The root cause is that the Kinesis checkpointing is based on a time interval, completely decoupled from Pulsar's acknowledgment mechanism.This creates a race condition that leads to data loss, as illustrated below:
This PR fixes this critical issue by ensuring that checkpoints are only committed after a record has been successfully acknowledged by the Pulsar IO runtime.
Modifications
Refactored Checkpointing Logic: The checkpointing mechanism in KinesisRecordProcessor was changed from being time-based to being driven by Pulsar acks. It now checkpoints a specific sequenceNumber provided by an acknowledged record.
Implemented ack() and fail(): In KinesisRecord.java, the ack() method was implemented to call back to the KinesisRecordProcessor and update the latest sequence number that is safe to checkpoint. The fail() method now calls sourceContext.fatal() to ensure the connector restarts upon failure.
Passed SourceContext: The SourceContext is now passed down to the KinesisRecordProcessor to allow it to trigger a fatal connector restart upon unrecoverable errors (e.g., failed checkpoint attempts).
Verifying this change
Integration test
Result:
Before this PR: will receive fewer than 100 messages
After this PR: *Verify that the number of received messages is greater than or equal to 100, which fulfills the at-least-once delivery guarantee.
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: