fix(subtreevalidation): batch Kafka txmeta cache writes, bound goroutines #834
…ines

Production observation on dev-scale-2: subtree-validator's "Tx Meta read from Kafka /second" Grafana panel oscillates between 0 and 2.4M tx/s with mean ~1.5M, repeatedly dropping to zero. One month ago the same metric was a steady ~900K/s with no zero-drops. Cache hit rate sits at ~50% even for new (not-yet-mined) transactions, and subtree validation takes 30-49s per subtree (target: ~2s).

Root cause is in services/subtreevalidation/txmetaHandler.go:
- Each Kafka message spawns a goroutine via `go func()` (unbounded fan-out, since 9f4f1e5 in Dec 2025).
- Each Kafka message carries a binary BATCH of N entries (since 4dcf264 in Dec 2025), but the handler loops calling SetCacheFromBytes once per entry sequentially.
- Each SetCacheFromBytes call takes a per-bucket write lock in improved_cache.bucket.Set (improved_cache.go:799).

Under burst load the cumulative effect is thousands of goroutines serializing through the cache's bucket locks. The cache's own sharded-bucket parallelism (8192 buckets in SetMulti) is wasted because each goroutine takes locks one at a time. Throughput collapses to single-writer speed, recovers when contention drains, then collapses again: the visible 0/2.4M oscillation pattern.

The franz Kafka library switch in #611 (2026-03-27) is the most likely trigger for the recent regression, but the underlying design flaw predates that.

Fix:
- New txmetaCacheJob struct: parsed Kafka batch (deep-copied keys/values, separated ADD/DELETE).
- Bounded worker pool consumes parsed jobs from a buffered channel. Workers call SetCacheMulti ONCE per batch, letting the cache's bucket fan-out parallelize internally and taking each touched bucket lock once per Kafka message instead of once per entry.
- Handler now does cheap parsing on the Kafka consumer goroutine, then enqueues onto the work channel. Channel-full enqueue BLOCKS, applying backpressure to Kafka rather than letting goroutines pile up unboundedly.
- Shutdown is driven by closing the work channel (not ctx cancel) so workers drain queued items before exiting. sync.Once on the close keeps Stop() idempotent for tests with deferred cleanup.

Settings:
- subtreevalidation_txmetaCacheKafkaWorkers (default 8)
- subtreevalidation_txmetaCacheKafkaQueueSize (default 256)

Interface:
- SetCacheMulti added to txMetaCacheOps. TxMetaCache already has it.

Tests:
- Existing TestServer_txmetaHandler covers nil/short/ADD/DELETE paths; updated to assert SetCacheMulti (not SetCacheFromBytes) on ADD and run against the worker pool.
- New TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti guards the regression: 50 entries -> ONE SetCacheMulti call.
- New TestParseTxmetaBatch covers empty/zero-entry/multi-entry/truncated batches and verifies keys/values are deep-copied so later mutation of the source buffer doesn't corrupt parsed jobs.
- Race detector clean.

Note: this fix does NOT remove the cache writeback in TxMetaCache.BatchDecorate (cache-aside on store fetches). That path is needed for nodes whose Kafka publishers don't carry every transaction.

Verified: go build, go vet, go test -race for the changed packages all green; pre-commit hooks pass.
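The pool described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `cacheJob`, `pool`, `newPool`, `enqueue`, `stop`, and `runDemo` are hypothetical names, and the `apply` callback stands in for a single batched cache write.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cacheJob is a hypothetical stand-in for one parsed Kafka batch.
type cacheJob struct {
	keys   [][]byte
	values [][]byte
}

// pool is a fixed-size worker pool fed by a buffered channel.
type pool struct {
	jobs     chan cacheJob
	wg       sync.WaitGroup
	stopOnce sync.Once
}

// newPool starts `workers` goroutines; each calls apply once per job.
func newPool(workers, queueSize int, apply func(cacheJob)) *pool {
	p := &pool{jobs: make(chan cacheJob, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// range ends only after the channel is closed AND drained.
			for job := range p.jobs {
				apply(job)
			}
		}()
	}
	return p
}

// enqueue blocks while the queue is full; that block is the backpressure.
// Calling it after stop would panic (send on closed channel).
func (p *pool) enqueue(j cacheJob) { p.jobs <- j }

// stop is idempotent: it closes the channel once and waits for the drain.
func (p *pool) stop() {
	p.stopOnce.Do(func() { close(p.jobs) })
	p.wg.Wait()
}

// runDemo pushes n single-entry jobs through the pool and reports
// how many entries the workers applied.
func runDemo(workers, queueSize, n int) int64 {
	var applied atomic.Int64
	p := newPool(workers, queueSize, func(j cacheJob) {
		applied.Add(int64(len(j.keys)))
	})
	for i := 0; i < n; i++ {
		p.enqueue(cacheJob{keys: [][]byte{{byte(i)}}, values: [][]byte{{0}}})
	}
	p.stop()
	p.stop() // second call is a no-op thanks to sync.Once
	return applied.Load()
}

func main() {
	fmt.Println(runDemo(8, 256, 1000)) // 1000
}
```

Closing the channel rather than cancelling a context means every job that was successfully enqueued is applied before `stop` returns.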
🤖 Claude Code Review
Status: Complete
Critical Issues Found
[Critical] Documentation-Implementation Mismatch: Worker Pool and SetCacheMulti Not Implemented
PR Description Claims:
Actual Implementation (txmetaHandler.go:64-124):
Documented But Unused Settings (settings/subtreevalidation_settings.go:21-22):
Added But Unused Interface Method (SubtreeValidation.go:140-144):
Impact: Operators cannot tune performance using documented settings. Documentation builds false expectations about how the system works. Evidence:
Findings Summary
Additional Notes
Inline Comments: All 6 previous inline comments reference code from earlier commits (parseTxmetaBatch, applyTxmetaCacheJob, worker pool) that was removed in commit 0595ec9. They are now obsolete and can be resolved.
Code Quality: The actual implementation (inline synchronous processing with per-entry cache writes) is simple and correct. The mismatch is purely in documentation vs. implementation.
Benchmark Comparison Report
Baseline:
Current:
Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-12 14:09 UTC
Restores the partition-level consume parallelism that the franz-go switch (#611, 2026-03-27) silently removed, and fixes the producer-side sticky-partitioner skew that produces bursty per-partition load.

Three changes, all on the txmeta hot path:

1) services/validator/Validator.go: Kafka producer now sets Key to a tx hash (the first hash in the batch for batched sends, the txn's own hash for single sends). Previously Key was nil, which under franz-go's default StickyKeyPartitioner is equivalent to a StickyPartitioner: every batch lands on the same partition until the linger/batch threshold trips, producing the bursty oscillation we observed in production. With a non-nil key, StickyKeyPartitioner hashes the key onto a partition deterministically. tx hashes are uniformly distributed, so partition usage is now uniform.

2) util/kafka/kafka_consumer.go (Start): replaces the single-goroutine fetches.EachRecord(...) loop with fetches.EachPartition(...) + one goroutine per partition per fetch. Within a partition records are still processed sequentially so per-partition order is preserved; across partitions the work runs in parallel. partitionWg is awaited before the next PollFetches so in-flight goroutine count is bounded by partition count. This restores N-way concurrency that the previous Kafka library provided implicitly via per-partition consumer goroutines.

3) util/kafka/kafka_consumer.go (NewKafkaConsumerGroup): when AutoCommitEnabled is true, registers kgo.AutoCommitMarks() and the per-partition loop calls client.MarkCommitRecords(record) only after consumerFn returns nil. With the previous default auto-commit any record that consumerFn errored on was still being committed by the auto-commit timer because franz-go tracks "iterated" not "succeeded". This change makes auto-commit consistent with the manual-commit path's semantics.

Verified:
- go build / go vet across changed packages: clean
- go test -short -race ./util/kafka/...: pass (perf tests are -short-skipped; they require a real broker)
- go test -race ./services/subtreevalidation/{TestServer_txmetaHandler, TestParseTxmetaBatch}: pass (worker pool from #834 still works under the new parallel feeder)
- Pre-commit hooks (gofmt / gci / golangci-lint): pass

Notes:
- Producer compression intentionally NOT enabled. Bitcoin tx data is high-entropy, compression would just burn CPU.
- AutoCommitMarks closes the gap where errored records were still committed; it does NOT close the gap where async handlers (txmeta worker pool from #834) return before the cache write completes. Per discussion this is acceptable: cache misses fall through to the UTXO store transparently.
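The consume pattern in changes 2) and 3) can be illustrated without a Kafka client. The sketch below uses only the standard library; `record`, `processFetch`, `failOnBad`, and `demoFetch` are hypothetical names, and the `marked` map stands in for whatever the real client tracks for commit. Stopping a partition at its first failed record is an assumption of this sketch, not something the commit message states.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a Kafka record.
type record struct {
	partition int
	offset    int64
	value     string
}

// processFetch handles one fetch: one goroutine per partition, records
// processed in order within each partition. It returns, per partition,
// the highest offset whose handler returned nil (the "marked" offset).
func processFetch(fetch map[int][]record, handle func(record) error) map[int]int64 {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		marked = make(map[int]int64)
	)
	for part, recs := range fetch {
		wg.Add(1)
		go func(part int, recs []record) {
			defer wg.Done()
			for _, r := range recs {
				if err := handle(r); err != nil {
					return // assumption: later offsets stay unmarked
				}
				mu.Lock()
				marked[part] = r.offset // mark only after success
				mu.Unlock()
			}
		}(part, recs)
	}
	// Waiting here bounds in-flight goroutines by the partition count.
	wg.Wait()
	return marked
}

// failOnBad rejects records whose value is "bad".
func failOnBad(r record) error {
	if r.value == "bad" {
		return errors.New("handler failed")
	}
	return nil
}

// demoFetch builds a two-partition fetch with one failing record.
func demoFetch() map[int][]record {
	return map[int][]record{
		0: {{0, 5, "a"}, {0, 6, "b"}, {0, 7, "c"}},
		1: {{1, 10, "d"}, {1, 11, "bad"}, {1, 12, "e"}},
	}
}

func main() {
	marked := processFetch(demoFetch(), failOnBad)
	fmt.Println(marked[0], marked[1]) // 7 10
}
```

Partition 1 stops at offset 10, so the failing record at offset 11 is never marked and would be redelivered.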
CI on PR #828 caught three e2e tests panicking with "send on closed channel" originating from txmetaHandler.go:148:

  legacy-sync  TestBIP68_TimeBased_Accept
  smoketest    TestBIP68_HeightBased_Accept
  prunertest   TestPrunerUnminedParentRetention

All three use the in-memory Kafka consumer which delivers a final message AFTER Server.Stop() has returned from txmetaConsumerClient.Close(). The handler running on that delivery race-loses with stopTxmetaCacheWorkers's close(txmetaCacheJobCh) and panics.

Fix: a sync.RWMutex (txmetaCacheCloseMu) plus a closed flag (txmetaCacheClosed) coordinate the two paths:
- Senders (txmetaHandler) take the read lock, check closed, send.
- Closer (stopTxmetaCacheWorkers) takes the write lock, flips the flag, closes the channel, all under the write lock.

Read-lock-then-check-then-send is atomic from the closer's POV: a sender either sees closed=false and completes its send before the write lock is granted, or sees closed=true and bails. No path can land on close(ch) followed by send(ch).

Read locks are uncontended on the steady-state hot path and the close runs once per Server lifetime, so overhead is negligible.

Test: TestServer_txmetaHandler_ShutdownRace stresses 16 sender goroutines against a concurrent stop(); 10× -race iterations clean.

Verified:
- go build / go vet: clean
- go test -count=10 -race -run "TestServer_txmetaHandler|TestParseTxmetaBatch" ./services/subtreevalidation/: pass
- Pre-commit hooks: pass
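The close-versus-send coordination described above can be sketched as follows. `guardedQueue`, `trySend`, `close`, and `runRace` are hypothetical names used for illustration; the real fields are the ones named in the commit message.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// guardedQueue pairs a channel with an RWMutex-protected closed flag.
type guardedQueue struct {
	mu     sync.RWMutex
	closed bool
	ch     chan int
}

func newGuardedQueue(size int) *guardedQueue {
	return &guardedQueue{ch: make(chan int, size)}
}

// trySend holds the read lock across check+send, so close cannot
// interleave between the two. It returns false once the queue is closed.
func (q *guardedQueue) trySend(v int) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return false
	}
	q.ch <- v
	return true
}

// close flips the flag and closes the channel under the write lock.
func (q *guardedQueue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	q.closed = true
	close(q.ch)
}

// runRace starts senders, closes the queue concurrently, and returns
// how many values were sent and how many the consumer received.
func runRace(senders, perSender int) (int64, int64) {
	q := newGuardedQueue(4)
	var sent atomic.Int64
	var received int64
	done := make(chan struct{})
	go func() { // consumer drains until the channel is closed
		for range q.ch {
			received++
		}
		close(done)
	}()
	var wg sync.WaitGroup
	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perSender; j++ {
				if !q.trySend(j) {
					return
				}
				sent.Add(1)
			}
		}()
	}
	q.close() // races with the senders on purpose
	wg.Wait()
	<-done
	return sent.Load(), received
}

func main() {
	s, r := runRace(16, 1000)
	fmt.Println(s == r) // true
}
```

A sender blocked on a full channel still holds the read lock, so this pattern relies on a consumer continuing to drain until the channel is closed; otherwise `close` would wait forever.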
…er pool

The bounded worker pool added in #834 unblocked the consumer but capped throughput at the worker count and bound the apply rate to the slowest worker on every batch. Production never recovered the historical 2M+ ops/sec on this path.

Two changes:

1. Drop the worker pool. txmetaHandler now spawns one goroutine per parsed Kafka message (fire-and-forget). Cache fill races subtree arrival; every ms of queueing matters because a stale cache forces fall-through to the UTXO store, which is far more expensive.

2. Apply per entry, not per batch. SetCacheMulti's bucket fan-out takes each touched bucket's write lock for the duration of writing ALL keys mapped to that bucket (~1 ms hold for 1024-key batches). Under many concurrent writers, contenders queue up behind that hold and end-of-queue wait inflates with concurrency. Per-entry SetCacheFromBytes acquires the bucket lock once per key (~1 µs holds). Aggregate work is the same; the lock-contention queue is much shallower, which is what historically sustained 2M+ ops/sec.

Tests updated:
- successful set test asserts SetCacheFromBytes (not SetCacheMulti)
- new TestServer_txmetaHandler_PerEntrySetCacheFromBytes guards the per-entry strategy and explicitly AssertNotCalled on SetCacheMulti
- TestServer_txmetaHandler_ShutdownRace renamed to ConcurrentCalls; the close-vs-send race it covered no longer exists (no channel)
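The trade-off between the two write strategies can be made concrete with a toy sharded cache. This is an illustrative sketch only: `shardedCache`, `set`, `setMulti`, and `demoAcquires` are hypothetical, the bucket count is 8 rather than the 8192 mentioned in this PR, and `setMulti` walks buckets serially where the real SetMulti is described as fanning out. It counts lock acquisitions; it does not measure hold times.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numBuckets = 8 // kept small for the demo

type bucket struct {
	mu       sync.Mutex
	data     map[string][]byte
	acquires int // times mu was taken, for illustration
}

type shardedCache struct{ buckets [numBuckets]bucket }

func newShardedCache() *shardedCache {
	c := &shardedCache{}
	for i := range c.buckets {
		c.buckets[i].data = make(map[string][]byte)
	}
	return c
}

func bucketFor(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % numBuckets)
}

// set takes the bucket lock once per key: many short holds.
func (c *shardedCache) set(key string, val []byte) {
	b := &c.buckets[bucketFor(key)]
	b.mu.Lock()
	b.acquires++
	b.data[key] = val
	b.mu.Unlock()
}

// setMulti groups keys by bucket and takes each touched bucket lock
// once, holding it while all of that bucket's keys are written.
func (c *shardedCache) setMulti(keys []string, vals [][]byte) {
	groups := make(map[int][]int)
	for i, k := range keys {
		idx := bucketFor(k)
		groups[idx] = append(groups[idx], i)
	}
	for idx, members := range groups {
		b := &c.buckets[idx]
		b.mu.Lock()
		b.acquires++
		for _, i := range members {
			b.data[keys[i]] = vals[i]
		}
		b.mu.Unlock()
	}
}

func (c *shardedCache) totalAcquires() int {
	total := 0
	for i := range c.buckets {
		c.buckets[i].mu.Lock()
		total += c.buckets[i].acquires
		c.buckets[i].mu.Unlock()
	}
	return total
}

// demoAcquires writes n keys with each strategy and returns the
// lock-acquisition counts (perEntry, multi).
func demoAcquires(n int) (int, int) {
	keys := make([]string, n)
	vals := make([][]byte, n)
	for i := range keys {
		keys[i] = fmt.Sprintf("tx-%d", i)
		vals[i] = []byte{byte(i)}
	}
	a, b := newShardedCache(), newShardedCache()
	for i := range keys {
		a.set(keys[i], vals[i])
	}
	b.setMulti(keys, vals)
	return a.totalAcquires(), b.totalAcquires()
}

func main() {
	perEntry, multi := demoAcquires(1024)
	fmt.Println(perEntry, multi <= numBuckets) // 1024 true
}
```

Fewer acquisitions means each hold covers more keys, which is the longer hold time this commit message argues hurts under many concurrent writers.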
Two-tier execution in the txmeta Kafka consumer path:
ADDs - each entry in a parsed Kafka batch spawns its own goroutine
that does ONE SetCacheFromBytes and exits. No per-batch
sequential walk, no waitgroup, no bound. Best-effort: a
missed write falls through to the UTXO store on the next
BatchDecorate, so we optimise for latency-to-cache, not for
delivery guarantees. Spawning per-entry maximises bucket
shard parallelism - each writer holds the bucket lock ~1 us,
and across 8192 buckets the contention queue stays shallow
even at multi-million ops/sec.
DELETE - runs synchronously in the Kafka consumer goroutine. Delete
must complete before the offset commits, otherwise stale
metadata can survive a tx being conflicted/replaced and
future BatchDecorate calls will return wrong results. On
failure we surface the error so the consumer leaves the
record uncommitted; on rebalance/restart the message is
re-delivered and the delete retried.
Tests updated to assert the synchronous-DELETE error propagation.
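The two tiers above can be sketched as a single handler loop. The names here (`entry`, `cache`, `handleBatch`, `memCache`, `demo`) are hypothetical, and the WaitGroup inside `memCache` exists only so the demo can observe the fire-and-forget writes; the design described above deliberately has no such wait.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type op int

const (
	opAdd op = iota
	opDelete
)

type entry struct {
	op    op
	key   string
	value []byte
}

// cache is a hypothetical minimal interface for the sketch.
type cache interface {
	Set(key string, value []byte)
	Delete(key string) error
}

// handleBatch applies ADDs asynchronously (best effort) and DELETEs
// synchronously, returning the first DELETE error to the caller so
// the record can be left uncommitted.
func handleBatch(c cache, entries []entry) error {
	for _, e := range entries {
		switch e.op {
		case opAdd:
			go c.Set(e.key, e.value) // fire-and-forget
		case opDelete:
			if err := c.Delete(e.key); err != nil {
				return fmt.Errorf("delete %q: %w", e.key, err)
			}
		}
	}
	return nil
}

// memCache is a toy cache used only by the demo.
type memCache struct {
	mu      sync.Mutex
	data    map[string][]byte
	adds    sync.WaitGroup // demo-only: lets demo wait for async ADDs
	failDel bool
}

func (m *memCache) Set(k string, v []byte) {
	defer m.adds.Done()
	m.mu.Lock()
	m.data[k] = v
	m.mu.Unlock()
}

func (m *memCache) Delete(k string) error {
	if m.failDel {
		return errors.New("store unavailable")
	}
	m.mu.Lock()
	delete(m.data, k)
	m.mu.Unlock()
	return nil
}

// demo runs ADD a, DELETE b, ADD c against a cache that already
// holds b, and reports whether b survived plus the handler error.
func demo(failDel bool) (bool, error) {
	m := &memCache{data: map[string][]byte{"b": {2}}, failDel: failDel}
	batch := []entry{{opAdd, "a", []byte{1}}, {opDelete, "b", nil}, {opAdd, "c", []byte{3}}}
	spawned := 2
	if failDel {
		spawned = 1 // the ADD after the failing DELETE is never reached
	}
	m.adds.Add(spawned)
	err := handleBatch(m, batch)
	m.adds.Wait()
	m.mu.Lock()
	defer m.mu.Unlock()
	_, hasB := m.data["b"]
	return hasB, err
}

func main() {
	hasB, err := demo(false)
	fmt.Println(hasB, err) // false <nil>
	hasB, err = demo(true)
	fmt.Println(hasB, err != nil) // true true
}
```

Because ADDs are asynchronous, an ADD and a later DELETE of the same key in one batch could land in either order; the demo sidesteps this by using distinct keys.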
…iate goroutine
txmetaHandler runs inside the Kafka consumer's per-partition goroutine
already, so the previous design — parse into a job struct → spawn an
applyTxmetaCacheJob goroutine → that goroutine spawns N entry goroutines —
added one goroutine layer with no actual work, plus a job struct alloc
and 2N per-entry mallocs (key + value copies).
This commit flattens the call chain:
- parseTxmetaBatch and applyTxmetaCacheJob removed.
- txmetaHandler parses entries inline.
- DELETEs run synchronously, return error on failure (offset stays
uncommitted, message re-delivered on rebalance/restart).
- ADDs each spawn one fire-and-forget goroutine that does a single
SetCacheFromBytes and exits.
Memory: keys and values are SUBSLICES of msg.Value, not heap copies.
franz-go's Record.Value is a stable per-record buffer (not pooled or
reused), so the apply goroutines hold subslices and Go's GC keeps the
underlying buffer alive until they finish. The only per-entry copy is
the 32-byte chainhash.Hash for DELETEs (forced by the [32]byte array
type, value copy, no allocation).
Allocations per Kafka message: 0 per entry + N goroutine stacks for the
ADD apply work. Down from 1 (job struct) + 2 (slice headers) + 2N
(per-entry key/value) for a 1024-entry batch.
Tests: TestParseTxmetaBatch removed (parser no longer exists).
Replaced with TestServer_txmetaHandler_MixedBatch (ADD+DELETE+ADD →
2x SetCacheFromBytes, 1x Delete) and
TestServer_txmetaHandler_TruncatedBatch (malformed ack, no panic).
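The zero-copy parsing idea can be illustrated with a toy batch walker. The wire format below (1-byte op, 32-byte hash, 4-byte little-endian length, value) is invented for this sketch and is not the PR's actual batch encoding; `walk`, `appendEntry`, and `demoWalk` are likewise hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const hashLen = 32

var errTruncated = errors.New("truncated batch")

// walk iterates entries in the hypothetical format
// op(1) | hash(32) | len(4, little-endian) | value(len)
// and passes hash and value to fn as subslices of buf (no copies).
func walk(buf []byte, fn func(op byte, hash, value []byte) error) error {
	for off := 0; off < len(buf); {
		if len(buf)-off < 1+hashLen+4 {
			return errTruncated
		}
		op := buf[off]
		hash := buf[off+1 : off+1+hashLen]
		n := int(binary.LittleEndian.Uint32(buf[off+1+hashLen:]))
		off += 1 + hashLen + 4
		if n < 0 || n > len(buf)-off {
			return errTruncated
		}
		if err := fn(op, hash, buf[off:off+n]); err != nil {
			return err
		}
		off += n
	}
	return nil
}

// appendEntry encodes one entry in the same hypothetical format.
func appendEntry(buf []byte, op byte, hash [hashLen]byte, value []byte) []byte {
	buf = append(buf, op)
	buf = append(buf, hash[:]...)
	buf = binary.LittleEndian.AppendUint32(buf, uint32(len(value)))
	return append(buf, value...)
}

// demoWalk encodes two entries, walks them, and reports the entry
// count and whether the first value aliases the source buffer.
func demoWalk() (int, bool, error) {
	var buf []byte
	buf = appendEntry(buf, 0, [hashLen]byte{1}, []byte("meta-1"))
	buf = appendEntry(buf, 0, [hashLen]byte{2}, []byte("meta-2"))
	count, aliased := 0, false
	err := walk(buf, func(op byte, hash, value []byte) error {
		if count == 0 {
			// The value starts right after op+hash+len of entry 0.
			aliased = &value[0] == &buf[1+hashLen+4]
		}
		var h [hashLen]byte // array copy by value, as for DELETE keys
		copy(h[:], hash)
		count++
		return nil
	})
	return count, aliased, err
}

func main() {
	count, aliased, err := demoWalk()
	fmt.Println(count, aliased, err) // 2 true <nil>
	var buf []byte
	buf = appendEntry(buf, 0, [hashLen]byte{1}, []byte("meta"))
	err = walk(buf[:len(buf)-1], func(byte, []byte, []byte) error { return nil })
	fmt.Println(err == errTruncated) // true
}
```

Holding subslices is only safe when the source buffer is not reused while they are alive, which is the property the commit message asserts for franz-go's Record.Value.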
…y goroutine

Per-entry ADD goroutines saturated the Go scheduler at scale. Production metrics on dev-scale-1-scale-2 (256-partition txmeta topic, ~1.1M entries/sec): scheduler queue ~89K deep, p50 enqueue->SetCacheFromBytes latency ~70ms, p99 ~256ms, versus the actual cache write taking microseconds. Spawn cost (~2us) + queue wait (80ms) dominated; the per-entry parallelism we were paying for was queue depth, not real concurrency.

Walk the 1024 entries serially inside the partition goroutine instead. Bucket-shard parallelism is preserved across the 256 partition goroutines (each handling its own records); one goroutine per partition, one cache-write at a time within.

Per-record runtime: ~5ms (1024 x ~5us). Goroutines spawned per second drops from ~1.1M to ~1075. Aggregate throughput is the same. The visible chart oscillation should flatten because completions are now evenly spaced, and the latency histogram should drop to single-digit ms.
7af5bc0 to 60432ce
oskarszoon left a comment
Approve. Good prod-driven iteration — the rewrite from worker-pool+channels to inline synchronous-DELETE + fire-and-forget-ADD is cleaner and addresses both of gokutheengineer's findings (ADD/DELETE reordering, shutdown race). Per-partition parallelism still comes from the 256 partition goroutines themselves, with two orders of magnitude fewer total goroutines than the pre-PR design. Good doc comments on the why-each-choice.
One ask: SetCacheMulti was added to the cache interface in the first design pass, but the final rewrite uses per-entry SetCacheFromBytes synchronously — is SetCacheMulti still needed for a planned use case, or vestigial after the redesign? If vestigial, worth removing to keep the surface clean; if planned, a one-line comment at the interface declaration would prevent it from being culled by a future reader.


Summary
Fixes subtree-validator's Kafka txmeta cache-write throughput regression that surfaces in production as wildly oscillating "Tx Meta read from Kafka /second" (0 → 2.4M → 0, mean ~1.5M) where one month ago the same metric was a steady ~900K/s.
Knock-on effect: txMetaCache hit rate is stuck at ~50% even for not-yet-mined transactions, and subtree validation on dev-scale-2 takes 30–49s per subtree (target ~2s).
Production evidence
The 0-throughput periods correlate with goroutines piled up on bucket locks in improved_cache.bucket.Set.
Root cause
In services/subtreevalidation/txmetaHandler.go:
Three accumulated decisions made this pathological:
- 9f4f1e5 (2025-12-18): wrapped the handler in go func() to "benefit from sharded buckets". Spawned one goroutine per Kafka message with no concurrency cap.
- 4dcf264 (2025-12-19): switched from one-tx-per-message proto to N-tx-per-message binary batches. Inner loop kept calling SetCacheFromBytes (singular) once per entry.
- #611 franz Kafka library switch (2026-03-27): most likely trigger for the recent regression (one month ago vs now). The library switch likely changed batch sizes / consumer pacing in a way that exposes the latent contention.
Under load: thousands of goroutines, each doing N sequential singular cache writes through 8192 per-bucket write locks. The cache's own sharded-bucket parallelism in SetMulti (which fans out across buckets via internal errgroup.Go) is wasted because each goroutine takes one lock at a time.
Fix
Two-part:
1. Batch the cache writes: one SetCacheMulti per Kafka message.
A single SetCacheMulti call lets the cache fan out internally across buckets, taking each touched bucket lock once per Kafka message instead of once per entry. The number of bucket-lock acquisitions per Kafka message drops from N entries to min(N, ~unique_buckets_touched).
2. Bounded worker pool replaces unbounded go func().
A small fixed pool reads parsed batches from a buffered channel. When the channel is full, the handler's enqueue blocks, propagating backpressure to the Kafka consumer rather than letting goroutines pile up.
Shutdown is driven by closing the channel (not by ctx.Done()): workers do for job := range ch and naturally drain remaining items before exiting. A sync.Once keeps Stop() idempotent.
What this PR does not change
Per discussion: the cache writeback in TxMetaCache.BatchDecorate (cache-aside on store fetches) is preserved. Nodes whose Kafka publishers don't carry every transaction depend on it to populate the cache during subtree-validation store-fetches.
Settings
- subtreevalidation_txmetaCacheKafkaWorkers: default 8
- subtreevalidation_txmetaCacheKafkaQueueSize: default 256
Tunables, not hot-path. 8 * SetCacheMulti per worker × cache's internal bucket fan-out should sustain millions of ops/sec; raise only if profiles show workers idle while Kafka lag grows.
Tests (all pass with -race)
- TestServer_txmetaHandler (existing, updated): nil/short/ADD/DELETE paths now exercise the worker pool. ADD path asserts SetCacheMulti (not SetCacheFromBytes) is called.
- TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti (new): regression guard. 50 entries in one Kafka message must produce one SetCacheMulti call carrying all 50 keys, and zero SetCacheFromBytes calls.
- TestParseTxmetaBatch (new): empty / zero-entry / multi-entry-mix / truncated / buffer-reuse-safety (verifies parsed keys & values are deep-copied so later mutation of the source msg.Value buffer can't corrupt jobs the worker hasn't processed yet).
Test plan
- go build ./... clean
- go vet ./services/subtreevalidation/... ./settings/... ./stores/txmetacache/... clean
- go test -race -count=1 for changed package: pass
- dev-scale-2, observe:
Risk / rollback
Related / follow-up
aerospike-client-go/v8 patch to remove the nodeStatsRWMutex.Lock per result-code update would complement this on the read-side fan-out path.