KAFKA-2391 [WIP]: add timeout to KafkaConsumer blocking calls #283
onurkaraman wants to merge 3 commits into
One question: what is our plan for cases where the metadata update fails? A few options that I can think of:
1. Attempt it again until the timeout expires
2. Throw an exception immediately
3. Wait until the timeout expires and then throw an exception
If I understand correctly, we are doing 3 at the moment (please correct me if I am wrong). I think 1 or 2 would be better although perhaps more work. We can talk about implementation issues after we decide what we want to do though. cc @hachikuji
The tough part about the consumer is that the handling is context-dependent. If the call to awaitMetadataUpdate happens in KafkaConsumer.poll(), we might want to ignore the timeout and just return an empty record set to the user. If it's a blocking call, we want to throw the exception. I wonder if it would be a good idea to let this variant return a boolean indicating whether the update happened so that the caller can more easily choose what to do?
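A minimal sketch of the boolean-returning variant suggested above, assuming a simple polling loop. The class `TimedAwait`, the methods `pollLike` and `partitionsForLike`, and the exception `MetadataTimeoutException` are hypothetical names for illustration only and are not the actual `ConsumerNetworkClient` or `KafkaConsumer` code.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical illustration; none of these names exist in Kafka.
final class TimedAwait {
    // Unchecked exception standing in for whatever a blocking call would throw.
    static final class MetadataTimeoutException extends RuntimeException {
        MetadataTimeoutException(String message) { super(message); }
    }

    // Waits until `updated` reports true or timeoutMs elapses.
    // Returns true if the update happened, false on timeout or interrupt.
    static boolean awaitMetadataUpdate(BooleanSupplier updated, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!updated.getAsBoolean()) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) return false;
            try {
                // Sleep in short slices so the condition is re-checked regularly.
                Thread.sleep(Math.min(10L, Math.max(1L, remainingNs / 1_000_000L)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // poll()-style caller: a timeout is not an error, just return no records.
    static List<String> pollLike(BooleanSupplier updated, long timeoutMs) {
        if (!awaitMetadataUpdate(updated, timeoutMs)) return Collections.emptyList();
        return Collections.singletonList("record");
    }

    // Blocking-call-style caller: a timeout is surfaced as an exception.
    static List<String> partitionsForLike(BooleanSupplier updated, long timeoutMs) {
        if (!awaitMetadataUpdate(updated, timeoutMs))
            throw new MetadataTimeoutException("metadata update timed out after " + timeoutMs + " ms");
        return Collections.singletonList("partition-0");
    }
}
```

Returning a boolean keeps the policy decision in the caller, which is the context-dependence described in the comment: the same wait can end in an empty result or an exception.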
Thanks for the quick reply @ijuma. Yeah the point you raise is valid.
It looks like KafkaConsumer.listTopics (more specifically, its call to Fetcher.getAllTopics) in this RB is actually doing 1, but KafkaConsumer.partitionsFor (more specifically, its call to ConsumerNetworkClient.awaitMetadataUpdate) is doing 3. Maybe we should make them both act the same.
Also, just a heads up: this PR is very much WIP. It only has listTopics and partitionsFor with timeouts so far. I put this PR up partly to show the flaw in using request.timeout.ms as the timeout. From the JIRA ticket:
- if request.timeout.ms < session.timeout.ms, then a user may drop JoinGroupResponses which can arrive worst case after session.timeout.ms.
- if request.timeout.ms > session.timeout.ms, then a user calling one of these blocking calls will lose their session and trigger a rebalance.
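The two cases above can be restated as a small check. This is only a sketch of the trade-off as quoted from the ticket: `TimeoutTradeoff` and `Hazard` are hypothetical names, and the equal-values case is marked `NOT_COVERED` because the quoted text does not address it.

```java
// Hypothetical illustration of the trade-off quoted from the JIRA ticket.
final class TimeoutTradeoff {
    enum Hazard {
        MAY_DROP_JOIN_GROUP_RESPONSE, // request.timeout.ms < session.timeout.ms
        MAY_LOSE_SESSION,             // request.timeout.ms > session.timeout.ms
        NOT_COVERED                   // equal values: not discussed in the quoted text
    }

    static Hazard hazard(long requestTimeoutMs, long sessionTimeoutMs) {
        if (requestTimeoutMs < sessionTimeoutMs) return Hazard.MAY_DROP_JOIN_GROUP_RESPONSE;
        if (requestTimeoutMs > sessionTimeoutMs) return Hazard.MAY_LOSE_SESSION;
        return Hazard.NOT_COVERED;
    }
}
```

Either ordering of the two values carries one of the hazards, which is why reusing request.timeout.ms for the blocking calls is described as flawed.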
Yes, good points @hachikuji and @onurkaraman. By the way, I totally understand that this is a WIP. I just thought it would be worth discussing this sooner rather than later. :)
It does seem that a variant that lets the caller decide what to do would be good to have. Not sure if a boolean is good enough to represent the 3 possible states (or if the callers need to know about the 3 possible states).
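If a boolean turns out to be too coarse, one possible shape is an enum result. The sketch below assumes the three states are "updated", "failed", and "timed out", which is a guess at what is meant; `MetadataAwait`, `Attempt`, `Outcome`, and the `retryOnFailure` flag are hypothetical and not part of this PR.

```java
import java.util.function.Supplier;

// Hypothetical illustration; these types are not part of Kafka.
final class MetadataAwait {
    enum Attempt { PENDING, SUCCEEDED, FAILED }
    enum Outcome { UPDATED, FAILED, TIMED_OUT }

    // retryOnFailure = true  : keep attempting until the timeout expires (option 1)
    // retryOnFailure = false : report the failure immediately (option 2)
    static Outcome await(Supplier<Attempt> attempt, long timeoutMs, boolean retryOnFailure) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            Attempt result = attempt.get();
            if (result == Attempt.SUCCEEDED) return Outcome.UPDATED;
            if (result == Attempt.FAILED && !retryOnFailure) return Outcome.FAILED;
            if (System.nanoTime() - deadline >= 0) return Outcome.TIMED_OUT;
            try {
                Thread.sleep(5L); // brief pause before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Outcome.TIMED_OUT;
            }
        }
    }
}
```

A caller that only cares whether the update happened can still collapse this to a boolean with `outcome == Outcome.UPDATED`.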
@hachikuji @onurkaraman Could you close this PR since it is already incorporated?
@guozhangwang Was this incorporated? I thought it had not been.
@ijuma As of today the only three blocking calls are
Refer to this link for build results (access rights to CI server needed):
Limit how many connections can be opened by a broker _or_ controller.
Single bulk apply of the Language Engine's IntelliJ-style format profile across the kafka source tree. Pairs with the IntelliJ-real control branch `intellij-formatting` for side-by-side comparison.

Engine state at apply time includes the following format fixes landed against the language-engine repo:

Pre-PR #11:
- PR apache#275 multi-line // comment indent group
- PR apache#276 BLANK_LINES_AROUND_CLASS + sibling no-op repair
- PR apache#277 string-concat chain anchor preservation
- PR apache#279 forward style through apply routes
- PR apache#281 Result-tab line-number alignment
- PR apache#282 spaces after return / throw / yield / instanceof
- PR apache#283 chain-dot postfix preservation
- PR apache#284 BlankLines sibling minimum.* conversion
- PR apache#285 chain-dot single-anchor (N=1) preservation
- PR apache#286 string-concat chain partial-cascade fix
- PR apache#287 method-decl param re-align for misaligned source
- PR apache#288 annotation-in-array-init indent

Post-original-PR #11 (new in this re-apply):
- PR apache#289 partial-cascade post-rewrite-anchor (4 shape fixes: chain-dot postfix follow-on, MI/NewClass close-paren cascade, ternary continuation, lambda body continuation)

Stats: 3101 files written, 0 failures, 2678 already-idempotent files skipped (no_change). Corpus byte delta vs trunk: -510 bytes (106275 insertions / 106785 deletions).