KAFKA-2390; OffsetOutOfRangeException should contain the Offset and Partition info.#118
KAFKA-2390; OffsetOutOfRangeException should contain the Offset and Partition info.#118lindong28 wants to merge 20 commits into
Conversation
|
kafka-trunk-git-pr #97 FAILURE |
|
@lindong28 It looks like the callback is discarded without completion on a rebalance. I think we should probably make sure it always gets invoked eventually. A couple other points:
|
|
@hachikuji Thanks for your quick review.
|
|
kafka-trunk-git-pr #98 SUCCESS |
There was a problem hiding this comment.
Shouldn't this be invoked only after we've checked that this consume request matches the expected offset? I think the current code will behave incorrectly given the following sequence of events:
- Fetch request sent
- Rebalance
- Seek
- Fetch response triggers this code
The callback is invoked, but the fetch request was for the previous generation and errors like OFFSET_OUT_OF_RANGE will not be correctly passed to the callback.
There was a problem hiding this comment.
@ewencp Thanks for the review! I will fix it in the patch.
|
I agree with @hachikuji that the callback should always be invoked. If it isn't, then I think there would need to be some very clear javadocs indicating when it is/isn't invoked. Any use of this callback also requires implementing a ConsumerRebalanceCallback if you hold on to any state while waiting for the callback. |
…ed offset matches the seeked offset
|
@ewencp @hachikuji. Thanks for the suggestion! I have updated the patch such that the ConumserSeekCallback is always invoked, including when SubscriptionState.clearAssignment() and SubscriptionState.clearPartition(). I am not sure why the use of this callback also requires implementing a ConsumerRebalanceCallback though. Let me double think on this. |
|
@lindong28 I only meant it required implementing ConsumerRebalanceCallback if it didn't get invoked in some cases. If the application did something like invoke the commit and wait for the callback, meanwhile holding on to resources, but the callback wasn't guaranteed to be invoked if a rebalance occurred in the meantime, then the only way to correctly clean up the resources associated with the callback would be to implement a ConsumerRebalanceCallback to clean them up. If we guarantee the callback is always invoked, then this is no longer necessary (and simplifies application code that uses this callback). |
|
@lindong28 @ewencp I think the thing that's bugging me about the callback is that the semantics seem strange. When the consumer invokes seek(), the position is updated immediately and position() will return the seeked position. It's only later that the user finds out if that position is out of range. Intuitively, I would expect the callback to be invoked when the position is updated. I wonder if instead of using the generic ConsumerSeekCallback, the behavior would be clearer with an OffsetOutOfRangeCallback, which is only invoked when that error occurs? Then we wouldn't have to worry about ensuring the callback is invoked in all cases and it would make sense for the position to be updated immediately. Does that make sense? |
|
kafka-trunk-git-pr #123 SUCCESS |
|
@hachikuji Definitely understand that concern, I was confused at first that a callback even made sense since this seemed like an immediate update regardless of the mode the consumer was in. But would this OffsetOutOfRangeCallback be similar to the ConsumerRebalanceCallback where you need to provided it when you subscribe? It seems like it has the same somewhat weird callback semantics where it is provided once and can be invoked many times. But now that I'm thinking about more of these types of events, is it possible we just have the name wrong? Maybe we should be implementing a In any case, for this specifically, it is a bit odd that the callback would be disconnected from the request, but on the other hand, shouldn't this type of event only ever occur during a seek? The only other time it happens is during initial subscription, right? |
|
@ewencp Usually when we get the offset out of range error, we just reset the position, so i wouldn't see that callback as one which could be invoked multiple times. The callback just provides a way to override the default reset behavior. It's also intuitive in my opinion to be able to handle the error at the same location in the code that the seek is performed. I agree, however, that this is still not great. The user might want to intercept all out of range errors, not just the ones that result from a specific seek. The |
|
@ewencp @hachikuji I think we are discussing to things: 1) whether it is more intuitive to call it ConsumerSeekCallback or OffsetOutOfRangeCallback; 2) whether we should turn it and other callback into listener instead. I agree with your opinion on 2). But 2) goes beyond the scope of this ticket and requires much detailed discussion since it changes the user interface (e.g. ConsumerRebalanceListener). Can we focus on 1) first? I would prefer ConsumerSeekCallback to OffsetOutOfRangeCallback. If the user expects ConsumerSeekCallback to be invoked when position is updated, shouldn't he has the same expectation for OffsetOutOfRangeCallback? By keeping ConsumerSeekCallback, we have a more general callback API, and we can ensure that it is always invoked, including when rebalance happens or when user re-subscribe to partitions. We can describe in the document that seek() is actually an asynchronous API and it is only effective when the fetch after the seek gets a non-error response. |
|
@lindong28 That's a fair point about OffsetOutOfRangeCallback. It also has confusing semantics. I think what @ewencp was suggesting with the |
|
@hachikuji I agree that a ConsumerListener which provides a general hook for out of range errors is useful. What I try to understand is, whether there is a valid user case to have a callback for a specific seek. I just discussed with @becketqin and tend to think it can still be useful to provide callback in seek(). An out-of-range exception can happen for one of the three reasons: 1) invalid offset in seek, 2) unclean leader election, and 3) consumer lags behind too much. One reasonable strategy that I envision is that, consumer will throw exception and quit for 1), reset offset to latest for 2), and reset offset to earliest for 3). Such a strategy can be implemented using a combination of ConsumerSeekCallback and ConsumerListener, but not with ConsumerListener alone. Does this usecase make sense? |
|
@lindong28 Fair enough. I do see some value in being able to handle an out-of-range error in the scope of a specific seek. I still think the behavior around when the position gets updated is a little weird, but perhaps documenting it clearly will be enough. We have to explain how it is synchronous in the sense that the position will be updated immediately and the next fetch will try to use it, but asynchronous in the sense that we can only find out if the position is valid after we try to fetch. It makes it a little tough to say when the seek is "complete." Maybe instead of using It would be cool if there was an API on the broker to get the current range of a partition. Then we could have this method block while we check that the offset is in range. You could do it with the list offset request, but it seems like it would require two requests, which is annoying. |
|
@hachikuji Of course, thanks for your suggestion. I will clarify it in document and use onValidated. |
…er explain its usage
|
@hachikuji Sorry for the delay. I just updated patch to rename the callback to onResponse and updated API documentation to illustrate the reason that the callback is asynchronous. Does that alleviate the concern you have? I thought onResonse is a little better than onValidated because it hints the time when this callback is called. And the name is more general rather than being tied to a specific functionality, e.g. validation. What do you think? Thank you! |
|
kafka-trunk-git-pr #151 SUCCESS |
|
@lindong28 Yeah, I didn't like onValidated that much either. onResponse seems a little better. How about onFetchResponse to be even more explicit? Or maybe onFirstFetch to emphasize that the callback only applies to the first fetch? It's a little tough to tell how intuitive any of these will be for users. |
|
@hachikuji Sure. These names are better. I combined your suggestions and named it as onFirstFetchReponse. What do you think? |
|
kafka-trunk-git-pr #161 SUCCESS |
|
kafka-trunk-git-pr #407 FAILURE |
|
"./gradlew testAll" can successfully finish on my desktop. |
|
kafka-trunk-git-pr #481 SUCCESS |
|
Hi @guozhangwang, this patch has been reviewed by @hachikuji and @ijuma. This patch has been pending for a long time. Since you are very familiar with new consumer code, can you have a look and see if it can be committed? I am not sure if we are going to deliver new consumer in 0.9.0 release. If we do, then we probably need to include in 0.9.0 all patches that affect new consumer API, such as this patch and others under KAFKA-2387. |
There was a problem hiding this comment.
Could you add some comments here for the checking of the consumed position?
|
@lindong28 LGTM overall, just one minor comment. |
|
@guozhangwang Thanks! I have added comment. Please see updated patch. |
|
kafka-trunk-git-pr #510 FAILURE |
|
@lindong28 @guozhangwang We might need a followup on this patch. I think this line in throwIfOffsetOutOfRange doesn't work: Long consumed = subscriptions.consumed(entry.getKey());
// ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
if (consumed != null && entry.getValue() == consumed)
currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());I think we need to use equals instead of '==': Long consumed = subscriptions.consumed(entry.getKey());
// ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
if (consumed != null && consumed.equals(entry.getValue()))
currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());We should probably have a unit test for this case. |
|
@hachikuji Thanks for the catch! I have created a pull request to add unit test and replace == with equals. Interestingly, I find that here == has the same effect as equals -- they can both update currentOutOfRangePartitions and allow unit test to pass. I don't have explanation though. |
|
@lindong28 |
|
@lindong28 This puzzled me for a little while as well, but it looks like the test case actually ends up using the same Long instance for both the fetch position (which is stored for out of range errors) and the consumed position when you use seek(), hence reference equality works fine. This won't generally be the case since the fetch position usually is updated independent of the consumed position. |
|
@ijuma Interesting. I am curious motivation for having this rule. Do you have any reference to related documentation or online discussion? @hachikuji This puzzles me as well. Anyway, thanks much finding the problem. |
|
@lindong28, it's because there's a cache for commonly used values for Integer, Long, etc. The cache exists to avoid excessive allocation and it was added in Java 5 before HotSpot supported escape analysis and scalar replacement. A reference that I quickly found via Google: |
|
@ijuma Ah. So it called autoboxing. Thanks for this information:) |
… number of replicas. (apache#118)
…ON error (apache#118) [LI-HOTFIX] Add broker info log when getting error during fetch metadata call (apache#118) TICKET = KAFKA-12300 LI_DESCRIPTION = LIKAFKA-33540 This will backport pr (apache#112) to log which broker returned metadata request and help us debug a Venice issue that potentially stemmed from metadata propagation delay EXIT_CRITERIA = Upstream pr created and merged, we can close this after pulling in upstream pr
apache#118) CDPD-73907 fix ConnectSecretValidationFilter fails to parse the connector creation request after the 3.7 rebase Co-authored-by: Bertalan Kondrat <kb.pcre@gmail.com>
Signed-off-by: Greg Harris <greg.harris@aiven.io>
…rry-pick system tests: cherry-pick changes from trunk
No description provided.