KAFKA-8052; Ensure fetch session epoch is updated before new request#6582
Conversation
4c2116e to
8b3666b
Compare
|
Thanks, @rajinisivaram. Good find! If I understand correctly, the problem is that we could have invoked the callback for the fetch request, and be about to increment the epoch, when another prepareFetchRequests call happened, and ended up re-using the same epoch number. This is because the I don't think the logic for closing this race condition belongs in the FetchSessionHandler since it doesn't really have anything to do with fetch sessions. It would also be better to not use volatiles-- they are scary! :) I think we should just get rid of the Then I think the |
|
@rajinisivaram By the way, it seems this bug can also explain https://issues.apache.org/jira/browse/KAFKA-7565? |
|
Can this be merged into 2.2.x when resolved? Our logs are flooded with these messages. |
There was a problem hiding this comment.
Is warning level more appropriate than info?
There was a problem hiding this comment.
@noslowerdna Thanks for the review. I am not sure we want to change the log level from info to warn in all the cases in Fetcher. @cmccabe is more familiar with this code, so will be interested to see what he thinks.
There was a problem hiding this comment.
Is warning level more appropriate than info?
There was a problem hiding this comment.
Is warning level more appropriate than info?
There was a problem hiding this comment.
Is warning level more appropriate than info? Especially since we no longer expect the INVALID_FETCH_SESSION_EPOCH error to commonly occur.
There was a problem hiding this comment.
The class-level javadoc should thoroughly document the thread-safety of this class. And the sessionPartitions variable should be a ConcurrentHashMap rather than a LinkedHashMap (as KAFKA-7280 shows).
There was a problem hiding this comment.
Have included the changes in this PR under the thread safety section. To avoid any risky changes, only changes specific to this issue are included in this PR. For changes to other variables, it will be better to create a separate JIRA describing the issue.
|
Do we want to get this merged before 2.3.0? |
8b3666b to
0e05a90
Compare
|
@ijuma @cmccabe @noslowerdna I have rewritten this PR based on the suggestion from @cmccabe. It is a small change specifically to address the race condition in fetch request creation. Since several users run into this frequently, it may be worth including this in 2.3.0 even though it is actually not a blocker. |
|
retest this please |
|
@rajinisivaram To double check, https://issues.apache.org/jira/browse/KAFKA-7565 is not addressed by this change, correct? |
|
The rewrite looks good |
|
@ijuma Yes, that is correct, this does not fix the issue in https://issues.apache.org/jira/browse/KAFKA-7565. |
hachikuji
left a comment
There was a problem hiding this comment.
Thanks @rajinisivaram. Left one comment.
| Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>(); | ||
| for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) { | ||
| reqs.put(entry.getKey(), entry.getValue().build()); | ||
| this.nodesWithPendingFetchRequests.add(entry.getKey().id()); |
There was a problem hiding this comment.
I wonder if it would be better to make this call after client.send returns successfully in sendFetches?
Another safety we might add is to remove from nodesWithPendingFetchRequests above if client.isUnavailable returns true?
There was a problem hiding this comment.
@hachikuji Thanks for the review. Moved the call after client.send.
I wasn't sure about removing for client.isUnavailable. I think we guarantee that the listener onSuccess or onFailure is invoked in all cases if send suceeds. So that should be sufficient? I was a bit concerned that an additional remove could in theory mean that a listener invoked later from heartbeat thread could potentially remove the subsequent send. Dont think it can happen in practice though.
There was a problem hiding this comment.
Yeah, I think that should be fine. I just wasn't sure how much I wanted to trust the request completion handling logic. I guess it would break a lot of expectations elsewhere if it were broken though, so probably no additional harm from relying on it here.
|
Here is why I thought it might be the cause of KAFKA-7565. The bug as I understand it is that we are sending requests using stale session information. Basically the sequence is like this:
Is that right? I was thinking that prior to step 3), we might add a new partition to the fetch session. Then we might hit the KAFKA-7565 case when we eventually handled the response from 2). |
a1dbe3f to
58c05a4
Compare
58c05a4 to
f6690a7
Compare
|
@hachikuji Yes that sequence is the issue in KAFKA-7565. So this should prevent that happening. Thank you! |
| private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>(); | ||
| private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>(); | ||
| private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; | ||
| private final Set<Integer> nodesWithPendingFetchRequests; |
There was a problem hiding this comment.
One idea that I had was to make this a Map<Integer, Long>, with the value being System.currentTimeMillis() at the time the fetch request is sent.
That would allow the "Skipping fetch for partition" log message to include the duration that the previous request has been pending for (possibly adjusting the log level based on how long ago that previous request was sent), and also enable a fetch request time metric to be easily collected if someone wishes to add that enhancement in the future.
There was a problem hiding this comment.
FYI, there is already a metric for fetch request latency.
hachikuji
left a comment
There was a problem hiding this comment.
Thanks @rajinisivaram . LGTM
| } | ||
|
|
||
| @Test | ||
| public void testFetcherSessionEpochUpdate() throws Exception { |
There was a problem hiding this comment.
I guess there was no choice but to carefully tailor this test in order to hit the bug. We have to do it, but the downside is that its scope is narrow and may be difficult to keep it relevant as the code evolves. Anyway, hopefully at some point we'll get the time to move all network IO to the background thread and then we can simplify a lot of this.
|
@hachikuji @cmccabe @noslowerdna @jsancio Thanks for the reviews. Merging to trunk and 2.3. |
…6582) Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
|
Would this ever be merged to a 2.2.x release? |
* apache-github/trunk: MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (apache#6775) MINOR: A few logging improvements in the broker (apache#6773) KAFKA-8052; Ensure fetch session epoch is updated before new request (apache#6582) KAFKA-8315: fix the JoinWindows retention deprecation doc (apache#6664) KAFKA-8265: Fix override config name to match KIP-458. (apache#6776) KAFKA-3143: Controller should transition offline replicas on startup MINOR: Work around OpenJDK 11 javadocs issue. (apache#6747) MINOR: Bump version to 2.4.0-SNAPSHOT (apache#6774)
|
Thanks, @rajinisivaram . |
If there is ever a 2.2.2 release, I don't see any reason why we couldn't merge this. |
…pache#6582) Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
…updated before new request (apache#6582) TICKET = KAFKA-8052 LI_DESCRIPTION = This will remove intermittent INVALID_FETCH_SESSION_EPOCH errors on fetch requests EXIT_CRITERIA = HASH [012880d] ORIGINAL_DESCRIPTION = Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com (cherry picked from commit 012880d)
…updated before new request (apache#6582) (#31) TICKET = KAFKA-8052 LI_DESCRIPTION = This will remove intermittent INVALID_FETCH_SESSION_EPOCH errors on fetch requests EXIT_CRITERIA = HASH [012880d] ORIGINAL_DESCRIPTION = Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com (cherry picked from commit 012880d)
When fetch response is processed by the heartbeat thread, polling thread may send new fetch request with the same epoch as the previous fetch request if heartbeat thread hasn't yet updated the epoch. This results in INVALID_FETCH_SESSION_EPOCH error. Even though the request is retried without any disconnections, it will be good to avoid this error. The PR tracks status of previous request in the session handler and sends next fetch request only after the response from the previous request is processed.
Committer Checklist (excluded from commit message)