@shibd
Member

@shibd shibd commented Jul 10, 2025

Motivation

The current Kinesis source connector violates the at-least-once delivery guarantee, which can lead to data loss during failures or restarts. The root cause is that the Kinesis checkpointing is based on a time interval, completely decoupled from Pulsar's acknowledgment mechanism.

This creates a race condition that leads to data loss, as illustrated below:

  • Time 1: KinesisRecordProcessor fetches record 0 from a Kinesis shard and puts it into the internal queue.
  • Time 2: The KinesisSource thread reads record 0 from the queue and attempts to send it to a Pulsar topic, but the send operation is delayed or fails.
  • Time 3: The KinesisRecordProcessor's timer triggers. It checkpoints its progress, marking that it has successfully processed up to record 0, even though this record has not yet been acknowledged by Pulsar.
  • Time 4: The connector crashes before record 0 can be successfully sent and acked. Upon restart, the KCL worker reads the last checkpoint and starts consuming from the position after record 0, causing record 0 to be permanently lost.
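
The timeline above can be reduced to a small deterministic model. This is only an illustrative sketch, not connector code: the class name, the lostRecords helper, and the use of plain integers as sequence numbers are all assumptions made for the example. It compares a checkpoint taken at whatever has been fetched (the time-based behavior) with a checkpoint taken only at what has been acked.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointRaceSketch {

    /**
     * Hypothetical model: returns the sequence numbers that are never delivered
     * after a crash and restart.
     *
     * @param fetchedUpTo         highest sequence number fetched from the shard
     * @param ackedUpTo           highest sequence number acked by Pulsar (-1 if none)
     * @param checkpointOnAckOnly false models the time-based checkpoint, true the ack-driven one
     */
    static List<Integer> lostRecords(int fetchedUpTo, int ackedUpTo, boolean checkpointOnAckOnly) {
        // Time-based: the timer checkpoints everything fetched so far.
        // Ack-driven: only acknowledged records are checkpointed.
        int checkpoint = checkpointOnAckOnly ? ackedUpTo : fetchedUpTo;

        // After a restart, consumption resumes at checkpoint + 1, so every record
        // that was checkpointed but never acked is skipped for good.
        List<Integer> lost = new ArrayList<>();
        for (int seq = ackedUpTo + 1; seq <= checkpoint; seq++) {
            lost.add(seq);
        }
        return lost;
    }

    public static void main(String[] args) {
        // Record 0 was fetched but not yet acked when the crash happened.
        System.out.println("time-based: lost " + lostRecords(0, -1, false));
        System.out.println("ack-driven: lost " + lostRecords(0, -1, true));
    }
}
```

In this model the ack-driven variant can redeliver records after a restart, but it never skips one, which is the trade-off that at-least-once delivery accepts.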

This PR fixes this critical issue by ensuring that checkpoints are only committed after a record has been successfully acknowledged by the Pulsar IO runtime.

Modifications

  1. Refactored Checkpointing Logic: The checkpointing mechanism in KinesisRecordProcessor was changed from being time-based to being driven by Pulsar acks. It now checkpoints a specific sequenceNumber provided by an acknowledged record.

  2. Implemented ack() and fail(): In KinesisRecord.java, the ack() method was implemented to call back to the KinesisRecordProcessor and update the latest sequence number that is safe to checkpoint. The fail() method now calls sourceContext.fatal() to ensure the connector restarts upon failure.

  3. Passed SourceContext: The SourceContext is now passed down to the KinesisRecordProcessor to allow it to trigger a fatal connector restart upon unrecoverable errors (e.g., failed checkpoint attempts).
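
A minimal sketch of the ack-driven flow described in the list above, written without KCL or Pulsar dependencies so it can run on its own. The Checkpointer interface, the Processor class, and the onAck, onFail, and tryCheckpoint method names are hypothetical stand-ins, not the names used in this PR; the fatalHandler plays the role of sourceContext.fatal(). The sketch also assumes acks arrive in sequence order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AckDrivenCheckpointSketch {

    /** Hypothetical stand-in for the KCL checkpointer; not the real KCL interface. */
    interface Checkpointer {
        void checkpoint(String sequenceNumber) throws Exception;
    }

    /** Hypothetical processor that only checkpoints sequence numbers reported by acks. */
    static class Processor {
        private final Checkpointer checkpointer;
        private final Consumer<Throwable> fatalHandler; // stands in for sourceContext.fatal()
        private final AtomicReference<String> safeToCheckpoint = new AtomicReference<>();
        private String lastCheckpointed;

        Processor(Checkpointer checkpointer, Consumer<Throwable> fatalHandler) {
            this.checkpointer = checkpointer;
            this.fatalHandler = fatalHandler;
        }

        /** Called when a record is acked: its sequence number becomes safe to checkpoint. */
        void onAck(String sequenceNumber) {
            safeToCheckpoint.set(sequenceNumber);
        }

        /** Called when a record fails: escalate so the connector restarts from the last checkpoint. */
        void onFail(String sequenceNumber) {
            fatalHandler.accept(new IllegalStateException("Record " + sequenceNumber + " failed"));
        }

        /** Checkpoints the latest acked sequence number, if there is a new one. */
        synchronized void tryCheckpoint() {
            String candidate = safeToCheckpoint.get();
            if (candidate == null || candidate.equals(lastCheckpointed)) {
                return; // nothing acked yet, or nothing new since the last checkpoint
            }
            try {
                checkpointer.checkpoint(candidate);
                lastCheckpointed = candidate;
            } catch (Exception e) {
                fatalHandler.accept(e); // a failed checkpoint is treated as unrecoverable
            }
        }
    }

    /** Runs a small scenario and returns the sequence numbers that were checkpointed. */
    static List<String> demo() {
        List<String> checkpoints = new ArrayList<>();
        List<Throwable> fatals = new ArrayList<>();
        Processor processor = new Processor(checkpoints::add, fatals::add);

        processor.tryCheckpoint(); // nothing acked yet: no checkpoint
        processor.onAck("100");
        processor.tryCheckpoint(); // checkpoints 100
        processor.tryCheckpoint(); // unchanged: no duplicate checkpoint
        processor.onAck("101");
        processor.onAck("102");
        processor.tryCheckpoint(); // checkpoints 102, the latest acked record
        return checkpoints;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [100, 102]
    }
}
```

A record that has been fetched but not acked never reaches the checkpointer in this sketch, so a crash at any point causes it to be re-read after restart rather than skipped.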

Verifying this change

  • Added Unit Tests: Created KinesisRecordProcessorTest.java to verify the new ack-driven checkpointing logic.

Integration test

  1. Create a pulsar topic and a subscription.
  2. Create a Kinesis source connector.
  3. Send 100 messages to Kinesis at 1s intervals.
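    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // The class name is only a wrapper added so the snippet compiles on its own.
    public class KinesisTestProducer {
        public static void main(String[] args) throws Exception {
            // Access key and secret key are intentionally left blank here.
            AWSCredentialsProvider credentialsProvider =
                    new AWSStaticCredentialsProvider(new BasicAWSCredentials("", ""));

            KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration();
            kinesisConfig.setRegion("ap-northeast-1");
            kinesisConfig.setCredentialsProvider(credentialsProvider);
            KinesisProducer kinesis = new KinesisProducer(kinesisConfig);

            // Put 100 records, one per second.
            for (int i = 0; i < 100; ++i) {
                ByteBuffer data = ByteBuffer.wrap("test-kinesis-data".getBytes(StandardCharsets.UTF_8));
                // addUserRecord() is asynchronous and does not block.
                kinesis.addUserRecord("connector-source", "myPartitionKey", data);
                kinesis.flush();
                Thread.sleep(1000);
            }

            // Give any buffered records time to be sent before shutting down.
            kinesis.flush();
            Thread.sleep(10000);
            kinesis.destroy();
        }
    }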
  4. Topic unloads and connector restarts occur at random intervals during this time.

Result:

Before this PR: fewer than 100 messages are received.

Receive message test-kinesis-data with count 96
Receive message test-kinesis-data with count 97
Received 97 messages

After this PR: the number of received messages is greater than or equal to 100, which fulfills the at-least-once delivery guarantee.

Receive message test-kinesis-data with count 101
Receive message test-kinesis-data with count 102
Received 102 messages

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

@shibd shibd self-assigned this Jul 10, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jul 10, 2025
@codecov-commenter

codecov-commenter commented Jul 10, 2025

Codecov Report

Attention: Patch coverage is 56.41026% with 17 lines in your changes missing coverage. Please review.

Project coverage is 74.36%. Comparing base (bbc6224) to head (5a97596).
Report is 1189 commits behind head on master.

Files with missing lines                                 Patch %   Lines
...ache/pulsar/io/kinesis/KinesisRecordProcessor.java    53.57%    12 Missing and 1 partial ⚠️
...lsar/io/kinesis/KinesisRecordProcessorFactory.java    0.00%     3 Missing ⚠️
...va/org/apache/pulsar/io/kinesis/KinesisSource.java    0.00%     1 Missing ⚠️

@@             Coverage Diff              @@
##             master   #24501      +/-   ##
============================================
+ Coverage     73.57%   74.36%   +0.78%     
- Complexity    32624    32884     +260     
============================================
  Files          1877     1868       -9     
  Lines        139502   145925    +6423     
  Branches      15299    16732    +1433     
============================================
+ Hits         102638   108511    +5873     
+ Misses        28908    28818      -90     
- Partials       7956     8596     +640     
Flag         Coverage Δ
inttests     26.63% <ø> (+2.05%) ⬆️
systests     23.33% <0.00%> (-0.99%) ⬇️
unittests    73.84% <56.41%> (+1.00%) ⬆️

Files with missing lines                                 Coverage Δ
...va/org/apache/pulsar/io/kinesis/KinesisRecord.java    77.50% <100.00%> (+77.50%) ⬆️
...va/org/apache/pulsar/io/kinesis/KinesisSource.java    0.00% <0.00%> (ø)
...lsar/io/kinesis/KinesisRecordProcessorFactory.java    0.00% <0.00%> (ø)
...ache/pulsar/io/kinesis/KinesisRecordProcessor.java    44.77% <53.57%> (+44.77%) ⬆️

... and 1095 files with indirect coverage changes

@RobertIndie RobertIndie requested a review from Copilot July 10, 2025 15:06

Copilot AI left a comment

Pull Request Overview

This PR ensures Kinesis checkpointing is driven by Pulsar acknowledgments to uphold at-least-once delivery guarantees.

  • Refactored KinesisRecordProcessor to checkpoint on ack(), track sequence numbers, and handle failures via SourceContext.fatal().
  • Updated KinesisRecord to implement ack() and fail(), passing the processor reference through constructors.
  • Propagated SourceContext from KinesisSource through the factory into the processor and expanded unit tests for the new behavior.

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

File                                  Description
KinesisRecordTest.java                Updated constructor calls to include the new recordProcessor parameter.
KinesisRecordProcessorTest.java       Added tests for ack-driven checkpointing and fail() handling.
KinesisSource.java                    Passed SourceContext into KinesisRecordProcessorFactory.
KinesisRecordProcessorFactory.java    Extended factory to accept and forward SourceContext.
KinesisRecordProcessor.java           Overhauled checkpoint logic, introduced sequence tracking, and error handling with SourceContext.
KinesisRecord.java                    Added sequenceNumber and recordProcessor, implemented ack()/fail().
Comments suppressed due to low confidence (2)

pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java:100

  • [nitpick] The method is named failed(), but the Record interface expects fail(); renaming it to fail() would improve consistency.
    public void failed() {

pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisRecordProcessor.java:80

  • Add an import for software.amazon.kinesis.exceptions.ThrottlingException to avoid a compilation error.
            } catch (ThrottlingException | KinesisClientLibDependencyException e) {

@BewareMyPower
Contributor

Could you rebase to master so the checkstyle for tests will be applied?

@shibd shibd merged commit 0bdce23 into apache:master Jul 11, 2025
51 checks passed
shibd added a commit that referenced this pull request Jul 11, 2025
3pacccccc added a commit to 3pacccccc/pulsar that referenced this pull request Jul 11, 2025
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Jul 15, 2025
@lhotari
Member

lhotari commented Jul 17, 2025

This change depends on #21143. @shibd, could you please handle backporting to branch-3.0 or remove the release label for 3.0.x?

@shibd
Member Author

shibd commented Jul 17, 2025

Thanks for the reminder. I think we can remove the label for 3.0 and 3.3.

priyanshu-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 22, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 24, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025

Labels

area/connector cherry-picked/branch-4.0 doc-not-needed Your PR changes do not impact docs ready-to-test release/4.0.6 type/bug The PR fixed a bug or issue reported a bug

5 participants