Disable delete snapshot file during topic deletion and fix bug in grouping stop replica request when callback is null #7
Merged
One thing to note is that a background thread already periodically checkpoints the recovery offsets and deletes the snapshot files, so not deleting snapshot files in the critical path of StopReplica should be okay.
hzxa21
approved these changes
Mar 27, 2019
hzxa21
left a comment
Thanks for the investigation and the fix! LGTM.
Let's wait for the CI to finish before pushing the change.
jonlee2
approved these changes
Mar 27, 2019
jonlee2
left a comment
Collaborator
Thanks for working on this!
xiowu0
reviewed
Mar 27, 2019
xiowu0
left a comment
Thanks for the fix. LGTM.
Please make sure all unit tests pass.
xiowu0
approved these changes
Mar 27, 2019
earlcoder
added a commit
that referenced
this pull request
Apr 28, 2026
…#7) The Partition.isOneAboveMinIsr method is still defined in 3.6-li but the gauge that exposes it as a JMX/yammer metric was lost in the PR #538 squash. Restoring matches 3.0-li ReplicaManager.scala:296. Audit also flagged the dead-code method as P1 #8 — restoring the gauge wires it back as the original caller.
earlcoder
added a commit
that referenced
this pull request
Apr 28, 2026
Original 3.0-li addition: commit a2bf781 'Add metrics for log compaction threads alive'. The gauge counts cleaner threads that are isAlive(), complementing the already-present DeadThreadCount gauge. Required by LI ops dashboards.
earlcoder
added a commit
that referenced
this pull request
Apr 28, 2026
Original 3.0-li commit: 0e7ab47 'Mark FetchSession cache misses'. The CACHE_MISSES constant and INCREMENTAL_FETCH_SESSION_CACHE_MISSES_PER_SEC metric name were already defined in 3.6-li FetchSession companion object, but the meter registration in FetchSessionCache and the mark() call site on session-not-found were lost in the squash.
Restore:
- cacheMissesMeter registration in FetchSessionCache (mirrors evictionsMeter pattern, uses 3.6-li Collections.emptyMap() vs 3.0-li Map.empty).
- markCacheMiss() method on FetchSessionCache.
- markCacheMiss() invocation when a fetch session lookup fails (returning Errors.FETCH_SESSION_ID_NOT_FOUND).
Required by LI ops dashboards.
earlcoder
added a commit
that referenced
this pull request
Apr 29, 2026
Original 3.0-li commit: b3489a1 'Add BytesInTotal & MessagesInTotal counters'. The full counter infrastructure was lost in the PR #538 squash: the constants, CounterWrapper, counterMetricTypeMap, accessor methods, and call sites were all gone, plus the KafkaMetricsGroup.newCounter API itself disappeared when KafkaMetricsGroup migrated from Scala trait to Java class (org.apache.kafka.server.metrics).
Restore in three parts:
1. KafkaMetricsGroup (Java, server-common): add public newCounter(name) and newCounter(name, tags) methods that delegate to KafkaYammerMetrics.defaultRegistry().newCounter, mirroring the newGauge / newMeter API surface.
2. KafkaRequestHandler.scala (BrokerTopicMetrics + BrokerTopicStats):
   - Add CounterWrapper case class (mirrors MeterWrapper lazy-init pattern) using metricsGroup.newCounter.
   - Add counterMetricTypeMap[String, CounterWrapper] populated with MessagesInTotal and BytesInTotal at construction.
   - Add bytesInTotal and messagesInTotal accessor methods.
   - Add counterMetricMap test accessor.
   - Wire close() and closeMetric() to also close counter wrappers.
   - Add MessagesInTotal and BytesInTotal constants on BrokerTopicStats.
3. ReplicaManager.scala: at the four post-append call sites (per-topic + all-topics for both bytes and messages), increment the counter alongside the existing rate meter mark.
Problem:
Snapshot files are always deleted during topic deletion. Because this involves disk I/O and is done for every topic partition, it can slow down the deletion process and block the controller from picking up other requests when there are a large number of topic partitions. Since snapshot files are only used for transactions, which are not used at LinkedIn, the deletion can be disabled.
When a replica goes offline, the controller is expected to send only one batched STOP_REPLICA request to the destination broker, with the callback set to null. But currently the callback is set to (_, _) => (), which is not null, so the requests are not grouped and we end up sending one STOP_REPLICA request for each partition.
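The grouping problem can be illustrated with a minimal sketch. This is not the controller's actual code: the class `StopReplicaGrouping`, the `StopReplicaInfo` type, and the `build` and `requestsSent` helpers are hypothetical names made up for illustration, and the only rule modeled is the one described above, namely that entries are batched only when their callback is null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class StopReplicaGrouping {
    // Hypothetical stand-in for one pending stop-replica entry (one partition).
    static final class StopReplicaInfo {
        final String topicPartition;
        final BiConsumer<String, Short> callback; // null means "no callback"

        StopReplicaInfo(String topicPartition, BiConsumer<String, Short> callback) {
            this.topicPartition = topicPartition;
            this.callback = callback;
        }
    }

    // Builds n pending entries for one broker, all sharing the given callback.
    static List<StopReplicaInfo> build(int n, BiConsumer<String, Short> callback) {
        List<StopReplicaInfo> infos = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            infos.add(new StopReplicaInfo("topic-" + i, callback));
        }
        return infos;
    }

    // Number of requests sent to the broker: entries with a null callback
    // share one batched request; every other entry gets its own request.
    static int requestsSent(List<StopReplicaInfo> infos) {
        int groupable = 0;
        int individual = 0;
        for (StopReplicaInfo info : infos) {
            if (info.callback == null) {
                groupable++;
            } else {
                individual++;
            }
        }
        return (groupable > 0 ? 1 : 0) + individual;
    }

    public static void main(String[] args) {
        // A no-op lambda is still a non-null object, so nothing is grouped.
        System.out.println(requestsSent(build(3, (tp, err) -> {})));
        // A null callback lets all three partitions share one request.
        System.out.println(requestsSent(build(3, null)));
    }
}
```

In this sketch a no-op lambda and a null callback behave the same when invoked (nothing happens), but only the null one passes the grouping check, which is why passing null restores batching.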
Testing:
Verified the fix in the cert2 cluster by creating and deleting topics at the same time. With snapshot file deletion enabled during topic deletion, we see a ~1.5 min delay in the controller sending/receiving LEADER_AND_ISR requests, and deleting snapshot files takes ~500 ms per partition, which accounts for most of the broker's processing time for a STOP_REPLICA request; without the deletion, the processing time drops to ~10 ms.