-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][io] Kafka Source connector maybe stuck #22511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@shibd Please add the following content to your PR description and select a checkbox: |
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Good work @shibd
Demogorgon314
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
nicoloboschi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
(cherry picked from commit bbff29d)
(cherry picked from commit bbff29d)
(cherry picked from commit bbff29d)
(cherry picked from commit bbff29d)
Motivation
The current implementation of Kafka source connector, that
KafkaRecorddoes not implement thefail()method.pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
Line 222 in bbae607
If
PulsarSinksend a message to pulsar failed, will callrecord.fail().pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
Lines 194 to 196 in 43a9898
But because KafkaRecord does not implement it, this
futureswill never end.pulsar/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
Line 178 in 495b141
This causes the Kafka consumer to leave the consumer group due to more than
max.poll.interval.ms.Modifications
fail()method forKakfaRecord.CompletableFuture.allOf(futures).get()support a timeout, set to 2/3 ofmax.poll.interval.ms. This adjustment is to ensure the interval between twopollrequests does not exceed Kafkamax.poll.interval.ms.Verifying this change
throwExceptionBySendFailandthrowExceptionBySendTimeOutto cover it.Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: