
KAFKA-8052; Ensure fetch session epoch is updated before new request #6582

Merged
rajinisivaram merged 2 commits into apache:trunk from rajinisivaram:KAFKA-8052-fetch-epoch
May 21, 2019


@rajinisivaram

Contributor

When a fetch response is processed by the heartbeat thread, the polling thread may send a new fetch request with the same epoch as the previous fetch request if the heartbeat thread hasn't yet updated the epoch. This results in an INVALID_FETCH_SESSION_EPOCH error. Even though the request is retried without any disconnections, it would be good to avoid this error. This PR tracks the status of the previous request in the session handler and sends the next fetch request only after the response to the previous request has been processed.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@rajinisivaram rajinisivaram requested a review from hachikuji April 14, 2019 10:36
@rajinisivaram rajinisivaram force-pushed the KAFKA-8052-fetch-epoch branch from 4c2116e to 8b3666b Compare April 14, 2019 10:49
@cmccabe

cmccabe commented Apr 15, 2019

Contributor

Thanks, @rajinisivaram. Good find!

If I understand correctly, the problem is that we could have invoked the callback for the fetch request, and be about to increment the epoch, when another prepareFetchRequests call happened, and ended up re-using the same epoch number. This is because the client.hasPendingRequests(node) condition will be false once the response is received, even if we haven't yet processed that response. (P.S. Thanks to @hachikuji for an explanation of how the heartbeat thread works)

I don't think the logic for closing this race condition belongs in the FetchSessionHandler since it doesn't really have anything to do with fetch sessions. It would also be better to not use volatiles-- they are scary! :)

I think we should just get rid of the client.hasPendingRequests(node) check, since it's not sufficient, and add a SynchronizedSet<Integer> pendingFetchRequests. Then we can just do something like

if (pendingFetchRequests.add(nodeId)) {
  // ... invoke session handler
} else {
  // log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
}

Then I think the onSuccess and onFailure handlers in Fetcher.java can do pendingFetchRequests.remove(nodeId), once they're finished with all processing. It would be better to do it at the very end of those functions, since otherwise we have to think really hard about the state of data structures like completedFetches, etc. Maybe a try/finally block could help here?
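For illustration, the suggestion above could be sketched roughly as follows. This is a minimal sketch, not the code in Fetcher.java: the class `PendingFetchGuard` and its method names are hypothetical, and the response handler is reduced to a plain `Runnable`. It shows the two ideas from the comment: `Set.add` doubles as the "is a fetch already pending?" check, and a try/finally clears the mark only at the very end of response processing.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Kafka client.
final class PendingFetchGuard {
    // Node ids with a fetch request whose response has not been fully processed.
    private final Set<Integer> pendingFetchRequests =
            Collections.synchronizedSet(new HashSet<>());

    /** Returns true if a fetch may be sent to nodeId, false if one is still pending. */
    boolean tryBegin(int nodeId) {
        return pendingFetchRequests.add(nodeId);
    }

    /** Runs the response handler and clears the pending mark even if the handler throws. */
    void complete(int nodeId, Runnable handler) {
        try {
            handler.run();
        } finally {
            pendingFetchRequests.remove(nodeId);
        }
    }

    boolean isPending(int nodeId) {
        return pendingFetchRequests.contains(nodeId);
    }
}
```

A caller would skip the node when `tryBegin` returns false and wrap the body of both the success and the failure callback in `complete`.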

@hachikuji

Contributor

@rajinisivaram By the way, it seems this bug can also explain https://issues.apache.org/jira/browse/KAFKA-7565?

Comment thread clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java Outdated
@manderson23


Can this be merged into 2.2.x when resolved? Our logs are flooded with these messages.

Contributor

Is warning level more appropriate than info?

Contributor Author

@noslowerdna Thanks for the review. I am not sure we want to change the log level from info to warn in all the cases in Fetcher. @cmccabe is more familiar with this code, so will be interested to see what he thinks.

Contributor

ok

Contributor

Is warning level more appropriate than info?

Contributor

Is warning level more appropriate than info?

Contributor

Is warning level more appropriate than info? Especially since we no longer expect the INVALID_FETCH_SESSION_EPOCH error to commonly occur.

Comment thread clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java Outdated

Contributor

The class-level javadoc should thoroughly document the thread-safety of this class. And the sessionPartitions variable should be a ConcurrentHashMap rather than a LinkedHashMap (as KAFKA-7280 shows).

Contributor Author

Have included the changes in this PR under the thread safety section. To avoid any risky changes, only changes specific to this issue are included in this PR. For changes to other variables, it will be better to create a separate JIRA describing the issue.

@ijuma

ijuma commented May 19, 2019

Member

Do we want to get this merged before 2.3.0?

@rajinisivaram rajinisivaram force-pushed the KAFKA-8052-fetch-epoch branch from 8b3666b to 0e05a90 Compare May 20, 2019 10:44
@rajinisivaram

Contributor Author

@ijuma @cmccabe @noslowerdna I have rewritten this PR based on the suggestion from @cmccabe. It is a small change specifically to address the race condition in fetch request creation. Since several users run into this frequently, it may be worth including this in 2.3.0 even though it is actually not a blocker.

@rajinisivaram

Contributor Author

retest this please

@ijuma ijuma requested a review from cmccabe May 20, 2019 14:38
@ijuma

ijuma commented May 20, 2019

Member

@rajinisivaram To double check, https://issues.apache.org/jira/browse/KAFKA-7565 is not addressed by this change, correct?

@noslowerdna

Contributor

The rewrite looks good

@rajinisivaram

Contributor Author

@ijuma Yes, that is correct, this does not fix the issue in https://issues.apache.org/jira/browse/KAFKA-7565.

@hachikuji hachikuji left a comment

Contributor

Thanks @rajinisivaram. Left one comment.

Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) {
    reqs.put(entry.getKey(), entry.getValue().build());
    this.nodesWithPendingFetchRequests.add(entry.getKey().id());

Contributor

I wonder if it would be better to make this call after client.send returns successfully in sendFetches?

Another safety we might add is to remove from nodesWithPendingFetchRequests above if client.isUnavailable returns true?

Contributor Author

@hachikuji Thanks for the review. Moved the call after client.send.

I wasn't sure about removing for client.isUnavailable. I think we guarantee that the listener onSuccess or onFailure is invoked in all cases if send succeeds. So that should be sufficient? I was a bit concerned that an additional remove could in theory mean that a listener invoked later from the heartbeat thread could potentially remove the entry for the subsequent send. I don't think it can happen in practice, though.

Contributor

Yeah, I think that should be fine. I just wasn't sure how much I wanted to trust the request completion handling logic. I guess it would break a lot of expectations elsewhere if it were broken though, so probably no additional harm from relying on it here.
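The ordering discussed in this thread can be sketched as below. This is an assumption-laden toy, not the merged code: `SendThenMark`, `sendFetch` and `onCompletion` are hypothetical names, and the network send is modelled as an `IntConsumer` that may throw. The point is only that the node is marked pending after the send call returns, so a send that throws never leaves a stale entry, and that the single removal happens in the completion path.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntConsumer;

// Hypothetical toy model for illustration only; not the Kafka Fetcher.
final class SendThenMark {
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet<>();

    /** Sends a fetch unless one is pending; marks the node only if send returns normally. */
    synchronized boolean sendFetch(int nodeId, IntConsumer send) {
        if (nodesWithPendingFetchRequests.contains(nodeId)) {
            return false; // previous response not processed yet, skip this node
        }
        send.accept(nodeId); // if this throws, the node is never marked pending
        nodesWithPendingFetchRequests.add(nodeId);
        return true;
    }

    /** Called at the end of the success or failure listener; the only place that removes. */
    synchronized void onCompletion(int nodeId) {
        nodesWithPendingFetchRequests.remove(nodeId);
    }

    synchronized boolean isPending(int nodeId) {
        return nodesWithPendingFetchRequests.contains(nodeId);
    }
}
```

Keeping removal in one place relies on the completion listener always being invoked once a send has succeeded, which is the guarantee the comments above agree to trust.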

@hachikuji

Contributor

Here is why I thought it might be the cause of KAFKA-7565. The bug as I understand it is that we are sending requests using stale session information. Basically the sequence is like this:

  1. send request epoch=n
  2. receive response, but don't handle it
  3. send new request with epoch=n

Is that right? I was thinking that prior to step 3), we might add a new partition to the fetch session. Then we might hit the KAFKA-7565 case when we eventually handled the response from 2).
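The three-step sequence above can be reproduced in a deliberately simplified, single-threaded model. Everything here is hypothetical (`ToyFetchSession` is not Kafka's FetchSessionHandler, and the starting epoch of 1 is arbitrary); the sketch only assumes, as described in this thread, that the epoch advances when the response is handled rather than when it is received.

```java
import java.util.OptionalInt;

// Hypothetical toy model for illustration only; not Kafka's FetchSessionHandler.
final class ToyFetchSession {
    private int epoch = 1;
    private boolean responseUnhandled = false;

    /** Unguarded: builds a request with the current epoch regardless of pending work. */
    int nextRequestEpochUnguarded() {
        responseUnhandled = true;
        return epoch;
    }

    /** Guarded: refuses to build a request while the previous response is unhandled. */
    OptionalInt nextRequestEpochGuarded() {
        if (responseUnhandled) {
            return OptionalInt.empty();
        }
        responseUnhandled = true;
        return OptionalInt.of(epoch);
    }

    /** Handling the response is what advances the epoch. */
    void handleResponse() {
        epoch++;
        responseUnhandled = false;
    }
}
```

Calling `nextRequestEpochUnguarded()` twice without an intervening `handleResponse()` yields the same epoch both times, which is steps 1 to 3; the guarded variant returns an empty result for the second call instead.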

@rajinisivaram rajinisivaram force-pushed the KAFKA-8052-fetch-epoch branch from a1dbe3f to 58c05a4 Compare May 21, 2019 09:35
@rajinisivaram rajinisivaram force-pushed the KAFKA-8052-fetch-epoch branch from 58c05a4 to f6690a7 Compare May 21, 2019 09:50
@rajinisivaram

Contributor Author

@hachikuji Yes that sequence is the issue in KAFKA-7565. So this should prevent that happening. Thank you!

private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
private final Set<Integer> nodesWithPendingFetchRequests;

Contributor

One idea that I had was to make this a Map<Integer, Long>, with the value being System.currentTimeMillis() at the time the fetch request is sent.

That would allow the "Skipping fetch for partition" log message to include the duration that the previous request has been pending for (possibly adjusting the log level based on how long ago that previous request was sent), and also enable a fetch request time metric to be easily collected if someone wishes to add that enhancement in the future.
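A rough sketch of that idea, for illustration only: `TimedPendingFetches` and its methods are hypothetical, and the clock value is passed in as a parameter (in place of calling System.currentTimeMillis() directly) so the behaviour is easy to check. `putIfAbsent` both records the send time and reports how long an earlier request has been outstanding.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration only; not part of the Kafka client.
final class TimedPendingFetches {
    // Node id -> time in milliseconds at which the pending fetch was sent.
    private final Map<Integer, Long> sentAtMs = new ConcurrentHashMap<>();

    /**
     * Returns -1 if the node had no pending fetch and is now marked pending;
     * otherwise returns how long (in ms) the earlier request has been pending.
     */
    long tryBegin(int nodeId, long nowMs) {
        Long previous = sentAtMs.putIfAbsent(nodeId, nowMs);
        return previous == null ? -1L : nowMs - previous;
    }

    /** Clears the pending mark once the response has been fully processed. */
    void complete(int nodeId) {
        sentAtMs.remove(nodeId);
    }
}
```

A non-negative return value is the duration that could be included in the "Skipping fetch for partition" message or used to pick the log level.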

Contributor

FYI, there is already a metric for fetch request latency.

Contributor

oh, ok

@hachikuji hachikuji left a comment

Contributor

Thanks @rajinisivaram . LGTM

}

@Test
public void testFetcherSessionEpochUpdate() throws Exception {

Contributor

I guess there was no choice but to carefully tailor this test in order to hit the bug. We have to do it, but the downside is that its scope is narrow and it may be difficult to keep relevant as the code evolves. Anyway, hopefully at some point we'll get the time to move all network IO to the background thread and then we can simplify a lot of this.

@rajinisivaram

Contributor Author

@hachikuji @cmccabe @noslowerdna @jsancio Thanks for the reviews. Merging to trunk and 2.3.

@rajinisivaram rajinisivaram merged commit 012880d into apache:trunk May 21, 2019
rajinisivaram added a commit that referenced this pull request May 21, 2019
…6582)

Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
@manderson23


Would this ever be merged to a 2.2.x release?

ijuma added a commit to confluentinc/kafka that referenced this pull request May 21, 2019
* apache-github/trunk:
  MINOR: Set `replicaId` for OffsetsForLeaderEpoch from followers (apache#6775)
  MINOR: A few logging improvements in the broker (apache#6773)
  KAFKA-8052; Ensure fetch session epoch is updated before new request (apache#6582)
  KAFKA-8315: fix the JoinWindows retention deprecation doc (apache#6664)
  KAFKA-8265: Fix override config name to match KIP-458. (apache#6776)
  KAFKA-3143: Controller should transition offline replicas on startup
  MINOR: Work around OpenJDK 11 javadocs issue. (apache#6747)
  MINOR: Bump version to 2.4.0-SNAPSHOT (apache#6774)
@cmccabe

cmccabe commented May 22, 2019

Contributor

Thanks, @rajinisivaram .

@cmccabe

cmccabe commented May 22, 2019

Contributor

Would this ever be merged to a 2.2.x release?

If there is ever a 2.2.2 release, I don't see any reason why we couldn't merge this.

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…pache#6582)

Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
jonlee2 pushed a commit to jonlee2/kafka that referenced this pull request Jul 19, 2019
…updated before new request (apache#6582)

TICKET = KAFKA-8052
LI_DESCRIPTION =
This will remove intermittent INVALID_FETCH_SESSION_EPOCH errors on fetch requests

EXIT_CRITERIA = HASH [012880d]
ORIGINAL_DESCRIPTION =

Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
(cherry picked from commit 012880d)
jonlee2 added a commit to linkedin/kafka that referenced this pull request Jul 22, 2019
…updated before new request (apache#6582) (#31)

TICKET = KAFKA-8052
LI_DESCRIPTION =
This will remove intermittent INVALID_FETCH_SESSION_EPOCH errors on fetch requests

EXIT_CRITERIA = HASH [012880d]
ORIGINAL_DESCRIPTION =

Reviewers: Jason Gustafson <jason@confluent.io>, Colin Patrick McCabe <cmccabe@confluent.io>, Andrew Olson <aolson1@cerner.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
(cherry picked from commit 012880d)