KAFKA-2391 [WIP]: add timeout to KafkaConsumer blocking calls#283

Closed
onurkaraman wants to merge 3 commits into apache:trunk from onurkaraman:KAFKA-2391

Conversation

@onurkaraman

Contributor

No description provided.

Member

One question: what is our plan for cases where the metadata update fails? A few options that I can think of:

  1. Attempt it again until the timeout expires
  2. Throw an exception immediately
  3. Wait until the timeout expires and then throw an exception

If I understand correctly, we are currently doing option 3 (please correct me if I am wrong). I think option 1 or 2 would be better, although perhaps more work. We can talk about implementation issues once we decide which behavior we want. cc @hachikuji
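To make the difference between the options concrete, here is a minimal sketch of option 1 (retry until the timeout expires) next to option 2 (fail immediately). It is only an illustration: `MetadataRetrySketch`, `UpdateTimeoutException`, `retryUntilTimeout`, `failImmediately` and the `backoffMs` parameter are hypothetical names that do not come from this PR or from the Kafka code base, and a `BooleanSupplier` stands in for a single metadata request.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; none of these names exist in Kafka. */
final class MetadataRetrySketch {

    /** Unchecked stand-in for whatever timeout exception the consumer would throw. */
    static final class UpdateTimeoutException extends RuntimeException {
        UpdateTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Option 1: retry a failed update until the timeout expires.
     * attemptUpdate models one metadata request and returns true on success.
     * Returns the number of attempts that were made.
     */
    static int retryUntilTimeout(BooleanSupplier attemptUpdate, long timeoutMs, long backoffMs) {
        long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
        int attempts = 0;
        while (true) {
            attempts++;
            if (attemptUpdate.getAsBoolean()) {
                return attempts;
            }
            long remainingMs = (deadlineNs - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                throw new UpdateTimeoutException(
                        "metadata update did not succeed within " + timeoutMs + " ms");
            }
            // Never sleep past the deadline.
            sleep(Math.min(backoffMs, remainingMs));
        }
    }

    /** Option 2: a single attempt that fails fast instead of waiting. */
    static void failImmediately(BooleanSupplier attemptUpdate) {
        if (!attemptUpdate.getAsBoolean()) {
            throw new IllegalStateException("metadata update failed");
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```

Option 3 would correspond to making one attempt and then simply waiting out the rest of the timeout before throwing, which is why it looks like the least useful of the three.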

Contributor

The tough part about the consumer is that the handling is context-dependent. If the call to awaitMetadataUpdate happens in KafkaConsumer.poll(), we might want to ignore the timeout and just return an empty record set to the user. If it's a blocking call, we want to throw the exception. I wonder if it would be a good idea to let this variant return a boolean indicating whether the update happened so that the caller can more easily choose what to do?
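A rough sketch of what the boolean-returning variant could look like from the caller's side. The `MetadataClient` interface here is a hypothetical stand-in, not the real `ConsumerNetworkClient`, and the boolean signature for `awaitMetadataUpdate` is the proposal from this comment rather than an existing API. `pollLike` and `blockingLike` are made-up names for the two contexts.

```java
import java.util.Collections;
import java.util.List;

/** Illustration only: a caller-side view of a boolean-returning await call. */
final class AwaitUpdateSketch {

    /** Hypothetical stand-in for the network client; returns true if the update happened. */
    interface MetadataClient {
        boolean awaitMetadataUpdate(long timeoutMs);
    }

    /** poll()-style caller: a missed update is not an error, just return nothing. */
    static List<String> pollLike(MetadataClient client, long timeoutMs, List<String> fetched) {
        if (!client.awaitMetadataUpdate(timeoutMs)) {
            return Collections.emptyList();
        }
        return fetched;
    }

    /** Blocking-call-style caller: a missed update is surfaced as an exception. */
    static List<String> blockingLike(MetadataClient client, long timeoutMs, List<String> topics) {
        if (!client.awaitMetadataUpdate(timeoutMs)) {
            throw new IllegalStateException(
                    "metadata update did not complete within " + timeoutMs + " ms");
        }
        return topics;
    }
}
```

The point of the design is that the timeout policy lives in the caller, so the same await method can serve both contexts.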

Contributor Author

Thanks for the quick reply @ijuma. Yeah, the point you raise is valid.

It looks like KafkaConsumer.listTopics (more specifically, its call to Fetcher.getAllTopics) in this RB is actually doing 1, but KafkaConsumer.partitionsFor (more specifically, its call to ConsumerNetworkClient.awaitMetadataUpdate) is doing 3. Maybe we should make them both act the same.

Also, just a heads up: this PR is very much WIP. It only has listTopics and partitionsFor with timeouts so far. I put this PR up partly to show the flaw in using request.timeout.ms as the timeout. From the JIRA ticket:

  • if request.timeout.ms < session.timeout.ms, then a user may drop JoinGroupResponses which can arrive worst case after session.timeout.ms.
  • if request.timeout.ms > session.timeout.ms, then a user calling one of these blocking calls will lose their session and trigger a rebalance.
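The two cases above can be written down as a small check. This is only a sketch of the reasoning from the ticket: `TimeoutConfigSketch`, `Risk` and `classify` are hypothetical names, and the equal case is marked as not covered because the ticket text quoted here does not describe it.

```java
/** Illustration only: classifies the two problem cases quoted from the ticket. */
final class TimeoutConfigSketch {

    enum Risk {
        /** request.timeout.ms < session.timeout.ms: a late JoinGroupResponse may be dropped. */
        MAY_DROP_JOIN_GROUP_RESPONSE,
        /** request.timeout.ms > session.timeout.ms: a blocking call may outlive the session. */
        MAY_LOSE_SESSION,
        /** Equal values: not discussed in the quoted ticket text. */
        NOT_COVERED
    }

    static Risk classify(long requestTimeoutMs, long sessionTimeoutMs) {
        if (requestTimeoutMs < sessionTimeoutMs) {
            return Risk.MAY_DROP_JOIN_GROUP_RESPONSE;
        }
        if (requestTimeoutMs > sessionTimeoutMs) {
            return Risk.MAY_LOSE_SESSION;
        }
        return Risk.NOT_COVERED;
    }
}
```

Either ordering of the two settings has a downside, which is the flaw in reusing request.timeout.ms for these blocking calls.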

Member

Yes, good points @hachikuji and @onurkaraman. By the way, I totally understand that this is a WIP. I just thought it would be worth discussing this sooner rather than later. :)

It does seem that a variant that lets the caller decide what to do would be good to have. Not sure if a boolean is good enough to represent the 3 possible states (or if the callers need to know about the 3 possible states).
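If the callers do need all three states, one possible shape is an enum instead of a boolean. This is a sketch under an assumption: the comment does not name the three states, so `UPDATED`, `FAILED` and `TIMED_OUT` are a guess at what is meant, and `UpdateOutcomeSketch`, `Action` and `decide` are hypothetical names.

```java
/** Illustration only: a three-state result in place of a boolean. */
final class UpdateOutcomeSketch {

    /** Assumed states: the update succeeded, it failed, or the timeout expired first. */
    enum UpdateOutcome { UPDATED, FAILED, TIMED_OUT }

    /** What a caller might do next. */
    enum Action { PROCEED, RETRY, GIVE_UP }

    /** One possible caller policy: retry failures only while time remains. */
    static Action decide(UpdateOutcome outcome, long remainingMs) {
        switch (outcome) {
            case UPDATED:
                return Action.PROCEED;
            case FAILED:
                return remainingMs > 0 ? Action.RETRY : Action.GIVE_UP;
            case TIMED_OUT:
                return Action.GIVE_UP;
            default:
                throw new IllegalArgumentException("unknown outcome: " + outcome);
        }
    }
}
```

A boolean would collapse `FAILED` and `TIMED_OUT` into one value, so a caller could no longer tell whether a retry is worthwhile.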

@guozhangwang

Contributor

@hachikuji @onurkaraman Could you close this PR since it is already incorporated?

@ijuma

ijuma commented Jan 29, 2016

Member

@guozhangwang Was this incorporated? I thought it had not been.

@guozhangwang

Contributor

@ijuma As of today, the only three blocking calls are partitionsFor, listTopics and poll. poll already has a timeout parameter, and the other two could throw a TimeoutException once the configured timeout value elapses.

@asfbot

asfbot commented Dec 21, 2016


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/330/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Dec 21, 2016


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/329/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Dec 21, 2016


Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/331/
Test FAILed (JDK 8 and Scala 2.11).

@pono closed this Dec 26, 2016