Skip to content

KAFKA-2390; OffsetOutOfRangeException should contain the Offset and Partition info.#118

Closed
lindong28 wants to merge 20 commits into
apache:trunkfrom
lindong28:KAFKA-2390
Closed

KAFKA-2390; OffsetOutOfRangeException should contain the Offset and Partition info.#118
lindong28 wants to merge 20 commits into
apache:trunkfrom
lindong28:KAFKA-2390

Conversation

@lindong28

Copy link
Copy Markdown
Member

No description provided.

@asfbot

asfbot commented Aug 6, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #97 FAILURE
Looks like there's a problem with this pull request

@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 It looks like the callback is discarded without completion on a rebalance. I think we should probably make sure it always gets invoked eventually. A couple other points:

  • Do you think it would be helpful to include the TopicPartition and the offset that was seeked to in the callback?
  • Should we provide a similar mechanism for seekToBeginning and seekToEnd? It could be a helpful way to find the new offset.

@lindong28

Copy link
Copy Markdown
Member Author

@hachikuji Thanks for your quick review.

  1. Currently the only use case we envision for this callback in seek() is when the position to seek is out of range. Since a rebalance will change the offset and invalidate the previous seek, I think we can invalidate the callback as well.

  2. Yes, on a double think I think it will be useful to include the TopicPartition and the offset that was seeked to in the callback. This will be useful upon OFFSET_OUT_OF_RANGE exception.

  3. Due to the use case of this callback, I personally don't think it is useful to include it in seekToBeginning and seekToEnd. I would also like to see what others think of this.

@asfbot

asfbot commented Aug 6, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #98 SUCCESS
This pull request looks good

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be invoked only after we've checked that this consume request matches the expected offset? I think the current code will behave incorrectly given the following sequence of events:

  1. Fetch request sent
  2. Rebalance
  3. Seek
  4. Fetch response triggers this code

The callback is invoked, but the fetch request was for the previous generation and errors like OFFSET_OUT_OF_RANGE will not be correctly passed to the callback.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ewencp Thanks for the review! I will fix it in the patch.

@ewencp

ewencp commented Aug 10, 2015

Copy link
Copy Markdown
Contributor

I agree with @hachikuji that the callback should always be invoked. If it isn't, then I think there would need to be some very clear javadocs indicating when it is/isn't invoked. Any use of this callback also requires implementing a ConsumerRebalanceCallback if you hold on to any state while waiting for the callback.

@lindong28

Copy link
Copy Markdown
Member Author

@ewencp @hachikuji. Thanks for the suggestion! I have updated the patch such that the ConumserSeekCallback is always invoked, including when SubscriptionState.clearAssignment() and SubscriptionState.clearPartition(). I am not sure why the use of this callback also requires implementing a ConsumerRebalanceCallback though. Let me double think on this.

@ewencp

ewencp commented Aug 11, 2015

Copy link
Copy Markdown
Contributor

@lindong28 I only meant it required implementing ConsumerRebalanceCallback if it didn't get invoked in some cases. If the application did something like invoke the commit and wait for the callback, meanwhile holding on to resources, but the callback wasn't guaranteed to be invoked if a rebalance occurred in the meantime, then the only way to correctly clean up the resources associated with the callback would be to implement a ConsumerRebalanceCallback to clean them up.

If we guarantee the callback is always invoked, then this is no longer necessary (and simplifies application code that uses this callback).

@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 @ewencp I think the thing that's bugging me about the callback is that the semantics seem strange. When the consumer invokes seek(), the position is updated immediately and position() will return the seeked position. It's only later that the user finds out if that position is out of range. Intuitively, I would expect the callback to be invoked when the position is updated. I wonder if instead of using the generic ConsumerSeekCallback, the behavior would be clearer with an OffsetOutOfRangeCallback, which is only invoked when that error occurs? Then we wouldn't have to worry about ensuring the callback is invoked in all cases and it would make sense for the position to be updated immediately. Does that make sense?

@asfbot

asfbot commented Aug 11, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #123 SUCCESS
This pull request looks good

@ewencp

ewencp commented Aug 11, 2015

Copy link
Copy Markdown
Contributor

@hachikuji Definitely understand that concern, I was confused at first that a callback even made sense since this seemed like an immediate update regardless of the mode the consumer was in. But would this OffsetOutOfRangeCallback be similar to the ConsumerRebalanceCallback where you need to provided it when you subscribe? It seems like it has the same somewhat weird callback semantics where it is provided once and can be invoked many times.

But now that I'm thinking about more of these types of events, is it possible we just have the name wrong? Maybe we should be implementing a ConsumerListener instead of a variety of callbacks? I think that naming is more in line with how we would normally describe the functionality since each of these methods can be invoked multiple times (rather than being a callback associated with a single invocation of a method).

In any case, for this specifically, it is a bit odd that the callback would be disconnected from the request, but on the other hand, shouldn't this type of event only ever occur during a seek? The only other time it happens is during initial subscription, right?

@hachikuji

