fix(kafka): stop producer waiting up to 1s per message on busy topics (p99 7s → 22ms)#894
Conversation
The franz-go switch rewired the URL query param `flush_frequency` from
Sarama's "max time between flushes" to franz-go's `kgo.ProducerLinger`,
which is a PER-PARTITION linger. On the dev-scale-1/2 txmeta topic
(256 partitions, ~5 batched msgs/s/partition at 1.2M TPS peak) each
partition rarely fills 1MiB before the 1s linger, so every record paid
~1s of producer-side delay. The subtree-validator's local cache lagged
the validator by 1-2s and every subtree hit ProcessTxMetaUsingCache's
ThresholdExceededError -> 1s RetrySleep -> retry — visible as the
"validate_subtree_retry" rate matching the subtree rate and as gaps in
the "Tx Meta read from Kafka /second" Grafana panel.
- settings.conf: txmeta, validatortxs.operator, legacyInv get
flush_frequency=10ms (was 1s). Low-volume topics (invalidBlocks,
rejectedTx, unitTest) keep 1s; their per-partition rate is low
enough that latency doesn't matter.
- util/kafka/kafka_producer_async.go: documentation block at the
franz-go option site explaining the Sarama->franz-go semantic
shift for each `flush_*` URL param, so the next operator doesn't
re-introduce this.
- util/kafka/linger_latency_regression_test.go: regression test
spinning up Redpanda via testcontainers, demonstrating that
flush_frequency=1s produces p50 latency 200x larger than
flush_frequency=10ms on a 32-partition topic with sparse feed.
NOTE: dev-scale-1/2 configmaps override flush_frequency=1s explicitly
in scale-1-shared-config.kafka_txmetaConfig — those need a matching
update in the teranode-argocd-deployments repo for the fix to land
in those clusters.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Splits the single FlushFrequency knob that previously drove both franz-go's per-partition ProducerLinger AND the outer async-batcher's straggler-flush timer. A new URL query param `outer_batcher_linger` (field: OuterBatcherLinger, default 10ms) controls only the outer batcher; `flush_frequency` now controls only kgo.ProducerLinger, which is what an operator looking at the URL expects. Without this fix, setting flush_frequency=1s — which on the dev-scale clusters was the intent of "match Sarama's 1s Flush.Frequency" — stacked two lingers on the same publish path. The regression test (sparse feed, 32 partitions) goes from p50=4.49s/p99=6.95s with the stacked behaviour to p50=513ms/p99=1.01s with the franz-go linger alone (and to ~22ms p99 once flush_frequency is also lowered). The settings.conf change in the first commit on this branch handles the second of those steps; this change handles the first. Adds unit-test coverage that: - the new URL param parses and applies (250ms test value), - flush_frequency=1s no longer influences OuterBatcherLinger. Updates the integration test commentary to reflect that the outer batcher's linger no longer stacks with franz-go's. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
🤖 Claude Code Review Status: Complete This PR effectively resolves a critical performance regression in Kafka producer latency. The root cause analysis is sound: Summary:
Previous Issues - All Resolved:
No new issues found. The fix is production-proven and ready for merge. |
There was a problem hiding this comment.
Pull request overview
This PR addresses a kafka producer-side latency regression introduced by the Sarama → franz-go migration by (1) lowering flush_frequency (franz-go ProducerLinger) for high-fanout topics and (2) decoupling the producer wrapper’s outer batcher “straggler flush” timer from flush_frequency via a new outer_batcher_linger URL param.
Changes:
- Add
OuterBatcherLinger(URL:outer_batcher_linger, default 10ms) and rewire the outer drain goroutine to use it instead offlush_frequency. - Update
settings.confto useflush_frequency=10msfor high-fanout topics (txmeta,legacyInv,validatortxsConfig.operator). - Add a perf-tagged end-to-end regression test for publish→consume latency and unit tests ensuring the new linger knob is plumbed and decoupled.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| util/kafka/linger_latency_regression_test.go | Adds a perf-tagged regression test to demonstrate latency impact of flush_frequency (ProducerLinger) under high partition fanout. |
| util/kafka/kafka_producer_async.go | Introduces OuterBatcherLinger and decouples outer batcher linger timing from FlushFrequency/ProducerLinger. |
| util/kafka/kafka_producer_async_test.go | Extends URL parsing default-value test and adds coverage for outer_batcher_linger plumbing/decoupling. |
| settings.conf | Lowers flush_frequency to 10ms on specific high-fanout topics to prevent per-record linger delays. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // test isolates ProducerLinger. Unit-level coverage that the two fields are | ||
| // decoupled lives in TestNewKafkaAsyncProducerFromURLOuterBatcherLinger. | ||
| // | ||
| // Run: go test -v -run TestLingerLatencyRegression -timeout 5m ./util/kafka/ |
There was a problem hiding this comment.
Good catch — added -tags perf to the Run: example in the doc comment, matching the build tag and the PR description.
Addresses Copilot review on PR bsv-blockchain#894: the doc comment's `go test ...` example was missing the `-tags perf` flag required to compile and run the test, which is gated by `//go:build perf`.
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-06-02 09:06 UTC |
| FlushBytes int // Flush threshold in bytes → kgo.ProducerBatchMaxBytes (see above) | ||
| FlushMessages int // Number of messages before flush → kgo.MaxBufferedRecords (see above) | ||
| FlushFrequency time.Duration // Time between flushes → kgo.ProducerLinger (see above) | ||
| OuterBatcherLinger time.Duration // Straggler-flush timer for the outer drain goroutine; defaults to defaultOuterBatcherLinger when zero/negative |
There was a problem hiding this comment.
Confirming this documentation inconsistency. The code at line 540 (if len(buffered) >= maxBatch) uses FlushMessages (via currentBatchSize()) as a flush trigger for the outer batcher, contradicting the comment at line 63-64 that states it is "NOT a flush trigger."
The dual usage is:
kgo.MaxBufferedRecords(cfg.FlushMessages)at line 280 — franz-go backpressure capcurrentBatchSize()returningcfg.FlushMessages— outer batcher flush threshold at line 540
The documentation should clarify this dual purpose to avoid operator confusion when tuning flush_messages.
✅ RESOLVED - Documentation updated to correctly describe dual usage.
There was a problem hiding this comment.
Good catch — fixed in the latest commit. The doc block on KafkaProducerConfig and the field-line comments now describe FlushMessages as dual-use (kgo.MaxBufferedRecords backpressure cap and outer-batcher flush-size trigger via currentBatchSize() at line 540), and FlushFrequency is described as ProducerLinger-only since the outer batcher now uses OuterBatcherLinger.
…quency-linger-regression
…quency-linger-regression
oskarszoon
left a comment
There was a problem hiding this comment.
Approve. Clean RCA: identifies both the franz-go semantic shift (FlushFrequency = per-partition ProducerLinger, not "max time between flushes") and the second-order outer-batcher straggler-timer bug. Topic-scoped via URL params, high-fanout topics drop to 10ms, low-volume ones stay at 1s.
Nits:
- Adaptive-slow range silently compressed 10× from
[200ms, 5s]to[50ms, 500ms]. Mechanical consequence of the base change but worth a CHANGELOG note. - Outer-batcher decoupling only tested at config-parse level — the "two lingers stacked" regression that motivated this isn't directly covered.
- External configmap overrides:
terabuild,mainnet,testnet,teratestnetlive interanode-argocd-deployments— out of scope but worth surfacing for the rollout.
Copilot + github-actions both flagged that the field comments on KafkaProducerConfig drifted from actual behaviour: - FlushMessages is dual-use: kgo.MaxBufferedRecords AND outer-batcher flush-size trigger via currentBatchSize() at line 540 (not 'NOT a flush trigger' as the previous comment claimed). - FlushFrequency now only drives kgo.ProducerLinger (per-partition broker linger); the outer batcher uses OuterBatcherLinger. Updates the doc block and the field-line comments to describe both effects accurately. No code change.
…ect stacked-lingers test 1. Document the 10× compression of the adaptive-slow linger bounds (was [200ms, 5s] when driven by FlushFrequency default 10s; now [50ms, 500ms] driven by OuterBatcherLinger default 10ms). Same shape, scaled with the new base — kept inline on currentBatchLinger since there is no CHANGELOG. 2. Add TestStackedLingerRegression to directly exercise the bug the decoupling was meant to fix. Holds franz-go ProducerLinger at 1s and varies only outer_batcher_linger (1s vs 10ms). Confirms the pre-fix stacked configuration adds the outer batcher's linger on top of franz-go's, and the post-fix default doesn't. Local run showed stacked p50=4.49s vs decoupled p50=519ms (≈9× gap).
… into deploy branch
|
ordishs
left a comment
There was a problem hiding this comment.
Approve. Clean decoupling — verified FlushFrequency now feeds only kgo.ProducerLinger and currentBatchLinger() reads OuterBatcherLinger exclusively, with no residual coupling. New param is additive and non-breaking. Strong test coverage: the unit test guards the core invariant (flush_frequency does not influence outer linger) in fast tests, and the perf-tagged tests reproduce the pathology end-to-end with noise-tolerant thresholds.
A couple of non-blocking follow-ups:
- Release-note the operator-facing semantic change: self-hosted configs that set
flush_frequencyexpecting it to also throttle the outer batcher now get the 10ms outer default. - Confirm
subtrees/invalidSubtreesare spared only becauseflush_messages=1force-flushes per record regardless of the inherited 10s linger — a futureflush_messagesbump there would silently reintroduce it.



What this fixes
The kafka producer was waiting up to a full second before sending each message on our busiest topics, which cascaded into the subtree-validator constantly retrying. This PR cuts that wait to 10ms on those topics and removes a second, hidden wait that was stacking on top of it.
Result: p99 publish→consume latency dropped from ~7s to ~22ms. Already deployed to dev-scale-1/2 and confirmed working at production load (see "After deploy" below).
This is a clean re-do of #840, rebased off
maininstead offeat/teranode-native-ops. Same two kafka commits, plus follow-ups for review feedback.Why the producer was so slow
When we switched the kafka client from Sarama to franz-go (#611), one URL parameter —
flush_frequency— silently changed meaning:Our setting was
flush_frequency=1s. On the txmeta topic (256 partitions, only ~5 messages/sec/partition at peak), no partition ever filled a 1 MiB batch in time, so every single message paid ~1 second of producer-side delay before being sent.That delay starved the subtree-validator's local cache, which then hit
ThresholdExceededError, slept 1 second, and retried — for every subtree.The two fixes
1. Drop
flush_frequencyfrom 1s to 10ms on busy topics.Applied to
txmeta,validatortxsConfig.operator, andlegacyInv. Low-volume topics (invalidBlocks,rejectedTx,unitTest) keep1s— they don't have the fan-out problem.2. Split one config into two so we stop waiting twice.
The producer wrapper has its own small "wait a moment in case more messages arrive" timer (the outer batcher). It was reading the same
flush_frequencyvalue — so withflush_frequency=1syou'd wait up to 1s in the outer batcher, then another up to 1s in franz-go. Two stacked lingers on the same message.This PR adds a new URL param
outer_batcher_linger(default 10ms) that controls only the outer batcher.flush_frequencynow controls only the franz-go linger — which is what someone reading the URL would expect.Before / after — pre-deploy tests
TestLingerLatencyRegression— flush_frequency at production vs proposed valuesLocal Redpanda, 32-partition topic, 200 messages 25ms apart:
flush_frequency=10ms(both fixes)TestStackedLingerRegression— directly proves the outer-batcher stacking (added in review)Holds
flush_frequency=1s(franz-go ProducerLinger) constant and varies onlyouter_batcher_linger:The ~9× p50 gap is attributable purely to the outer batcher, confirming the stacking was real and is now gone by default.
After deploy — dev-scale-1/2 at ~1.30M TPS
Applied configmap (
flush_frequency: 1s → 5mson txmeta + legacyInv) at 2026-05-11 11:27 UTC. After ~22 min of sustained load:/metricsscrape durationvalidate_subtree_retryratevalidate_subtree_durationp99One thing to keep an eye on (not a regression):
bless_missing_transaction_countnow fires at very low rates (~0.23/s on scale-1, ~0.85/s on scale-2 with one 20.76/s burst) where it was zero before. That path never fired pre-fix because the retry loop short-circuited every cache miss. Post-fix, real cache misses fall through to the legitimate "fetch from UTXO store" path. Rate is ~0.00007% of txs — fine, but a good alarm signal if it grows.Side effect: adaptive-slow linger bounds compressed 10×
currentBatchLinger()clamps the adaptive-slow outer-batcher linger to[50ms, 500ms]. Pre-fix, when this function readFlushFrequency(default 10s), the bounds were[200ms, 5s]. Both ranges were compressed by the same factor when the base switched toOuterBatcherLinger(default 10ms), so the bounds still serve their original purpose — "a few base lingers" floor, "small multiple of base" ceiling — at the new scale. Now documented inline on the function (no CHANGELOG in this repo).Test plan
go vet ./util/kafka/cleango build ./util/kafka/cleango test -short -count=1 ./util/kafka/passesgo test -tags perf -v -run TestLingerLatencyRegression -timeout 5m ./util/kafka/passes with the numbers abovego test -tags perf -v -run TestStackedLingerRegression -timeout 5m ./util/kafka/passes with the numbers aboveflush_frequencyoverride relies on the old semanticNote on the regression test build tag
The new
linger_latency_regression_test.gouses//go:build perfto stay consistent withkafka_perf_test.go, which holds the sharedsanitizeTopicComponenthelper under the same tag (added by #837).