KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932)#16842
Conversation
AndrewJSchofield
left a comment
There was a problem hiding this comment.
I'm not quite sure about this one. Integrating the leader epoch into the persister write API is good, but I am not sure whether the handling of the fenced case makes sense. Essentially, if the Write RPC returns a fenced error, it means that a later leader epoch is being used for this share-partition state, and that means another share-partition leader for this partition is using a later epoch, which presumably means this SPL is philosophically dead.
Hmm, the only scenario I was thinking about that if partition epoch is bumped for any other reason and same broker remains the partition leader then a retry should be needed to fetch latest partition epoch. If that's not true then I agree that retry doesn't make sense. I was also thinking if we should have an identiifer/boolean in SPL that can be toggled when SPL is not a leader. This identifier once set to false will fail all requests with not leader exception, if received any call. So client can switch to new leader. |
|
@mumrah Can you please review as well and provide input on Partition epoch. @AndrewJSchofield might be right and we might just want to fail the request when such leader error occurs. Just confirming prior making the change. |
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the PR. Left a comment.
| + " leader epoch, current leader epoch: {}", groupId, topicIdPartition, leaderEpoch); | ||
| leaderEpoch = getLeaderEpoch(topicIdPartition.topicPartition()); | ||
| // Retry the write state operation. | ||
| return isWriteShareGroupStateSuccessful(stateBatches, true); |
There was a problem hiding this comment.
This doesn't look right. ShareFetch can only be served at the leader. If this broker is not the leader, leaderEpoch is -1. In that case, we need to send a NOT_LEADER_OR_FOLLOWER error to the client instead of retrying.
The easiest thing on receiving FencedLeaderEpochException is probably to always send an error back to the client and mark the SharePartition as invalid. This way, if the broker becomes the leader again, a new SharePartition will be created and trigger the initialization of group state from the share coordinator.
We probably also want to check if the broker is still the leader on each shareFetch/shareAck request.
There was a problem hiding this comment.
I have addressed the concern.
| /** | ||
| * The leader epoch is used to track the partition epoch. | ||
| */ | ||
| private int leaderEpoch; |
There was a problem hiding this comment.
Should this be volatile since it's written and read by different threads?
There was a problem hiding this comment.
It's now read only in share partition.
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
| import java.util.concurrent.locks.ReadWriteLock; | ||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||
|
|
||
|
|
| CompletableFuture<Map<TopicIdPartition, PartitionData>> future, | ||
| Throwable throwable) { | ||
| Throwable throwable | ||
| ) { |
There was a problem hiding this comment.
Why do we need to make this change? The current format seems to match other existing code.
| if (partitionOrError.isLeft()) { | ||
| log.error("Failed to get partition leader for topic partition: {}-{} due to error: {}", | ||
| tp.topic(), tp.partition(), partitionOrError.left().get()); | ||
| return -1; |
There was a problem hiding this comment.
We need to return a NOT_LEADER_OR_FOLLOWER error to the client if the broker is not the leader.
| partitionCacheMap.remove(sharePartitionKey); | ||
| future.completeExceptionally(throwable); | ||
| return; | ||
| partitionCacheMap.computeIfPresent(sharePartitionKey, (k, v) -> null); |
There was a problem hiding this comment.
Is there a particular reason to use computeIfPresent instead of remove? The latter is more intuitive.
There was a problem hiding this comment.
I was trying to solve which is not required. I have moved to remove.
| log.error("Error initializing share partition with key {}", sharePartitionKey, throwable); | ||
| future.completeExceptionally(throwable); | ||
| // TODO: Should the return be -1 or throw an exception? | ||
| private int getLeaderEpoch(TopicPartition tp) { |
There was a problem hiding this comment.
We don't use getters. So this can just be leaderEpoch.
| CompletableFuture<Errors> future = new CompletableFuture<>(); | ||
| sharePartition.acknowledge(memberId, acknowledgePartitionBatches).whenComplete((result, throwable) -> { | ||
| if (throwable != null) { | ||
| handleSharePartitionException(sharePartitionKey, throwable); |
There was a problem hiding this comment.
To be consistent, we want to add the same logic for shareFetch too.
To do this, we need to extend FetchParams such that fetchOnlyLeader() is true for share fetch and handle NotLeaderOrFollowerException accordingly.
|
@junrao Sorry, I was in middle of my changes when last reviewed. But thanks for the feedback. I have completed the changes now, can you please re-review. Sorry for the trouble. |
| maybeCompleteShareFetchExceptionally(future, Collections.singletonList(topicIdPartition), throwable); | ||
| } | ||
|
|
||
| private void handleSharePartitionException( |
There was a problem hiding this comment.
I suggest "fenced" should appear in the method name. This is essentially seeing if the exception indicates fencing and then discarding the share partition from the cache.
There was a problem hiding this comment.
My idea was to have general exception handling method as we might find other exceptions which needs some handling as well. Do you think we should strictly have fenced special handling method?
There was a problem hiding this comment.
My point was that you're really only handling a couple of exceptions at the moment, and they're both fencing-related. Anyway, just a suggestion.
There was a problem hiding this comment.
Thanks, I have changed the method name. Might move to generic one if required in future.
| // The share partition is fenced hence remove the partition from map and let the client retry. | ||
| // But surface the error to the client so client might take some action i.e. re-fetch | ||
| // the metadata and retry the fetch on new leader. | ||
| partitionCacheMap.remove(sharePartitionKey); |
There was a problem hiding this comment.
Does the SharePartition need to be put into a fenced state? Removing it from the cache is good for future requests, but is that enough for the object which is already in existence?
There was a problem hiding this comment.
So we remove share partition from cache at 2 places. 1) When initialization failed 2) When fenced error occurs.
For 1, it's safe as its still in initilization state.
For 2, I was in mixed opinion. As all interactions with share partition happens currently while fetching instance from cache hence once removed or re-initialized the new state should appear. But if old share partition instance is already held by some other thread then acknowledge will anyways fail but fetch can succeed. Do you think it would be sensible to have another state in SharePartition as Fenced, which once set then fetch lock on that share partition cannot be attained. Do you think we should have an active status check on all Share Partition APIs as well?
cc: @adixitconfluent
There was a problem hiding this comment.
I think its better to have a new state. Should we have a state as TERMINATED/FENCED because a share partition should also be removed from SPM in case the topic partition is deleted or becomes a follower (https://issues.apache.org/jira/browse/KAFKA-17783). Wdyt @apoorvmittal10 @AndrewJSchofield
There was a problem hiding this comment.
Yes, I think these states sound sensible.
There was a problem hiding this comment.
Do you think we should have an active status check on all Share Partition
Yes, I agree on this as well
There was a problem hiding this comment.
Do we have a state machine written down somewhere for SharePartition?
There was a problem hiding this comment.
Hmmm, not yet. Let me create the state transitions better in a single place where we can see the transitions.
| } | ||
|
|
||
| private int leaderEpoch(TopicPartition tp) { | ||
| Either<Errors, Partition> partitionOrError = replicaManager.getPartitionOrError(tp); |
There was a problem hiding this comment.
I've checked the error codes here and the KIP looks right to me.
| int maxBytes, | ||
| FetchIsolation isolation, | ||
| Optional<ClientMetadata> clientMetadata, | ||
| boolean shareFetchRequest) { |
There was a problem hiding this comment.
I wonder whether this is a bit literal. Why are we supplying a ClientMetadata here in the share fetch case? That seems to me to be concerned with fetch-from-follower. If we didn't supply a ClientMetadata, then fetchOnlyLeader() would already return true without needing the new flag.
There was a problem hiding this comment.
So ClientMetadata includes details for rackId, listenerName which I see in regular fetch if utilized for figuring read replica. Not sure if that would be relevant for ShareFetch in future and if we should skip always for ShareFetch. Hence I have added the additional shareFetchBoolean. Please let me know what you think. Also adding @junrao to provide his inputs.
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
| .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(), | ||
| Collections.singletonList(PartitionFactory.newPartitionStateBatchData( | ||
| topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches)))) | ||
| topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches)))) |
There was a problem hiding this comment.
If we get an error like UNKNOWN_TOPIC_OR_PARTITION or FENCED_STATE_EPOCH, should we remove the sharePartition too?
There was a problem hiding this comment.
I have handled it.
| ", maxBytes=" + maxBytes + | ||
| ", isolation=" + isolation + | ||
| ", clientMetadata=" + clientMetadata + | ||
| ", shareFetchRequest=" + shareFetchRequest + |
There was a problem hiding this comment.
Should we include the new param in hashCode and equals too?
There was a problem hiding this comment.
Missed it, thanks added.
| return; | ||
| } | ||
|
|
||
| // Remove the partition from the cache as it's failed to initialize. |
There was a problem hiding this comment.
At the beginning of this method, we check for LeaderNotAvailableException. When do we get that exception? ReadShareGroupStateResponse doesn't seem to have that error.
There was a problem hiding this comment.
LeaderNotAvailableException is an internal exception from SharePartition to SharePartitionManager which can occur only when SharePartition is in process of initialization and not yet complete. Hence fo that period just keep the requests in purgatory.
| private void handleFencedSharePartitionException( | ||
| SharePartitionKey sharePartitionKey, | ||
| Throwable throwable | ||
| ) { |
There was a problem hiding this comment.
Should we include UNKNOWN_TOPIC_OR_PARTITION below too?
| if (!future.isDone()) { | ||
| Errors error = Errors.forException(throwable); | ||
| future.complete(topicIdPartitions.stream().collect(Collectors.toMap( | ||
| tp -> tp, tp -> new PartitionData().setErrorCode(error.code()).setErrorMessage(error.message())))); |
There was a problem hiding this comment.
We lose the error message in throwable when converting it to Error.
| private void maybeCompleteInitializationWithException( | ||
| SharePartitionKey sharePartitionKey, | ||
| CompletableFuture<Map<TopicIdPartition, PartitionData>> future, | ||
| TopicIdPartition topicIdPartition, |
There was a problem hiding this comment.
Do we need this since it's part of SharePartitionKey?
There was a problem hiding this comment.
Thanks, removed.
| SharePartition partition = new SharePartition( | ||
| sharePartitionKey.groupId(), | ||
| sharePartitionKey.topicIdPartition(), | ||
| leaderEpoch, |
There was a problem hiding this comment.
ShareFetchUtils.leaderEpoch can return exceptions like NOT_LEADER_OR_FOLLOWER and UNKNOWN_TOPIC_OR_PARTITION. Should we handle that at the partition level?
There was a problem hiding this comment.
Handled at partition level. But I have added a couple of TODOs in the PR to take them in follow up as this PR is getting bigger. I am of opinion that we can take the changes incrementally.
| } catch (Exception e) { | ||
| log.error("Error processing delayed share fetch request", e); | ||
| shareFetchData.future().completeExceptionally(e); | ||
| sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e); |
There was a problem hiding this comment.
Typically, the error code is returned in responseLogResult. So we need to handle the error there too.
There was a problem hiding this comment.
As we are going with massive refactor with min bytes PR in delayed share fetch, also the handling of this error will have it's own quite possible cases hence created a jira to take that up in following PR: https://issues.apache.org/jira/browse/KAFKA-17887
There was a problem hiding this comment.
@junrao I think the error from LogReadResult is already handled as we set the respective partition level error while parsing response in ShareFetchUtils.processFetchResponse method. Am I missing something?
There was a problem hiding this comment.
Yes, it seems this is no longer an issue.
There was a problem hiding this comment.
Thanks for confirming @junrao, I have closed the ticket.
| } | ||
|
|
||
| // Remove the partition from the cache as it's failed to initialize. | ||
| partitionCacheMap.remove(sharePartitionKey); |
There was a problem hiding this comment.
- We probably want to verify the leader epoch in SharePartition before removal to avoid a newly created SharePartition being removed by an old request.
- We piggyback the removal in an error response. The downside is that there are quite a few error places to handle and it is reactive. An alternative is to have a partitionLeaderEpochChange listener. We can then proactively remove a SharePartition when the leader epoch changes.
There was a problem hiding this comment.
Both are great suggestions, do you think it would be right to have it in follow up PRs?
|
@junrao I have made a lot changes and the PR is getting bigger and bigger. I am reverting some of my local changes and will write a plan with jiras so we can proceed. |
| /** | ||
| * The leader epoch is used to track the partition epoch. | ||
| */ | ||
| private final int leaderEpoch; |
There was a problem hiding this comment.
Making this final implies the SharePartition is now scoped to the lifetime of a partition's leader epoch. Since SPs are managed by the node which is the leader for that partition, I guess this is already the case (and not really a problem). We normally expect the leader to move when the leader epoch increases, but I'm not sure if that's always the case.
Hypothetically, if a leader epoch increased but the leader did not move, would it be possible to reuse the SharePartition state? Or would we need to re-load its state from the persister anyways?
There was a problem hiding this comment.
Yeah, it's for share partition life time. We are marking the Share Partition fenced and un-usable if an error occurs. Which means the state should be re-loaded from the persister to function ahead.
| // The share partition is fenced hence remove the partition from map and let the client retry. | ||
| // But surface the error to the client so client might take some action i.e. re-fetch | ||
| // the metadata and retry the fetch on new leader. | ||
| partitionCacheMap.remove(sharePartitionKey); |
There was a problem hiding this comment.
Do we have a state machine written down somewhere for SharePartition?
| return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset; | ||
| } | ||
|
|
||
| static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) { |
There was a problem hiding this comment.
Returning an OptionalInt would be a bit nicer than throwing there (maybe). If we actually want a helper method that throws, we should incorporate that into the name (e.g., leaderEpochOrThrow)
There was a problem hiding this comment.
With an optionalInt again we need to check and pass the right error. So either we have this method or create leaderEpochOrThrow in replica manager.
|
So here is the summary. Sorry, the PR is growing. As now it's more of error handling and less of just leader epoch propagation. The current PR has following:
Some follow ups to do:
I am planning to have couple of follow up PRs, if @junrao @AndrewJSchofield @mumrah you think is fine. As with tests and changes, this PR is getting bigger and bigger. |
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. A few more comments.
| CompletableFuture<Map<TopicIdPartition, PartitionData>> future, | ||
| Throwable throwable | ||
| ) { | ||
| topicIdPartitions.forEach(topicIdPartition -> handleFencedSharePartitionException(sharePartitionKey(groupId, topicIdPartition), throwable)); |
There was a problem hiding this comment.
This is weird. We actually don't know which partition causes throwable. Ideally, we should just set a top level error instead of applying it on each partition. We probably shouldn't remove the SharePartition here since we are not sure which partition to remove.
There was a problem hiding this comment.
Hmmm, though you are right but the problem is that ShareFetchResponse contains both Fetch and Acknowledgement response. If we set a top level error code then the acknowledgement in request will also be responded as failed however acknowledgement might have succeeded.
The reason I was removeing the SharePartition in this condition was that recreating a new SharePartition is cheap. However if you think we shouldn't remove then I will avoid that and see waht scenarios we encounter during the tests (moving partitions in a cluster).
| // as this situation is not expected. | ||
| log.error("Error processing share fetch request", e); | ||
| if (erroneous == null) { | ||
| erroneous = new HashMap<>(); |
There was a problem hiding this comment.
It seems it's more intuitive to initialize erroneous as an empty map so that we don't need to deal with it being null.
There was a problem hiding this comment.
The exception occuring from getOrCreateSharePartition will be rare hence I was of an opinion of not initializing the erroneous and do that only when needed.
| private void completeShareFetchWithException(CompletableFuture<Map<TopicIdPartition, PartitionData>> future, | ||
| Map<TopicIdPartition, Throwable> erroneous) { | ||
| future.complete(erroneous.entrySet().stream().collect(Collectors.toMap( | ||
| Map.Entry::getKey, entry -> new PartitionData().setErrorCode(Errors.forException(entry.getValue()).code()).setErrorMessage(entry.getValue().getMessage())))); |
There was a problem hiding this comment.
The line is getting too long. Could we avoid calling entry.getValue() twice?
There was a problem hiding this comment.
Agree, done.
| } | ||
|
|
||
| private boolean stateNotActive() { | ||
| return partitionState() != SharePartitionState.ACTIVE; |
There was a problem hiding this comment.
We probably should throw a fenced exception and let the caller handle it. This can be done in a separate PR.
There was a problem hiding this comment.
| when(sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null)); | ||
|
|
||
| SharePartition sp1 = mock(SharePartition.class); | ||
| // Do not make the share partition acquirable hence it shouldn't be removed from the cache, |
There was a problem hiding this comment.
Hmm, should we explicitly mock sp1.maybeAcquireFetchLock to false?
There was a problem hiding this comment.
Thanks, done.
|
|
||
| CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = | ||
| sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, partitionMaxBytes); | ||
| validateShareFetchFutureException(future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception"); |
There was a problem hiding this comment.
Hmm, why does the completed future have only 1 partition?
There was a problem hiding this comment.
Because fetch didn't happen for other partition as it was not acquirable. Though final response from KafkaApis consolidates the topic-partitions from erroneous (from KafkaAPIs), share partition manager's topic partitions and remaining with empty data.
|
Hi @junrao , thanks for the feedback. Addressed and replied. https://issues.apache.org/jira/browse/KAFKA-17463?jql=text%20~%20%22testShareGroups%22 |
junrao
left a comment
There was a problem hiding this comment.
@apoorvmittal10 : Thanks for the updated PR. LGTM
…P-932) (apache#16842) The PR integrates leader epoch for partition while invoking Persister APIs. The write RPC is retried once on leader epoch failure. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
…P-932) (apache#16842) The PR integrates leader epoch for partition while invoking Persister APIs. The write RPC is retried once on leader epoch failure. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
…P-932) (apache#16842) The PR integrates leader epoch for partition while invoking Persister APIs. The write RPC is retried once on leader epoch failure. Reviewers: Abhinav Dixit <adixit@confluent.io>, Andrew Schofield <aschofield@confluent.io>, Jun Rao <junrao@gmail.com>, David Arthur <mumrah@gmail.com>
The PR integrates leader epoch for partition while invoking Persister APIs. The write RPC is retried once on leader epoch failure.
Committer Checklist (excluded from commit message)