Copy link
Copy Markdown
Contributor

@ewencp Usually when we get the offset out of range error, we just reset the position, so i wouldn't see that callback as one which could be invoked multiple times. The callback just provides a way to override the default reset behavior. It's also intuitive in my opinion to be able to handle the error at the same location in the code that the seek is performed.

I agree, however, that this is still not great. The user might want to intercept all out of range errors, not just the ones that result from a specific seek. The ConsumerListener interface could be interesting (at the very least, I definitely would approve of changing ConsumerRebalanceCallback to ConsumerRebalanceListener). It might be worthwhile to spend a little time trying to flesh out a basic sketch of the interface for comparison.

@lindong28

Copy link
Copy Markdown
Member Author

@ewencp @hachikuji I think we are discussing to things: 1) whether it is more intuitive to call it ConsumerSeekCallback or OffsetOutOfRangeCallback; 2) whether we should turn it and other callback into listener instead. I agree with your opinion on 2). But 2) goes beyond the scope of this ticket and requires much detailed discussion since it changes the user interface (e.g. ConsumerRebalanceListener). Can we focus on 1) first?

I would prefer ConsumerSeekCallback to OffsetOutOfRangeCallback. If the user expects ConsumerSeekCallback to be invoked when position is updated, shouldn't he has the same expectation for OffsetOutOfRangeCallback? By keeping ConsumerSeekCallback, we have a more general callback API, and we can ensure that it is always invoked, including when rebalance happens or when user re-subscribe to partitions. We can describe in the document that seek() is actually an asynchronous API and it is only effective when the fetch after the seek gets a non-error response.

@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 That's a fair point about OffsetOutOfRangeCallback. It also has confusing semantics.

I think what @ewencp was suggesting with the ConsumerListener interface is basically another approach to the problem that the ConsumerSeekCallback is trying to address. We all agree that the user needs some way to be able to handle out of range errors. The weird thing with the seek callback is that it only allows the user to hook into out of range errors when they seek, but these errors are an ongoing concern during the lifetime of the consumer. You might seek, fetch successfully, and then get an out of range error on the next fetch. The seek callback doesn't help you in that case, so you have to resort to catching exceptions. The question is whether we would need ConsumerSeekCallback any more if we had a ConsumerListener which provided a general hook for out of range errors. The nice thing is that it bypasses the weirdness of when the position gets updated and when the seek callback gets invoked. What do you think?

@lindong28

Copy link
Copy Markdown
Member Author

@hachikuji I agree that a ConsumerListener which provides a general hook for out of range errors is useful. What I try to understand is, whether there is a valid user case to have a callback for a specific seek. I just discussed with @becketqin and tend to think it can still be useful to provide callback in seek().

An out-of-range exception can happen for one of the three reasons: 1) invalid offset in seek, 2) unclean leader election, and 3) consumer lags behind too much. One reasonable strategy that I envision is that, consumer will throw exception and quit for 1), reset offset to latest for 2), and reset offset to earliest for 3). Such a strategy can be implemented using a combination of ConsumerSeekCallback and ConsumerListener, but not with ConsumerListener alone. Does this usecase make sense?

@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 Fair enough. I do see some value in being able to handle an out-of-range error in the scope of a specific seek. I still think the behavior around when the position gets updated is a little weird, but perhaps documenting it clearly will be enough. We have to explain how it is synchronous in the sense that the position will be updated immediately and the next fetch will try to use it, but asynchronous in the sense that we can only find out if the position is valid after we try to fetch. It makes it a little tough to say when the seek is "complete." Maybe instead of using onComplete in the callback, we could rename it to something which reflects this difference? Maybe onValidated?

It would be cool if there was an API on the broker to get the current range of a partition. Then we could have this method block while we check that the offset is in range. You could do it with the list offset request, but it seems like it would require two requests, which is annoying.

@lindong28

Copy link
Copy Markdown
Member Author

@hachikuji Of course, thanks for your suggestion. I will clarify it in document and use onValidated.

@lindong28

Copy link
Copy Markdown
Member Author

@hachikuji Sorry for the delay. I just updated patch to rename the callback to onResponse and updated API documentation to illustrate the reason that the callback is asynchronous. Does that alleviate the concern you have?

I thought onResonse is a little better than onValidated because it hints the time when this callback is called. And the name is more general rather than being tied to a specific functionality, e.g. validation. What do you think?

Thank you!

@asfbot

asfbot commented Aug 17, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #151 SUCCESS
This pull request looks good

@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 Yeah, I didn't like onValidated that much either. onResponse seems a little better. How about onFetchResponse to be even more explicit? Or maybe onFirstFetch to emphasize that the callback only applies to the first fetch? It's a little tough to tell how intuitive any of these will be for users.

@lindong28

Copy link
Copy Markdown
Member Author

@hachikuji Sure. These names are better. I combined your suggestions and named it as onFirstFetchReponse. What do you think?

@asfbot

asfbot commented Aug 18, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #161 SUCCESS
This pull request looks good

