Skip to content

Conversation

@harangozop
Copy link
Contributor

Fixes #24646

Motivation

When using Kafka Connect source connectors with transform filters that drop records (e.g., Debezium SMT Filter or Kafka Connect Filter + Predicate), the Pulsar Kafka Connect source can hang if a polled batch contains only records that are filtered out.
In this scenario, no records are emitted and the source waits indefinitely for an offset flush that never occurs.

This change ensures that batches are finalized even when all records are filtered: when the last record in a batch is processed and none were emitted/acked, we still trigger and complete the offset flush (treating “nothing to flush” as success). This prevents read() from blocking and allows the source to continue polling subsequent data.

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

(example:)

  • Added integration test that expects a flush when a filter is dropping every message in a given batch.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: harangozop#2

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Aug 21, 2025
@lhotari lhotari added this to the 4.1.0 milestone Aug 21, 2025
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, great work @harangozop! thanks for the contribution

@codecov-commenter
Copy link

codecov-commenter commented Aug 21, 2025

Codecov Report

❌ Patch coverage is 50.94340% with 26 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.16%. Comparing base (34f8657) to head (d9517c7).
⚠️ Report is 61 commits behind head on master.

Files with missing lines Patch % Lines
...r/io/kafka/connect/AbstractKafkaConnectSource.java 50.94% 20 Missing and 6 partials ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24654      +/-   ##
============================================
- Coverage     74.32%   74.16%   -0.17%     
- Complexity    33180    33592     +412     
============================================
  Files          1882     1900      +18     
  Lines        146855   148357    +1502     
  Branches      16867    17202     +335     
============================================
+ Hits         109152   110028     +876     
- Misses        29038    29548     +510     
- Partials       8665     8781     +116     
Flag Coverage Δ
inttests 26.24% <ø> (-0.55%) ⬇️
systests 22.77% <ø> (-0.67%) ⬇️
unittests 73.68% <50.94%> (-0.13%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...r/io/kafka/connect/AbstractKafkaConnectSource.java 69.07% <50.94%> (+5.37%) ⬆️

... and 221 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the review comments

…thread safety

Added more test cases to cover:
- partially acked -> shouldn't flush
- all acked -> should flush
- no ack -> shouldn't flush
@coderzc coderzc modified the milestones: 4.1.0, 4.2.0 Sep 1, 2025
@peterh-wob
Copy link

hey @lhotari I've pushed the changes you suggested, mind giving it another look when you get a chance? Thanks!

@lhotari lhotari requested a review from Copilot September 5, 2025 06:56
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work @harangozop! I added one review comment.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes a hanging issue in Kafka Connect source connectors when all records in a polled batch are filtered out by transforms (e.g., Debezium SMT Filter). The fix ensures that offset flushing continues even when no records are emitted, preventing the source from blocking indefinitely.

  • Refactored offset flushing logic to trigger flushes for filtered-out records
  • Added comprehensive test coverage for various flush scenarios
  • Centralized flush handling with atomic operations to prevent race conditions

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
AbstractKafkaConnectSource.java Refactored offset flushing with centralized logic and atomic operations
KafkaConnectSourceTest.java Added comprehensive tests for flush scenarios including filtered records
pom.xml Added awaitility test dependency for async verification

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just an opportunity left to reduce duplication in test code

@harangozop
Copy link
Contributor Author

Thanks @lhotari I've added the helper method to the tests!

…r into a helper method for better readability
@harangozop harangozop force-pushed the bugfix/24646-kafka-connect-source-flush branch from 9d8facc to d9517c7 Compare September 11, 2025 17:40
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, 🎯 perfect work @harangozop

@lhotari lhotari merged commit a824f04 into apache:master Sep 11, 2025
51 checks passed
lhotari pushed a commit that referenced this pull request Sep 11, 2025
lhotari pushed a commit that referenced this pull request Sep 11, 2025
lhotari pushed a commit that referenced this pull request Sep 11, 2025
lhotari pushed a commit that referenced this pull request Sep 11, 2025
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 15, 2025
nodece pushed a commit to nodece/pulsar that referenced this pull request Sep 16, 2025
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Pulsar Kafka Connect adaptor: read() blocks indefinitely if all records in a batch are filtered

5 participants