Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler #843
Merged
vmaheshw merged 14 commits into linkedin:master on Jul 27, 2021
Conversation
jzakaryan (Collaborator) previously approved these changes on Jul 26, 2021, leaving a comment:
LGTM. Nit for updating the comment in the test.
.../com/linkedin/datastream/connectors/kafka/mirrormaker/TestKafkaMirrorMakerConnectorTask.java
 * Clear the source-partition entry from the _callbackStatusMap
 */
public void clear(String source, int partition) {
  _callbackStatusMap.remove(new SourcePartition(source, partition));
}
Collaborator:
Just for clarification — will this result in the correct key getting removed from the map? Does SourcePartition override getHashKey()?
vmaheshw (Author):
SourcePartition extends Pair underneath and uses the default hash function defined at the Object level. SourcePartition has been in use as the key in the HashMap for a long time.
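Whether `remove()` finds the entry hinges on the key's `equals()`/`hashCode()`: an identity-based (Object-default) hash would make removal with a freshly constructed key silently fail, while a value-based one works. A minimal sketch of the value-based case, using the JDK's `SimpleEntry` as a stand-in for the Pair class that SourcePartition extends (class and method names here are illustrative, not Brooklin code):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

public class KeyRemovalDemo {
    /** Returns true if removal with a freshly constructed equal key succeeds. */
    public static boolean removeWithFreshKey() {
        Map<SimpleEntry<String, Integer>, String> statusMap = new HashMap<>();
        statusMap.put(new SimpleEntry<>("topicA", 0), "inflight");
        // A *new* key instance with the same values finds the entry because
        // SimpleEntry implements value-based equals()/hashCode() — the same
        // property a Pair-derived key needs for this removal to work.
        statusMap.remove(new SimpleEntry<>("topicA", 0));
        return statusMap.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(removeWithFreshKey()); // prints "true"
    }
}
```

If the key type instead inherited `Object`'s identity hash, the `remove()` call above would be a no-op and the map entry would leak.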
somandal previously approved these changes on Jul 26, 2021
somandal approved these changes on Jul 26, 2021
vmaheshw added a commit to vmaheshw/brooklin that referenced this pull request on Mar 1, 2022:
Clear the CallbackStatus entry from the map in FlushlessEventProducerHandler (linkedin#843)
ISSUE:
Stuck partitions were reported in prod-lor1 mm.one twice in the last week. During both instances, either a bad Kafka broker was abruptly removed or there were many under-replicated partitions (URPs).
Root cause:
a. The initial suspicion was Kafka failing to ACK some of the send() calls, resulting in stuck partitions.
b. There were log lines in the files that should not be present:
2021/07/04 13:34:58.325 ERROR [FlushlessEventProducerHandler] [kafka-producer-network-thread | DatastreamLiKafkaProducer] [brooklin-service] [] Internal error: checkpoints should progress in increasing order. Resolved checkpoint as 966284416 which is less than current checkpoint of 966284432
2021/07/04 13:34:58.325 ERROR [FlushlessEventProducerHandler] [kafka-producer-network-thread | DatastreamLiKafkaProducer] [brooklin-service] [] Internal error: checkpoints should progress in increasing order. Resolved checkpoint as 966284363 which is less than current checkpoint of 966284416
These logs mean that the ACK received was for an offset smaller than the in-memory checkpoint in the CallbackStatus. That implies either out-of-order offsets coming from Kafka, or an in-flight message set that contains out-of-order messages for some reason.
So the stuck-partition issue occurs when there are two consecutive {send() failure, seekToLastCheckpoint()} cycles without the task thread restarting. This is the main root cause behind the stuck partitions seen during shutdown when brokers were being removed on the Kafka side.
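The errors in the log lines above correspond to a monotonicity guard on the resolved checkpoint. A hedged sketch of what such a guard might look like (field and method names are illustrative, not the actual Brooklin implementation):

```java
// Illustrative sketch of a monotonic-checkpoint guard, not actual
// Brooklin code: rejects and logs any checkpoint resolution that
// would move the high-water mark backwards.
public class CallbackStatus {
    private long currentCheckpoint = -1;

    /** Returns true if the checkpoint advanced; false if it went backwards. */
    public synchronized boolean updateCheckpoint(long resolvedCheckpoint) {
        if (resolvedCheckpoint < currentCheckpoint) {
            System.err.printf(
                "Internal error: checkpoints should progress in increasing order. "
                + "Resolved checkpoint as %d which is less than current checkpoint of %d%n",
                resolvedCheckpoint, currentCheckpoint);
            return false;  // keep the existing (larger) checkpoint
        }
        currentCheckpoint = resolvedCheckpoint;
        return true;
    }

    public synchronized long getCurrentCheckpoint() {
        return currentCheckpoint;
    }
}
```

With stale in-flight state surviving a rewind, an older offset can be resolved after a newer one, which is exactly the condition this guard reports.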
Solution:
The ordering of the in-flight message set is critical to this algorithm; because the older state was not cleared on rewind, it resulted in undesired transition states.
Option 1: Clear the CallbackStatus entry for any topic partition that is rewound to an older checkpoint.
Option 2: Use a different data structure for the in-flight message set that maintains ordering and also removes duplicates. A priority queue can take care of ordering, but it would have to be extended to avoid adding duplicates.
For more deterministic behavior, my recommendation is Option 1.
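Option 1 can be sketched as follows — drop the per-partition callback state whenever the consumer is rewound, so a stale in-flight set cannot produce out-of-order checkpoint resolution on the retry. This is a simplified illustration under assumed names (`recordCheckpoint`, `hasState`, a string-keyed map); only the `clear(source, partition)` shape mirrors the method in the diff:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of Option 1, not the actual Brooklin handler:
// after a send() failure triggers seekToLastCheckpoint(), the caller
// also clears the partition's callback state so no stale in-flight
// ordering leaks into the next attempt.
public class FlushlessHandlerSketch {
    private final Map<String, Long> callbackStatusMap = new ConcurrentHashMap<>();

    private static String key(String source, int partition) {
        return source + "-" + partition;
    }

    public void recordCheckpoint(String source, int partition, long offset) {
        callbackStatusMap.put(key(source, partition), offset);
    }

    /** Called alongside seekToLastCheckpoint(): forget stale in-flight state. */
    public void clear(String source, int partition) {
        callbackStatusMap.remove(key(source, partition));
    }

    public boolean hasState(String source, int partition) {
        return callbackStatusMap.containsKey(key(source, partition));
    }
}
```

The appeal over Option 2 is determinism: instead of reasoning about duplicate suppression inside an ordered structure, the rewind path simply resets to a known-empty state.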