@asfbot

asfbot commented Sep 11, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #407 FAILURE
Looks like there's a problem with this pull request

@lindong28

Copy link
Copy Markdown
Member Author

"./gradlew testAll" can successfully finish on my desktop.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be final?

@asfbot

asfbot commented Sep 22, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #481 SUCCESS
This pull request looks good

@lindong28

Copy link
Copy Markdown
Member Author

Hi @guozhangwang, this patch has been reviewed by @hachikuji and @ijuma. This patch has been pending for a long time. Since you are very familiar with new consumer code, can you have a look and see if it can be committed?

I am not sure if we are going to deliver new consumer in 0.9.0 release. If we do, then we probably need to include in 0.9.0 all patches that affect new consumer API, such as this patch and others under KAFKA-2387.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some comments here for the checking of the consumed position?

@guozhangwang

Copy link
Copy Markdown
Contributor

@lindong28 LGTM overall, just one minor comment.

@lindong28

Copy link
Copy Markdown
Member Author

@guozhangwang Thanks! I have added comment. Please see updated patch.

@asfbot

asfbot commented Sep 23, 2015

Copy link
Copy Markdown

kafka-trunk-git-pr #510 FAILURE
Looks like there's a problem with this pull request

@asfgit asfgit closed this in b9ceac3 Sep 23, 2015
@lindong28 lindong28 deleted the KAFKA-2390 branch September 24, 2015 00:23
@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 @guozhangwang We might need a followup on this patch. I think this line in throwIfOffsetOutOfRange doesn't work:

Long consumed = subscriptions.consumed(entry.getKey());
// ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
if (consumed != null && entry.getValue() == consumed)
    currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());

I think we need to use equals instead of '==':

Long consumed = subscriptions.consumed(entry.getKey());
// ignore partition if its consumed offset != offset in fetchResponse, e.g. after seek()
if (consumed != null && consumed.equals(entry.getValue()))
    currentOutOfRangePartitions.put(entry.getKey(), entry.getValue());

We should probably have a unit test for this case.

@lindong28

Copy link
Copy Markdown
Member Author

@hachikuji Thanks for the catch! I have created a pull request to add unit test and replace == with equals.

Interestingly, I find that here == has the same effect as equals -- they can both update currentOutOfRangePartitions and allow unit test to pass. I don't have explanation though.

@ijuma

ijuma commented Sep 24, 2015

Copy link
Copy Markdown
Member

@lindong28 == is the same as equals when boxing values from -128 to 127 (or if you call Long.valueOf)

@hachikuji

Copy link
Copy Markdown
Contributor

@lindong28 This puzzled me for a little while as well, but it looks like the test case actually ends up using the same Long instance for both the fetch position (which is stored for out of range errors) and the consumed position when you use seek(), hence reference equality works fine. This won't generally be the case since the fetch position usually is updated independent of the consumed position.

@lindong28

Copy link
Copy Markdown
Member Author

@ijuma Interesting. I am curious motivation for having this rule. Do you have any reference to related documentation or online discussion?

@hachikuji This puzzles me as well. Anyway, thanks much finding the problem.

@ijuma

ijuma commented Sep 24, 2015

Copy link
Copy Markdown
Member

@lindong28, it's because there's a cache for commonly used values for Integer, Long, etc. The cache exists to avoid excessive allocation and it was added in Java 5 before HotSpot supported escape analysis and scalar replacement. A reference that I quickly found via Google:

https://dzone.com/articles/surprising-results-autoboxing

@lindong28

Copy link
Copy Markdown
Member Author

@ijuma Ah. So it called autoboxing. Thanks for this information:)

apurvam pushed a commit to apurvam/kafka that referenced this pull request Jan 24, 2017
efeg pushed a commit to efeg/kafka that referenced this pull request Jan 29, 2020
xiowu0 pushed a commit to xiowu0/kafka that referenced this pull request Apr 13, 2021
…ON error (apache#118)

[LI-HOTFIX] Add broker info log when getting error during fetch metadata call (apache#118)
TICKET = KAFKA-12300
LI_DESCRIPTION = LIKAFKA-33540
This will backport pr (apache#112) to log which broker returned metadata request and help us debug a Venice issue that potentially stemmed from metadata propagation delay 
EXIT_CRITERIA = Upstream pr created and merged, we can close this after pulling in upstream pr
k0b3rIT added a commit to k0b3rIT/kafka that referenced this pull request Mar 24, 2025
apache#118)

CDPD-73907 fix ConnectSecretValidationFilter fails to parse the connector creation request after the 3.7 rebase

Co-authored-by: Bertalan Kondrat <kb.pcre@gmail.com>
davide-armand pushed a commit to aiven/kafka that referenced this pull request Dec 1, 2025
Signed-off-by: Greg Harris <greg.harris@aiven.io>
fvaleri added a commit to fvaleri/kafka that referenced this pull request Apr 2, 2026
…rry-pick

system tests: cherry-pick changes from trunk
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants