Skip to content

fix(kafka): stop producer waiting up to 1s per message on busy topics (p99 7s → 22ms)#894

Merged
freemans13 merged 9 commits into
bsv-blockchain:mainfrom
freemans13:stu/kafka-flush-frequency-linger-regression
Jun 3, 2026
Merged

fix(kafka): stop producer waiting up to 1s per message on busy topics (p99 7s → 22ms)#894
freemans13 merged 9 commits into
bsv-blockchain:mainfrom
freemans13:stu/kafka-flush-frequency-linger-regression

Conversation

@freemans13

@freemans13 freemans13 commented May 19, 2026

Copy link
Copy Markdown
Collaborator

What this fixes

The kafka producer was waiting up to a full second before sending each message on our busiest topics, which cascaded into the subtree-validator constantly retrying. This PR cuts that wait to 10ms on those topics and removes a second, hidden wait that was stacking on top of it.

Result: p99 publish→consume latency dropped from ~7s to ~22ms. Already deployed to dev-scale-1/2 and confirmed working at production load (see "After deploy" below).

This is a clean re-do of #840, rebased off main instead of feat/teranode-native-ops. Same two kafka commits, plus follow-ups for review feedback.

Why the producer was so slow

When we switched the kafka client from Sarama to franz-go (#611), one URL parameter — flush_frequency — silently changed meaning:

  • In Sarama: "maximum time the producer waits before flushing whatever it has buffered." A safety net.
  • In franz-go: "per-partition linger — the producer waits this long for each partition's batch to fill up before sending it, every time." Now it's on the critical path of every message.

Our setting was flush_frequency=1s. On the txmeta topic (256 partitions, only ~5 messages/sec/partition at peak), no partition ever filled a 1 MiB batch in time, so every single message paid ~1 second of producer-side delay before being sent.

That delay starved the subtree-validator's local cache, which then hit ThresholdExceededError, slept 1 second, and retried — for every subtree.

The two fixes

1. Drop flush_frequency from 1s to 10ms on busy topics.
Applied to txmeta, validatortxsConfig.operator, and legacyInv. Low-volume topics (invalidBlocks, rejectedTx, unitTest) keep 1s — they don't have the fan-out problem.

2. Split one config into two so we stop waiting twice.
The producer wrapper has its own small "wait a moment in case more messages arrive" timer (the outer batcher). It was reading the same flush_frequency value — so with flush_frequency=1s you'd wait up to 1s in the outer batcher, then another up to 1s in franz-go. Two stacked lingers on the same message.

This PR adds a new URL param outer_batcher_linger (default 10ms) that controls only the outer batcher. flush_frequency now controls only the franz-go linger — which is what someone reading the URL would expect.

Before / after — pre-deploy tests

TestLingerLatencyRegression — flush_frequency at production vs proposed values

Local Redpanda, 32-partition topic, 200 messages 25ms apart:

Code state p50 p99
Both bugs present (stacked 1s + 1s lingers) 4.49s 6.95s
Outer batcher decoupled (one linger only) 513ms 1.01s
flush_frequency=10ms (both fixes) 21ms 22ms

TestStackedLingerRegression — directly proves the outer-batcher stacking (added in review)

Holds flush_frequency=1s (franz-go ProducerLinger) constant and varies only outer_batcher_linger:

Outer batcher linger franz-go linger p50 p99
1s (pre-fix coupled behaviour) 1s 4.49s 6.95s
10ms (post-fix default) 1s 519ms 1.01s

The ~9× p50 gap is attributable purely to the outer batcher, confirming the stacking was real and is now gone by default.

After deploy — dev-scale-1/2 at ~1.30M TPS

Applied configmap (flush_frequency: 1s → 5ms on txmeta + legacyInv) at 2026-05-11 11:27 UTC. After ~22 min of sustained load:

Metric Before After Change
Consumer rate variance 200k–2.2M/s (visible gaps) 1.28–1.32M/s (smooth) gaps gone
Producer buffered (txmeta) mean 72k, peak 186k max 2 ~100,000× lower
Producer e2e latency p99 mean 408ms, max 1.6s 63ms (flat) ~26× lower
Broker write latency p99 mean 642ms, max 1.8s 63ms (flat) ~28× lower
Subtree-validator goroutines 28k–693k 3.8k–4.0k ~175× lower
/metrics scrape duration mean 140ms, max 10s (timing out) 5–11ms endpoint healthy
validate_subtree_retry rate mean 0.9/s, peak 2.3/s (~2 attempts/subtree) mean 0.94/s = 1/subtree retries gone
validate_subtree_duration p99 mean 16s, max 127s 1.9s and trending ~8× faster

One thing to keep an eye on (not a regression): bless_missing_transaction_count now fires at very low rates (~0.23/s on scale-1, ~0.85/s on scale-2 with one 20.76/s burst) where it was zero before. That path never fired pre-fix because the retry loop short-circuited every cache miss. Post-fix, real cache misses fall through to the legitimate "fetch from UTXO store" path. Rate is ~0.00007% of txs — fine, but a good alarm signal if it grows.

Side effect: adaptive-slow linger bounds compressed 10×

currentBatchLinger() clamps the adaptive-slow outer-batcher linger to [50ms, 500ms]. Pre-fix, when this function read FlushFrequency (default 10s), the bounds were [200ms, 5s]. Both ranges were compressed by the same factor when the base switched to OuterBatcherLinger (default 10ms), so the bounds still serve their original purpose — "a few base lingers" floor, "small multiple of base" ceiling — at the new scale. Now documented inline on the function (no CHANGELOG in this repo).

Test plan

  • go vet ./util/kafka/ clean
  • go build ./util/kafka/ clean
  • go test -short -count=1 ./util/kafka/ passes
  • go test -tags perf -v -run TestLingerLatencyRegression -timeout 5m ./util/kafka/ passes with the numbers above
  • go test -tags perf -v -run TestStackedLingerRegression -timeout 5m ./util/kafka/ passes with the numbers above
  • Deployed to dev-scale-1/2; metrics confirm the regression is resolved
  • Operators of self-hosted deployments to confirm no local flush_frequency override relies on the old semantic

Note on the regression test build tag

The new linger_latency_regression_test.go uses //go:build perf to stay consistent with kafka_perf_test.go, which holds the shared sanitizeTopicComponent helper under the same tag (added by #837).

freemans13 and others added 2 commits May 19, 2026 13:46
The franz-go switch rewired the URL query param `flush_frequency` from
Sarama's "max time between flushes" to franz-go's `kgo.ProducerLinger`,
which is a PER-PARTITION linger. On the dev-scale-1/2 txmeta topic
(256 partitions, ~5 batched msgs/s/partition at 1.2M TPS peak) each
partition rarely fills 1MiB before the 1s linger, so every record paid
~1s of producer-side delay. The subtree-validator's local cache lagged
the validator by 1-2s and every subtree hit ProcessTxMetaUsingCache's
ThresholdExceededError -> 1s RetrySleep -> retry — visible as the
"validate_subtree_retry" rate matching the subtree rate and as gaps in
the "Tx Meta read from Kafka /second" Grafana panel.

  - settings.conf: txmeta, validatortxs.operator, legacyInv get
    flush_frequency=10ms (was 1s). Low-volume topics (invalidBlocks,
    rejectedTx, unitTest) keep 1s; their per-partition rate is low
    enough that latency doesn't matter.
  - util/kafka/kafka_producer_async.go: documentation block at the
    franz-go option site explaining the Sarama->franz-go semantic
    shift for each `flush_*` URL param, so the next operator doesn't
    re-introduce this.
  - util/kafka/linger_latency_regression_test.go: regression test
    spinning up Redpanda via testcontainers, demonstrating that
    flush_frequency=1s produces p50 latency 200x larger than
    flush_frequency=10ms on a 32-partition topic with sparse feed.

NOTE: dev-scale-1/2 configmaps override flush_frequency=1s explicitly
in scale-1-shared-config.kafka_txmetaConfig — those need a matching
update in the teranode-argocd-deployments repo for the fix to land
in those clusters.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Splits the single FlushFrequency knob that previously drove both
franz-go's per-partition ProducerLinger AND the outer async-batcher's
straggler-flush timer. A new URL query param `outer_batcher_linger`
(field: OuterBatcherLinger, default 10ms) controls only the outer
batcher; `flush_frequency` now controls only kgo.ProducerLinger, which
is what an operator looking at the URL expects.

Without this fix, setting flush_frequency=1s — which on the dev-scale
clusters was the intent of "match Sarama's 1s Flush.Frequency" — stacked
two lingers on the same publish path. The regression test (sparse feed,
32 partitions) goes from p50=4.49s/p99=6.95s with the stacked behaviour
to p50=513ms/p99=1.01s with the franz-go linger alone (and to ~22ms p99
once flush_frequency is also lowered). The settings.conf change in the
first commit on this branch handles the second of those steps; this
change handles the first.

Adds unit-test coverage that:
  - the new URL param parses and applies (250ms test value),
  - flush_frequency=1s no longer influences OuterBatcherLinger.

Updates the integration test commentary to reflect that the outer
batcher's linger no longer stacks with franz-go's.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete


This PR effectively resolves a critical performance regression in Kafka producer latency. The root cause analysis is sound: flush_frequency changed semantic meaning when migrating from Sarama to franz-go, and the outer batcher was unintentionally reusing the same value, causing stacked latency.

Summary:

  • ✅ Code changes are minimal and well-targeted
  • ✅ Comprehensive test coverage with two regression tests
  • ✅ Documentation accurately describes the dual usage of config params
  • ✅ Production metrics validate the fix (p99 latency: 7s → 22ms)
  • ✅ All previous review issues have been addressed

Previous Issues - All Resolved:

  • Missing -tags perf in test run command → Fixed
  • Documentation inconsistency for FlushMessages/FlushFrequency → Fixed with clear dual-usage docs

No new issues found.

The fix is production-proven and ready for merge.

@freemans13 freemans13 self-assigned this May 19, 2026
@freemans13 freemans13 requested a review from Copilot May 19, 2026 12:57

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses a kafka producer-side latency regression introduced by the Sarama → franz-go migration by (1) lowering flush_frequency (franz-go ProducerLinger) for high-fanout topics and (2) decoupling the producer wrapper’s outer batcher “straggler flush” timer from flush_frequency via a new outer_batcher_linger URL param.

Changes:

  • Add OuterBatcherLinger (URL: outer_batcher_linger, default 10ms) and rewire the outer drain goroutine to use it instead of flush_frequency.
  • Update settings.conf to use flush_frequency=10ms for high-fanout topics (txmeta, legacyInv, validatortxsConfig.operator).
  • Add a perf-tagged end-to-end regression test for publish→consume latency and unit tests ensuring the new linger knob is plumbed and decoupled.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

File Description
util/kafka/linger_latency_regression_test.go Adds a perf-tagged regression test to demonstrate latency impact of flush_frequency (ProducerLinger) under high partition fanout.
util/kafka/kafka_producer_async.go Introduces OuterBatcherLinger and decouples outer batcher linger timing from FlushFrequency/ProducerLinger.
util/kafka/kafka_producer_async_test.go Extends URL parsing default-value test and adds coverage for outer_batcher_linger plumbing/decoupling.
settings.conf Lowers flush_frequency to 10ms on specific high-fanout topics to prevent per-record linger delays.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

// test isolates ProducerLinger. Unit-level coverage that the two fields are
// decoupled lives in TestNewKafkaAsyncProducerFromURLOuterBatcherLinger.
//
// Run: go test -v -run TestLingerLatencyRegression -timeout 5m ./util/kafka/

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — added -tags perf to the Run: example in the doc comment, matching the build tag and the PR description.

Addresses Copilot review on PR bsv-blockchain#894: the doc comment's `go test ...`
example was missing the `-tags perf` flag required to compile and
run the test, which is gated by `//go:build perf`.
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-894 (4827802)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 144
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.783µ 1.754µ ~ 0.200
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 61.57n 61.53n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 61.43n 61.48n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 61.37n 61.43n ~ 0.500
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 31.24n 31.78n ~ 0.400
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 53.11n 51.15n ~ 0.700
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 113.3n 106.3n ~ 0.500
MiningCandidate_Stringify_Short-4 259.7n 258.9n ~ 0.300
MiningCandidate_Stringify_Long-4 1.909µ 1.888µ ~ 0.100
MiningSolution_Stringify-4 965.8n 965.7n ~ 0.500
BlockInfo_MarshalJSON-4 1.770µ 1.773µ ~ 1.000
NewFromBytes-4 124.6n 133.3n ~ 0.200
AddTxBatchColumnar_Validation-4 2.729µ 2.540µ ~ 1.000
OffsetValidationLoop-4 544.5n 546.0n ~ 1.000
Mine_EasyDifficulty-4 61.25µ 61.00µ ~ 0.700
Mine_WithAddress-4 7.001µ 6.949µ ~ 0.400
BlockAssembler_AddTx-4 0.02731n 0.02807n ~ 1.000
AddNode-4 11.04 11.05 ~ 0.700
AddNodeWithMap-4 11.16 10.90 ~ 0.100
DirectSubtreeAdd/4_per_subtree-4 55.53n 61.61n ~ 0.100
DirectSubtreeAdd/64_per_subtree-4 28.86n 28.90n ~ 0.700
DirectSubtreeAdd/256_per_subtree-4 27.68n 27.96n ~ 0.700
DirectSubtreeAdd/1024_per_subtree-4 26.46n 26.60n ~ 0.600
DirectSubtreeAdd/2048_per_subtree-4 26.09n 26.14n ~ 0.400
SubtreeProcessorAdd/4_per_subtree-4 309.0n 303.0n ~ 1.000
SubtreeProcessorAdd/64_per_subtree-4 297.8n 295.5n ~ 0.700
SubtreeProcessorAdd/256_per_subtree-4 300.2n 297.4n ~ 0.700
SubtreeProcessorAdd/1024_per_subtree-4 291.2n 286.4n ~ 0.400
SubtreeProcessorAdd/2048_per_subtree-4 280.0n 291.2n ~ 0.700
SubtreeProcessorRotate/4_per_subtree-4 291.0n 301.1n ~ 0.200
SubtreeProcessorRotate/64_per_subtree-4 294.7n 284.4n ~ 0.700
SubtreeProcessorRotate/256_per_subtree-4 285.5n 284.3n ~ 0.700
SubtreeProcessorRotate/1024_per_subtree-4 286.8n 285.7n ~ 0.400
SubtreeNodeAddOnly/4_per_subtree-4 54.82n 54.86n ~ 1.000
SubtreeNodeAddOnly/64_per_subtree-4 35.97n 35.89n ~ 0.400
SubtreeNodeAddOnly/256_per_subtree-4 34.99n 34.99n ~ 0.800
SubtreeNodeAddOnly/1024_per_subtree-4 34.32n 34.43n ~ 0.800
SubtreeCreationOnly/4_per_subtree-4 109.5n 109.6n ~ 1.000
SubtreeCreationOnly/64_per_subtree-4 351.1n 347.1n ~ 0.400
SubtreeCreationOnly/256_per_subtree-4 1.232µ 1.221µ ~ 0.100
SubtreeCreationOnly/1024_per_subtree-4 3.813µ 3.789µ ~ 0.200
SubtreeCreationOnly/2048_per_subtree-4 6.873µ 6.872µ ~ 1.000
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 293.8n 284.9n ~ 0.400
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 296.1n 281.3n ~ 0.200
ParallelGetAndSetIfNotExists/1k_nodes-4 2.042m 1.995m ~ 0.100
ParallelGetAndSetIfNotExists/10k_nodes-4 5.408m 5.278m ~ 0.400
ParallelGetAndSetIfNotExists/50k_nodes-4 8.272m 7.653m ~ 0.100
ParallelGetAndSetIfNotExists/100k_nodes-4 11.01m 10.24m ~ 0.100
SequentialGetAndSetIfNotExists/1k_nodes-4 1.809m 1.800m ~ 0.200
SequentialGetAndSetIfNotExists/10k_nodes-4 5.193m 4.702m ~ 0.100
SequentialGetAndSetIfNotExists/50k_nodes-4 15.36m 13.76m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 26.44m 25.23m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 2.092m 2.079m ~ 0.400
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 8.429m 8.416m ~ 0.700
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 14.19m 13.82m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 1.807m 1.795m ~ 0.400
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 9.012m 8.251m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 49.83m 46.59m ~ 0.400
DiskTxMap_SetIfNotExists-4 3.581µ 3.621µ ~ 0.700
DiskTxMap_SetIfNotExists_Parallel-4 3.354µ 3.351µ ~ 0.600
DiskTxMap_ExistenceOnly-4 312.9n 310.7n ~ 0.400
Queue-4 188.4n 185.5n ~ 0.100
AtomicPointer-4 3.657n 3.633n ~ 0.400
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 831.2µ 828.3µ ~ 0.400
ReorgOptimizations/DedupFilterPipeline/New/10K-4 827.5µ 752.0µ ~ 0.100
ReorgOptimizations/AllMarkFalse/Old/10K-4 106.6µ 102.4µ ~ 0.100
ReorgOptimizations/AllMarkFalse/New/10K-4 64.62µ 64.23µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/10K-4 56.33µ 57.91µ ~ 0.400
ReorgOptimizations/HashSlicePool/New/10K-4 11.06µ 11.16µ ~ 1.000
ReorgOptimizations/NodeFlags/Old/10K-4 4.378µ 5.056µ ~ 0.100
ReorgOptimizations/NodeFlags/New/10K-4 1.519µ 1.971µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 9.132m 9.328m ~ 0.700
ReorgOptimizations/DedupFilterPipeline/New/100K-4 9.144m 9.140m ~ 0.700
ReorgOptimizations/AllMarkFalse/Old/100K-4 1.090m 1.062m ~ 0.200
ReorgOptimizations/AllMarkFalse/New/100K-4 704.7µ 704.8µ ~ 1.000
ReorgOptimizations/HashSlicePool/Old/100K-4 521.4µ 524.6µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/100K-4 199.5µ 204.5µ ~ 0.700
ReorgOptimizations/NodeFlags/Old/100K-4 48.72µ 48.35µ ~ 0.400
ReorgOptimizations/NodeFlags/New/100K-4 16.26µ 16.95µ ~ 0.400
TxMapSetIfNotExists-4 49.13n 49.40n ~ 0.100
TxMapSetIfNotExistsDuplicate-4 41.42n 41.49n ~ 0.100
ChannelSendReceive-4 574.4n 604.4n ~ 0.100
CalcBlockWork-4 508.8n 505.8n ~ 0.100
CalculateWork-4 693.1n 699.5n ~ 0.500
BuildBlockLocatorString_Helpers/Size_10-4 1.364µ 1.353µ ~ 0.300
BuildBlockLocatorString_Helpers/Size_100-4 13.06µ 13.07µ ~ 0.700
BuildBlockLocatorString_Helpers/Size_1000-4 129.1µ 156.2µ ~ 0.400
CatchupWithHeaderCache-4 104.8m 104.5m ~ 0.700
_BufferPoolAllocation/16KB-4 3.973µ 4.102µ ~ 0.200
_BufferPoolAllocation/32KB-4 9.866µ 11.426µ ~ 0.700
_BufferPoolAllocation/64KB-4 22.81µ 17.76µ ~ 0.100
_BufferPoolAllocation/128KB-4 33.52µ 38.01µ ~ 0.100
_BufferPoolAllocation/512KB-4 112.9µ 141.8µ ~ 0.100
_BufferPoolConcurrent/32KB-4 19.76µ 23.80µ ~ 0.100
_BufferPoolConcurrent/64KB-4 31.88µ 33.24µ ~ 0.200
_BufferPoolConcurrent/512KB-4 151.0µ 154.7µ ~ 0.400
_SubtreeDeserializationWithBufferSizes/16KB-4 637.5µ 698.4µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/32KB-4 641.0µ 678.8µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/64KB-4 635.9µ 698.9µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 640.6µ 681.5µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/512KB-4 627.4µ 635.6µ ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/16KB-4 38.25m 37.65m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/32KB-4 37.98m 37.19m ~ 0.200
_SubtreeDataDeserializationWithBufferSizes/64KB-4 37.28m 37.35m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/128KB-4 37.29m 37.53m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/512KB-4 37.32m 36.62m ~ 0.200
_PooledVsNonPooled/Pooled-4 739.5n 741.1n ~ 0.400
_PooledVsNonPooled/NonPooled-4 8.054µ 8.861µ ~ 0.700
_MemoryFootprint/Current_512KB_32concurrent-4 6.513µ 6.537µ ~ 0.100
_MemoryFootprint/Proposed_32KB_32concurrent-4 9.804µ 9.946µ ~ 0.700
_MemoryFootprint/Alternative_64KB_32concurrent-4 9.314µ 9.763µ ~ 0.200
SubtreeSizes/10k_tx_4_per_subtree-4 1.373m 1.406m ~ 0.400
SubtreeSizes/10k_tx_16_per_subtree-4 328.8µ 328.3µ ~ 0.700
SubtreeSizes/10k_tx_64_per_subtree-4 77.17µ 78.83µ ~ 0.100
SubtreeSizes/10k_tx_256_per_subtree-4 19.37µ 19.58µ ~ 0.700
SubtreeSizes/10k_tx_512_per_subtree-4 9.647µ 9.663µ ~ 0.700
SubtreeSizes/10k_tx_1024_per_subtree-4 4.770µ 4.816µ ~ 0.100
SubtreeSizes/10k_tx_2k_per_subtree-4 2.388µ 2.391µ ~ 0.300
BlockSizeScaling/10k_tx_64_per_subtree-4 75.77µ 76.88µ ~ 0.100
BlockSizeScaling/10k_tx_256_per_subtree-4 19.15µ 19.31µ ~ 0.100
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.743µ 4.809µ ~ 0.200
BlockSizeScaling/50k_tx_64_per_subtree-4 396.7µ 401.5µ ~ 0.200
BlockSizeScaling/50k_tx_256_per_subtree-4 95.12µ 96.58µ ~ 0.100
BlockSizeScaling/50k_tx_1024_per_subtree-4 23.49µ 23.70µ ~ 0.700
SubtreeAllocations/small_subtrees_exists_check-4 162.3µ 161.5µ ~ 0.100
SubtreeAllocations/small_subtrees_data_fetch-4 167.2µ 168.0µ ~ 1.000
SubtreeAllocations/small_subtrees_full_validation-4 325.0µ 333.2µ ~ 0.200
SubtreeAllocations/medium_subtrees_exists_check-4 9.474µ 9.426µ ~ 0.700
SubtreeAllocations/medium_subtrees_data_fetch-4 9.871µ 9.921µ ~ 1.000
SubtreeAllocations/medium_subtrees_full_validation-4 19.21µ 19.26µ ~ 0.700
SubtreeAllocations/large_subtrees_exists_check-4 2.240µ 2.271µ ~ 0.600
SubtreeAllocations/large_subtrees_data_fetch-4 2.408µ 2.427µ ~ 0.100
SubtreeAllocations/large_subtrees_full_validation-4 4.764µ 4.819µ ~ 0.100
_prepareTxsPerLevel-4 305.9m 311.8m ~ 0.400
_prepareTxsPerLevelOrdered-4 2.841m 2.819m ~ 0.700
_prepareTxsPerLevel_Comparison/Original-4 308.4m 308.9m ~ 0.400
_prepareTxsPerLevel_Comparison/Optimized-4 2.957m 2.697m ~ 0.100
StoreBlock_Sequential/BelowCSVHeight-4 312.4µ 317.2µ ~ 0.700
StoreBlock_Sequential/AboveCSVHeight-4 317.0µ 312.4µ ~ 0.400
GetUtxoHashes-4 269.5n 271.4n ~ 0.400
GetUtxoHashes_ManyOutputs-4 47.17µ 46.00µ ~ 0.700
_NewMetaDataFromBytes-4 215.5n 215.5n ~ 1.000
_Bytes-4 404.7n 402.8n ~ 0.400
_MetaBytes-4 139.2n 140.1n ~ 0.400

Threshold: >10% with p < 0.05 | Generated: 2026-06-02 09:06 UTC

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.

Comment thread util/kafka/kafka_producer_async.go Outdated
Comment on lines +83 to +86
FlushBytes int // Flush threshold in bytes → kgo.ProducerBatchMaxBytes (see above)
FlushMessages int // Number of messages before flush → kgo.MaxBufferedRecords (see above)
FlushFrequency time.Duration // Time between flushes → kgo.ProducerLinger (see above)
OuterBatcherLinger time.Duration // Straggler-flush timer for the outer drain goroutine; defaults to defaultOuterBatcherLinger when zero/negative

@github-actions github-actions Bot May 21, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirming this documentation inconsistency. The code at line 540 (if len(buffered) >= maxBatch) uses FlushMessages (via currentBatchSize()) as a flush trigger for the outer batcher, contradicting the comment at line 63-64 that states it is "NOT a flush trigger."

The dual usage is:

  1. kgo.MaxBufferedRecords(cfg.FlushMessages) at line 280 — franz-go backpressure cap
  2. currentBatchSize() returning cfg.FlushMessages — outer batcher flush threshold at line 540

The documentation should clarify this dual purpose to avoid operator confusion when tuning flush_messages.

✅ RESOLVED - Documentation updated to correctly describe dual usage.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in the latest commit. The doc block on KafkaProducerConfig and the field-line comments now describe FlushMessages as dual-use (kgo.MaxBufferedRecords backpressure cap and outer-batcher flush-size trigger via currentBatchSize() at line 540), and FlushFrequency is described as ProducerLinger-only since the outer batcher now uses OuterBatcherLinger.

@oskarszoon oskarszoon left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approve. Clean RCA: identifies both the franz-go semantic shift (FlushFrequency = per-partition ProducerLinger, not "max time between flushes") and the second-order outer-batcher straggler-timer bug. Topic-scoped via URL params, high-fanout topics drop to 10ms, low-volume ones stay at 1s.

Nits:

  • Adaptive-slow range silently compressed 10× from [200ms, 5s] to [50ms, 500ms]. Mechanical consequence of the base change but worth a CHANGELOG note.
  • Outer-batcher decoupling only tested at config-parse level — the "two lingers stacked" regression that motivated this isn't directly covered.
  • External configmap overrides: terabuild, mainnet, testnet, teratestnet live in teranode-argocd-deployments — out of scope but worth surfacing for the rollout.

Copilot + github-actions both flagged that the field comments on
KafkaProducerConfig drifted from actual behaviour:

- FlushMessages is dual-use: kgo.MaxBufferedRecords AND outer-batcher
  flush-size trigger via currentBatchSize() at line 540 (not 'NOT a
  flush trigger' as the previous comment claimed).
- FlushFrequency now only drives kgo.ProducerLinger (per-partition
  broker linger); the outer batcher uses OuterBatcherLinger.

Updates the doc block and the field-line comments to describe both
effects accurately. No code change.
@freemans13 freemans13 changed the title Fix kafka producer linger so high-fanout topics don't pay ~1s of delay per record fix(kafka): stop producer waiting up to 1s per message on busy topics (p99 7s → 22ms) May 22, 2026
…ect stacked-lingers test

1. Document the 10× compression of the adaptive-slow linger bounds
   (was [200ms, 5s] when driven by FlushFrequency default 10s; now
   [50ms, 500ms] driven by OuterBatcherLinger default 10ms). Same
   shape, scaled with the new base — kept inline on currentBatchLinger
   since there is no CHANGELOG.

2. Add TestStackedLingerRegression to directly exercise the bug the
   decoupling was meant to fix. Holds franz-go ProducerLinger at 1s
   and varies only outer_batcher_linger (1s vs 10ms). Confirms the
   pre-fix stacked configuration adds the outer batcher's linger on
   top of franz-go's, and the post-fix default doesn't. Local run
   showed stacked p50=4.49s vs decoupled p50=519ms (≈9× gap).
freemans13 added a commit to freemans13/teranode that referenced this pull request May 26, 2026
@sonarqubecloud

sonarqubecloud Bot commented Jun 2, 2026

Copy link
Copy Markdown

@freemans13 freemans13 requested a review from ordishs June 2, 2026 18:21

@ordishs ordishs left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approve. Clean decoupling — verified FlushFrequency now feeds only kgo.ProducerLinger and currentBatchLinger() reads OuterBatcherLinger exclusively, with no residual coupling. New param is additive and non-breaking. Strong test coverage: the unit test guards the core invariant (flush_frequency does not influence outer linger) in fast tests, and the perf-tagged tests reproduce the pathology end-to-end with noise-tolerant thresholds.

A couple of non-blocking follow-ups:

  • Release-note the operator-facing semantic change: self-hosted configs that set flush_frequency expecting it to also throttle the outer batcher now get the 10ms outer default.
  • Confirm subtrees/invalidSubtrees are spared only because flush_messages=1 force-flushes per record regardless of the inherited 10s linger — a future flush_messages bump there would silently reintroduce it.

@freemans13 freemans13 merged commit 4a10224 into bsv-blockchain:main Jun 3, 2026
34 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants