
fix(subtreevalidation): batch Kafka txmeta cache writes, bound goroutines#834

Merged
icellan merged 6 commits into feat/teranode-native-ops from fix/subtree-validator-kafka-cache-throughput on May 15, 2026
@icellan icellan commented May 8, 2026

Contributor

Summary

Fixes subtree-validator's Kafka txmeta cache-write throughput regression that surfaces in production as wildly oscillating "Tx Meta read from Kafka /second" (0 → 2.4M → 0, mean ~1.5M) where one month ago the same metric was a steady ~900K/s.

Knock-on effect: txMetaCache hit rate is stuck at ~50% even for not-yet-mined transactions, and subtree validation on dev-scale-2 takes 30–49s per subtree (target ~2s).

Production evidence

| Metric | One month ago (good) | Now (bad) |
|---|---|---|
| Tx Meta read from Kafka /sec | Steady ~900K, mean 915K | 0 → 2.4M oscillating, mean ~1.5M |
| Cache hit rate | (presumed near-100%) | 50.9% |
| Subtree validation duration | (presumed ~2s) | 30–49s |
| Live goroutines on subtree-validator | low | 11,838+ |

The 0-throughput periods correlate with goroutines piled up on bucket locks in improved_cache.bucket.Set.

Root cause

In services/subtreevalidation/txmetaHandler.go:

go func() {
    // ... parse N-entry batch ...
    for i := uint32(0); i < entryCount; i++ {
        ...
        u.SetTxMetaCacheFromBytes(ctx, hash[:], content)  // singular Set per entry
    }
}()

Three accumulated decisions made this pathological:

  1. 9f4f1e5 (2025-12-18) — wrapped the handler in go func() to "benefit from sharded buckets". Spawned one goroutine per Kafka message with no concurrency cap.
  2. 4dcf264 (2025-12-19) — switched from one-tx-per-message proto to N-tx-per-message binary batches. Inner loop kept calling SetCacheFromBytes (singular) once per entry.
  3. #611 franz Kafka library switch (2026-03-27) — most likely trigger for the recent regression (one month ago vs now). The library switch likely changed batch sizes / consumer pacing in a way that exposes the latent contention.

Under load: thousands of goroutines, each doing N sequential singular cache writes through 8192 per-bucket write locks. The cache's own sharded-bucket parallelism in SetMulti (which fans out across buckets via internal errgroup.Go) is wasted because each goroutine takes one lock at a time.
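
The difference between per-entry writes and a bucket-grouped multi-write can be sketched roughly as follows. This is a minimal illustration, not the real cache: `shardedCache`, `bucketIndex`, `lockAcqs`, and `makeBatch` are hypothetical names, FNV-1a is an assumed hash, and only the 8192 bucket count comes from the description above. The counter records how many bucket locks each strategy takes for the same batch.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"sync/atomic"
)

const numBuckets = 8192 // bucket count quoted in the PR; the rest is illustrative

type bucket struct {
	mu   sync.Mutex
	data map[string][]byte
}

// shardedCache is a hypothetical stand-in for the real cache.
type shardedCache struct {
	buckets  [numBuckets]bucket
	lockAcqs atomic.Int64 // bucket-lock acquisitions, counted for the demo
}

func newShardedCache() *shardedCache {
	c := &shardedCache{}
	for i := range c.buckets {
		c.buckets[i].data = make(map[string][]byte)
	}
	return c
}

// bucketIndex uses FNV-1a as an assumed hash; the real cache may differ.
func bucketIndex(key []byte) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32() % numBuckets)
}

// Set takes one bucket lock per key.
func (c *shardedCache) Set(key, value []byte) {
	b := &c.buckets[bucketIndex(key)]
	b.mu.Lock()
	c.lockAcqs.Add(1)
	b.data[string(key)] = value
	b.mu.Unlock()
}

// SetMulti groups keys by bucket, then takes each touched bucket lock once,
// with one goroutine per touched bucket.
func (c *shardedCache) SetMulti(keys, values [][]byte) {
	groups := make(map[int][]int)
	for i, k := range keys {
		idx := bucketIndex(k)
		groups[idx] = append(groups[idx], i)
	}
	var wg sync.WaitGroup
	for idx, members := range groups {
		wg.Add(1)
		go func(idx int, members []int) {
			defer wg.Done()
			b := &c.buckets[idx]
			b.mu.Lock()
			c.lockAcqs.Add(1)
			for _, i := range members {
				b.data[string(keys[i])] = values[i]
			}
			b.mu.Unlock()
		}(idx, members)
	}
	wg.Wait()
}

// makeBatch builds n synthetic key/value pairs.
func makeBatch(n int) (keys, values [][]byte) {
	for i := 0; i < n; i++ {
		keys = append(keys, []byte(fmt.Sprintf("tx-%d", i)))
		values = append(values, []byte("meta"))
	}
	return keys, values
}

func main() {
	keys, values := makeBatch(1024)
	perEntry := newShardedCache()
	for i := range keys {
		perEntry.Set(keys[i], values[i])
	}
	batched := newShardedCache()
	batched.SetMulti(keys, values)
	fmt.Println("per-entry lock acquisitions:", perEntry.lockAcqs.Load())
	fmt.Println("batched lock acquisitions:  ", batched.lockAcqs.Load())
}
```

The sketch only counts acquisitions. It says nothing about how long each lock is held, which is the trade-off the later commits in this thread revisit.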

Fix

Two-part:

1. Batch the cache writes — one SetCacheMulti per Kafka message.

A single SetCacheMulti call lets the cache fan out internally across buckets, taking each touched bucket lock once per Kafka message instead of once per entry. The number of bucket-lock acquisitions per Kafka message drops from N entries to min(N, ~unique_buckets_touched).

2. Bounded worker pool replaces unbounded go func().

A small fixed pool reads parsed batches from a buffered channel. When the channel is full, the handler's enqueue blocks — propagating backpressure to the Kafka consumer rather than letting goroutines pile up.

[Kafka consumer goroutine]              [Worker pool (default 8 workers)]
       |                                          |
   parse batch  →  enqueue  →  [chan: 256 deep]  →  applyTxmetaCacheJob
       (cheap, cancellable)        (blocks if full)        |
                                                           v
                                                      cache.SetCacheMulti  (one call, fans across buckets)
                                                      cache.Delete         (one per DELETE entry)

Shutdown is driven by closing the channel (not by ctx.Done()) — workers do for job := range ch and naturally drain remaining items before exiting. A sync.Once keeps Stop() idempotent.
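
A bounded pool with this shape might look like the sketch below. It is an illustration under assumptions, not the PR's code: `pool`, `job`, `newPool`, `Enqueue`, and the `applied` counter are hypothetical names, and the `apply` callback stands in for the cache write.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// job is a hypothetical stand-in for one parsed Kafka batch.
type job struct {
	keys [][]byte
}

// pool is an illustrative bounded worker pool.
type pool struct {
	ch       chan job
	wg       sync.WaitGroup
	stopOnce sync.Once
	applied  atomic.Int64 // jobs processed, counted for the demo
}

func newPool(workers, queueSize int, apply func(job)) *pool {
	p := &pool{ch: make(chan job, queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// range exits only once the channel is closed AND drained,
			// so queued jobs are still applied during shutdown.
			for j := range p.ch {
				apply(j)
				p.applied.Add(1)
			}
		}()
	}
	return p
}

// Enqueue blocks while the queue is full, which pushes backpressure
// onto the caller instead of spawning more goroutines.
func (p *pool) Enqueue(j job) { p.ch <- j }

// Stop closes the channel exactly once and waits for the workers to drain it.
func (p *pool) Stop() {
	p.stopOnce.Do(func() { close(p.ch) })
	p.wg.Wait()
}

func main() {
	p := newPool(8, 256, func(j job) {}) // defaults quoted in the PR
	for i := 0; i < 1000; i++ {
		p.Enqueue(job{})
	}
	p.Stop()
	p.Stop() // second call is a no-op thanks to sync.Once
	fmt.Println("applied:", p.applied.Load()) // prints "applied: 1000"
}
```

Note that this sketch panics if `Enqueue` runs after `Stop`, because a send on a closed channel panics in Go. A later commit in this thread deals with that race.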

What this PR does not change

Per discussion: the cache writeback in TxMetaCache.BatchDecorate (cache-aside on store fetches) is preserved. Nodes whose Kafka publishers don't carry every transaction depend on it to populate the cache during subtree-validation store-fetches.

Settings

| Key | Default | Notes |
|---|---|---|
| subtreevalidation_txmetaCacheKafkaWorkers | 8 | Bounded concurrency for cache writes from the txmeta Kafka topic |
| subtreevalidation_txmetaCacheKafkaQueueSize | 256 | Buffered channel depth between Kafka handler and worker pool |

Tunables, not hot-path. Eight workers, each issuing one SetCacheMulti per batch, combined with the cache's internal bucket fan-out, should sustain millions of ops/sec; raise the worker count only if profiles show workers idle while Kafka lag grows.

Tests (all pass with -race)

  • TestServer_txmetaHandler (existing, updated): nil/short/ADD/DELETE paths now exercise the worker pool. ADD path asserts SetCacheMulti (not SetCacheFromBytes) is called.
  • TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti (new): regression guard — 50 entries in one Kafka message must produce one SetCacheMulti call carrying all 50 keys, and zero SetCacheFromBytes calls.
  • TestParseTxmetaBatch (new): empty / zero-entry / multi-entry-mix / truncated / buffer-reuse-safety (verifies parsed keys & values are deep-copied so later mutation of the source msg.Value buffer can't corrupt jobs the worker hasn't processed yet).

Test plan

  • go build ./... clean
  • go vet ./services/subtreevalidation/... ./settings/... ./stores/txmetacache/... clean
  • go test -race -count=1 for changed package — pass
  • Pre-commit hooks (gofmt / gci / golangci-lint / yaml / markdown) — pass
  • CI run on this PR
  • Deploy to dev-scale-2, observe:
    • "Tx Meta read from Kafka /second" returns to a steady line (no 0-drops)
    • cache hit rate climbs toward ~100%
    • subtree validation duration drops toward 2s

Risk / rollback

  • Pure subtree-validator-side change. Producer (Kafka publisher) and message format are unchanged.
  • Worker count and queue size are config-only knobs; can be tuned without redeploy if the defaults are wrong for the cluster shape.
  • Rollback = revert one commit.

Related / follow-up

  • The franz Kafka library switch (feature: add franz as Kafka library #611) deserves an independent investigation — it's the strongest candidate for the throughput regression timing, and other consumers may have similar latent issues.
  • An eventual aerospike-client-go/v8 patch to remove the nodeStats RWMutex.Lock per result-code update would complement this on the read-side fan-out path.

…ines

Production observation on dev-scale-2: subtree-validator's "Tx Meta read
from Kafka /second" Grafana panel oscillates between 0 and 2.4M tx/s
with mean ~1.5M, repeatedly dropping to zero. One month ago the same
metric was a steady ~900K/s with no zero-drops. Cache hit rate sits
at ~50% even for new (not-yet-mined) transactions, and subtree
validation takes 30-49s per subtree (target: ~2s).

Root cause is in services/subtreevalidation/txmetaHandler.go:

- Each Kafka message spawns a goroutine via `go func()` (unbounded
  fan-out, since 9f4f1e5 in Dec 2025).
- Each Kafka message carries a binary BATCH of N entries (since
  4dcf264 in Dec 2025), but the handler loops calling
  SetCacheFromBytes once per entry sequentially.
- Each SetCacheFromBytes call takes a per-bucket write lock in
  improved_cache.bucket.Set (improved_cache.go:799).

Under burst load the cumulative effect is thousands of goroutines
serializing through the cache's bucket locks. The cache's own
sharded-bucket parallelism (8192 buckets in SetMulti) is wasted
because each goroutine takes locks one at a time. Throughput
collapses to single-writer speed, recovers when contention drains,
collapses again — the visible 0/2.4M oscillation pattern.

The franz Kafka library switch in #611 (2026-03-27) is the most
likely trigger for the recent regression, but the underlying design
flaw predates that.

Fix:

- New txmetaCacheJob struct: parsed Kafka batch (deep-copied keys/
  values, separated ADD/DELETE).
- Bounded worker pool consumes parsed jobs from a buffered channel.
  Workers call SetCacheMulti ONCE per batch — letting the cache's
  bucket fan-out parallelize internally, taking each touched bucket
  lock once per Kafka message instead of once per entry.
- Handler now does cheap parsing on the Kafka consumer goroutine,
  then enqueues onto the work channel. Channel-full enqueue BLOCKS,
  applying backpressure to Kafka rather than letting goroutines
  pile up unboundedly.
- Shutdown is driven by closing the work channel (not ctx cancel)
  so workers drain queued items before exiting. sync.Once on the
  close keeps Stop() idempotent for tests with deferred cleanup.

Settings:
- subtreevalidation_txmetaCacheKafkaWorkers (default 8)
- subtreevalidation_txmetaCacheKafkaQueueSize (default 256)

Interface:
- SetCacheMulti added to txMetaCacheOps. TxMetaCache already has it.

Tests:
- Existing TestServer_txmetaHandler covers nil/short/ADD/DELETE
  paths; updated to assert SetCacheMulti (not SetCacheFromBytes)
  on ADD and run against the worker pool.
- New TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti
  guards the regression: 50 entries -> ONE SetCacheMulti call.
- New TestParseTxmetaBatch covers empty/zero-entry/multi-entry/
  truncated batches and verifies keys/values are deep-copied so
  later mutation of the source buffer doesn't corrupt parsed jobs.
- Race detector clean.

Note: this fix does NOT remove the cache writeback in
TxMetaCache.BatchDecorate (cache-aside on store fetches) — that
path is needed for nodes whose Kafka publishers don't carry every
transaction.

Verified: go build, go vet, go test -race for the changed packages
all green; pre-commit hooks pass.

github-actions Bot commented May 8, 2026

Contributor

🤖 Claude Code Review

Status: Complete


Critical Issues Found

[Critical] Documentation-Implementation Mismatch: Worker Pool and SetCacheMulti Not Implemented

PR Description Claims:

1. Batch the cache writes — one SetCacheMulti per Kafka message.

2. Bounded worker pool replaces unbounded go func().

Actual Implementation (txmetaHandler.go:64-124):

  • Processes entries synchronously inline in the Kafka consumer goroutine (no worker pool, no channels)
  • Calls SetCacheFromBytes per entry (line 111), not SetCacheMulti per message
  • Test at line 389-416 explicitly verifies SetCacheMulti is never called

Documented But Unused Settings (settings/subtreevalidation_settings.go:21-22):

  • TxmetaCacheKafkaWorkers (default 8) - claims to control worker pool that does not exist
  • TxmetaCacheKafkaQueueSize (default 256) - references buffered channel that does not exist
  • Setting docs describe "Workers pull messages from a buffered channel, parse the batch, and emit a single SetCacheMulti call per message" — contradicts actual code

Added But Unused Interface Method (SubtreeValidation.go:140-144):

  • SetCacheMulti(keys [][]byte, values [][]byte) added to txMetaCacheOps interface with detailed godoc about batch performance benefits — method is never invoked

Impact: Operators cannot tune performance using documented settings. Documentation builds false expectations about how the system works.

Evidence:

  • txmetaHandler.go:103-115 (per-entry loop calling SetCacheFromBytes)
  • txmetaHandler_test.go:389-416 (test asserting SetCacheMulti is NOT called)
  • settings/subtreevalidation_settings.go:21-22 (settings referencing non-existent worker pool)

Findings Summary

  • 1 Critical: Documentation describes architecture (worker pool + SetCacheMulti batching) that is not implemented
  • 0 Major issues
  • 0 Minor issues

Additional Notes

Inline Comments: All 6 previous inline comments reference code from earlier commits (parseTxmetaBatch, applyTxmetaCacheJob, worker pool) that was removed in commit 0595ec9. They are now obsolete and can be resolved.

Code Quality: The actual implementation (inline synchronous processing with per-entry cache writes) is simple and correct. The mismatch is purely in documentation vs. implementation.

@icellan icellan requested review from freemans13 and gokutheengineer and removed request for gokutheengineer May 8, 2026 12:23

github-actions Bot commented May 8, 2026

Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-834 (5979ce7)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 142
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.696µ 1.668µ ~ 0.100
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 61.68n 61.78n ~ 0.300
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 61.81n 62.10n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 61.75n 61.81n ~ 0.700
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 30.13n 30.23n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 53.00n 52.47n ~ 0.700
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 107.4n 113.4n ~ 0.100
MiningCandidate_Stringify_Short-4 267.1n 272.5n ~ 0.100
MiningCandidate_Stringify_Long-4 1.900µ 1.889µ ~ 0.100
MiningSolution_Stringify-4 993.8n 958.8n ~ 0.100
BlockInfo_MarshalJSON-4 1.791µ 1.763µ ~ 0.100
NewFromBytes-4 140.3n 128.6n ~ 0.100
Mine_EasyDifficulty-4 65.25µ 65.56µ ~ 0.400
Mine_WithAddress-4 6.938µ 6.888µ ~ 0.400
BlockAssembler_AddTx-4 0.02791n 0.02550n ~ 0.700
AddNode-4 10.38 10.94 ~ 0.200
AddNodeWithMap-4 10.69 10.91 ~ 1.000
DirectSubtreeAdd/4_per_subtree-4 67.20n 63.39n ~ 0.100
DirectSubtreeAdd/64_per_subtree-4 29.40n 28.45n ~ 0.100
DirectSubtreeAdd/256_per_subtree-4 27.90n 27.67n ~ 0.100
DirectSubtreeAdd/1024_per_subtree-4 26.92n 26.34n ~ 0.100
DirectSubtreeAdd/2048_per_subtree-4 26.58n 26.01n ~ 0.100
SubtreeProcessorAdd/4_per_subtree-4 288.8n 284.9n ~ 0.200
SubtreeProcessorAdd/64_per_subtree-4 285.5n 277.2n ~ 0.100
SubtreeProcessorAdd/256_per_subtree-4 290.4n 275.3n ~ 0.100
SubtreeProcessorAdd/1024_per_subtree-4 276.7n 268.4n ~ 0.100
SubtreeProcessorAdd/2048_per_subtree-4 275.6n 271.6n ~ 0.100
SubtreeProcessorRotate/4_per_subtree-4 280.2n 275.1n ~ 0.100
SubtreeProcessorRotate/64_per_subtree-4 277.0n 273.8n ~ 0.100
SubtreeProcessorRotate/256_per_subtree-4 278.7n 271.0n ~ 0.100
SubtreeProcessorRotate/1024_per_subtree-4 272.8n 270.6n ~ 0.100
SubtreeNodeAddOnly/4_per_subtree-4 55.54n 54.28n ~ 0.100
SubtreeNodeAddOnly/64_per_subtree-4 35.20n 34.42n ~ 0.100
SubtreeNodeAddOnly/256_per_subtree-4 34.01n 33.44n ~ 0.100
SubtreeNodeAddOnly/1024_per_subtree-4 33.22n 32.71n ~ 0.100
SubtreeCreationOnly/4_per_subtree-4 116.4n 113.5n ~ 0.100
SubtreeCreationOnly/64_per_subtree-4 440.1n 413.6n ~ 0.100
SubtreeCreationOnly/256_per_subtree-4 1.437µ 1.409µ ~ 0.200
SubtreeCreationOnly/1024_per_subtree-4 4.664µ 4.475µ ~ 0.100
SubtreeCreationOnly/2048_per_subtree-4 8.812µ 8.209µ ~ 0.100
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 275.3n 271.7n ~ 0.100
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 274.0n 274.3n ~ 1.000
ParallelGetAndSetIfNotExists/1k_nodes-4 597.6µ 602.9µ ~ 0.400
ParallelGetAndSetIfNotExists/10k_nodes-4 1.353m 1.312m ~ 0.100
ParallelGetAndSetIfNotExists/50k_nodes-4 6.790m 6.551m ~ 0.100
ParallelGetAndSetIfNotExists/100k_nodes-4 13.64m 13.39m ~ 0.100
SequentialGetAndSetIfNotExists/1k_nodes-4 673.0µ 662.4µ ~ 0.200
SequentialGetAndSetIfNotExists/10k_nodes-4 2.913m 2.777m ~ 0.100
SequentialGetAndSetIfNotExists/50k_nodes-4 11.17m 10.46m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 22.61m 20.22m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 663.8µ 640.2µ ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 4.431m 4.286m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 17.52m 16.50m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 721.7µ 700.5µ ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 6.574m 5.804m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 44.01m 37.91m ~ 0.100
DiskTxMap_SetIfNotExists-4 3.594µ 3.687µ ~ 0.700
DiskTxMap_SetIfNotExists_Parallel-4 3.459µ 3.633µ ~ 0.400
DiskTxMap_ExistenceOnly-4 320.3n 333.0n ~ 0.200
Queue-4 153.7n 150.7n ~ 0.100
AtomicPointer-4 2.524n 2.512n ~ 1.000
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 690.6µ 639.3µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/New/10K-4 667.6µ 659.1µ ~ 0.700
ReorgOptimizations/AllMarkFalse/Old/10K-4 89.83µ 95.16µ ~ 0.700
ReorgOptimizations/AllMarkFalse/New/10K-4 49.90µ 49.62µ ~ 0.200
ReorgOptimizations/HashSlicePool/Old/10K-4 45.66µ 54.81µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/10K-4 8.660µ 8.610µ ~ 0.700
ReorgOptimizations/NodeFlags/Old/10K-4 3.772µ 4.499µ ~ 0.100
ReorgOptimizations/NodeFlags/New/10K-4 1.268µ 2.021µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 7.300m 7.770m ~ 0.200
ReorgOptimizations/DedupFilterPipeline/New/100K-4 7.846m 8.295m ~ 0.400
ReorgOptimizations/AllMarkFalse/Old/100K-4 870.5µ 928.8µ ~ 0.100
ReorgOptimizations/AllMarkFalse/New/100K-4 545.2µ 546.3µ ~ 0.200
ReorgOptimizations/HashSlicePool/Old/100K-4 440.9µ 468.4µ ~ 0.700
ReorgOptimizations/HashSlicePool/New/100K-4 199.7µ 199.5µ ~ 0.700
ReorgOptimizations/NodeFlags/Old/100K-4 40.51µ 40.57µ ~ 1.000
ReorgOptimizations/NodeFlags/New/100K-4 14.35µ 14.08µ ~ 0.400
TxMapSetIfNotExists-4 36.18n 35.61n ~ 0.200
TxMapSetIfNotExistsDuplicate-4 29.96n 29.91n ~ 0.600
ChannelSendReceive-4 468.5n 465.9n ~ 1.000
CalcBlockWork-4 494.0n 471.2n ~ 1.000
CalculateWork-4 623.1n 627.3n ~ 0.100
BuildBlockLocatorString_Helpers/Size_10-4 1.277µ 1.313µ ~ 0.100
BuildBlockLocatorString_Helpers/Size_100-4 12.31µ 12.47µ ~ 0.100
BuildBlockLocatorString_Helpers/Size_1000-4 131.2µ 145.8µ ~ 0.700
CatchupWithHeaderCache-4 104.1m 104.4m ~ 0.700
_prepareTxsPerLevel-4 412.5m 408.4m ~ 0.100
_prepareTxsPerLevelOrdered-4 3.840m 3.659m ~ 0.700
_prepareTxsPerLevel_Comparison/Original-4 431.9m 420.5m ~ 0.700
_prepareTxsPerLevel_Comparison/Optimized-4 3.793m 3.490m ~ 0.100
SubtreeSizes/10k_tx_4_per_subtree-4 1.380m 1.316m ~ 0.700
SubtreeSizes/10k_tx_16_per_subtree-4 328.2µ 318.0µ ~ 0.100
SubtreeSizes/10k_tx_64_per_subtree-4 77.37µ 74.76µ ~ 0.100
SubtreeSizes/10k_tx_256_per_subtree-4 19.11µ 18.34µ ~ 0.100
SubtreeSizes/10k_tx_512_per_subtree-4 9.492µ 9.114µ ~ 0.100
SubtreeSizes/10k_tx_1024_per_subtree-4 4.655µ 4.585µ ~ 0.100
SubtreeSizes/10k_tx_2k_per_subtree-4 2.356µ 2.274µ ~ 0.100
BlockSizeScaling/10k_tx_64_per_subtree-4 76.08µ 72.52µ ~ 0.100
BlockSizeScaling/10k_tx_256_per_subtree-4 19.09µ 18.61µ ~ 0.100
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.709µ 4.556µ ~ 0.100
BlockSizeScaling/50k_tx_64_per_subtree-4 395.5µ 389.2µ ~ 0.100
BlockSizeScaling/50k_tx_256_per_subtree-4 95.08µ 93.27µ ~ 0.100
BlockSizeScaling/50k_tx_1024_per_subtree-4 23.58µ 22.89µ ~ 0.100
SubtreeAllocations/small_subtrees_exists_check-4 156.4µ 159.5µ ~ 0.700
SubtreeAllocations/small_subtrees_data_fetch-4 171.7µ 165.4µ ~ 0.100
SubtreeAllocations/small_subtrees_full_validation-4 325.0µ 322.3µ ~ 0.100
SubtreeAllocations/medium_subtrees_exists_check-4 9.367µ 9.319µ ~ 0.700
SubtreeAllocations/medium_subtrees_data_fetch-4 10.177µ 9.742µ ~ 0.100
SubtreeAllocations/medium_subtrees_full_validation-4 19.36µ 18.69µ ~ 0.100
SubtreeAllocations/large_subtrees_exists_check-4 2.249µ 2.224µ ~ 0.100
SubtreeAllocations/large_subtrees_data_fetch-4 2.471µ 2.364µ ~ 0.100
SubtreeAllocations/large_subtrees_full_validation-4 4.827µ 4.703µ ~ 0.100
_BufferPoolAllocation/16KB-4 3.319µ 3.472µ ~ 0.100
_BufferPoolAllocation/32KB-4 7.090µ 7.924µ ~ 0.100
_BufferPoolAllocation/64KB-4 18.05µ 17.98µ ~ 1.000
_BufferPoolAllocation/128KB-4 30.13µ 32.91µ ~ 0.100
_BufferPoolAllocation/512KB-4 122.3µ 116.5µ ~ 0.100
_BufferPoolConcurrent/32KB-4 18.83µ 18.76µ ~ 0.700
_BufferPoolConcurrent/64KB-4 29.45µ 29.16µ ~ 0.200
_BufferPoolConcurrent/512KB-4 159.5µ 165.6µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/16KB-4 675.4µ 675.1µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/32KB-4 667.9µ 668.3µ ~ 1.000
_SubtreeDeserializationWithBufferSizes/64KB-4 667.0µ 659.4µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 673.4µ 662.4µ ~ 0.400
_SubtreeDeserializationWithBufferSizes/512KB-4 681.8µ 669.3µ ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/16KB-4 38.08m 37.83m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/32KB-4 38.03m 38.00m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/64KB-4 38.10m 37.67m ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/128KB-4 37.84m 37.87m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/512KB-4 38.18m 38.01m ~ 0.400
_PooledVsNonPooled/Pooled-4 823.1n 826.2n ~ 0.200
_PooledVsNonPooled/NonPooled-4 7.080µ 7.820µ ~ 0.100
_MemoryFootprint/Current_512KB_32concurrent-4 8.141µ 8.842µ ~ 0.100
_MemoryFootprint/Proposed_32KB_32concurrent-4 11.80µ 12.65µ ~ 0.100
_MemoryFootprint/Alternative_64KB_32concurrent-4 11.14µ 11.26µ ~ 1.000
StoreBlock_Sequential/BelowCSVHeight-4 325.3µ 323.1µ ~ 0.400
StoreBlock_Sequential/AboveCSVHeight-4 326.1µ 312.0µ ~ 0.200
GetUtxoHashes-4 256.8n 257.5n ~ 0.800
GetUtxoHashes_ManyOutputs-4 44.18µ 44.30µ ~ 0.700
_NewMetaDataFromBytes-4 239.7n 242.0n ~ 0.100
_Bytes-4 629.7n 629.0n ~ 0.700
_MetaBytes-4 571.1n 567.6n ~ 0.700

Threshold: >10% with p < 0.05 | Generated: 2026-05-12 14:09 UTC

icellan added a commit that referenced this pull request May 8, 2026
Restores the partition-level consume parallelism that the franz-go
switch (#611, 2026-03-27) silently removed, and fixes the producer-side
sticky-partitioner skew that produces bursty per-partition load.

Three changes, all on the txmeta hot path:

1) services/validator/Validator.go — Kafka producer now sets Key to a
   tx hash (the first hash in the batch for batched sends, the txn's
   own hash for single sends). Previously Key was nil, which under
   franz-go's default StickyKeyPartitioner is equivalent to a
   StickyPartitioner: every batch lands on the same partition until
   the linger/batch threshold trips, producing the bursty oscillation
   we observed in production.

   With a non-nil key, StickyKeyPartitioner hashes the key onto a
   partition deterministically. tx hashes are uniformly distributed,
   so partition usage is now uniform.

2) util/kafka/kafka_consumer.go (Start) — replaces the single-goroutine
   fetches.EachRecord(...) loop with fetches.EachPartition(...) +
   one goroutine per partition per fetch. Within a partition records
   are still processed sequentially so per-partition order is
   preserved; across partitions the work runs in parallel. partitionWg
   is awaited before the next PollFetches so in-flight goroutine count
   is bounded by partition count.

   This restores N-way concurrency that the previous Kafka library
   provided implicitly via per-partition consumer goroutines.

3) util/kafka/kafka_consumer.go (NewKafkaConsumerGroup) — when
   AutoCommitEnabled is true, registers kgo.AutoCommitMarks() and the
   per-partition loop calls client.MarkCommitRecords(record) only
   after consumerFn returns nil. With the previous default auto-commit
   any record that consumerFn errored on was still being committed by
   the auto-commit timer because franz-go tracks "iterated" not
   "succeeded". This change makes auto-commit consistent with the
   manual-commit path's semantics.
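
The consumer-side changes in items 2 and 3 can be modelled with the standard library alone. The sketch below is a simplified model under assumptions, not the franz-go code: `record`, `consumeFetch`, `failOnBad`, and `demoFetch` are hypothetical names, a map stands in for a fetch, and the returned map stands in for the records marked for commit. Stopping a partition at its first failed record is also an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a Kafka record.
type record struct {
	partition int
	offset    int64
	value     string
}

var errBad = errors.New("bad record")

// failOnBad is a demo handler that rejects records whose value is "bad".
func failOnBad(r record) error {
	if r.value == "bad" {
		return errBad
	}
	return nil
}

// consumeFetch handles one fetch: one goroutine per partition, records
// processed in order within a partition. It returns, per partition, the last
// offset marked for commit, i.e. the last record whose handler returned nil.
func consumeFetch(fetch map[int][]record, handle func(record) error) map[int]int64 {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		marked = make(map[int]int64)
	)
	for p, recs := range fetch {
		wg.Add(1)
		go func(p int, recs []record) {
			defer wg.Done()
			for _, r := range recs {
				if err := handle(r); err != nil {
					return // assumption: later records in this partition stay unmarked
				}
				mu.Lock()
				marked[p] = r.offset // mark only after success
				mu.Unlock()
			}
		}(p, recs)
	}
	// Waiting here, before the next poll, bounds in-flight goroutines
	// by the number of partitions in the fetch.
	wg.Wait()
	return marked
}

// demoFetch builds a two-partition fetch with one failing record.
func demoFetch() map[int][]record {
	return map[int][]record{
		0: {{0, 0, "a"}, {0, 1, "b"}, {0, 2, "c"}},
		1: {{1, 10, "d"}, {1, 11, "bad"}, {1, 12, "e"}},
	}
}

func main() {
	marked := consumeFetch(demoFetch(), failOnBad)
	fmt.Println("partition 0 marked through offset", marked[0]) // 2
	fmt.Println("partition 1 marked through offset", marked[1]) // 10
}
```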

Verified:
- go build / go vet across changed packages — clean
- go test -short -race ./util/kafka/... — pass (perf tests are
  -short-skipped; they require a real broker)
- go test -race ./services/subtreevalidation/{TestServer_txmetaHandler,
  TestParseTxmetaBatch} — pass (worker pool from #834 still works
  under the new parallel feeder)
- Pre-commit hooks (gofmt / gci / golangci-lint) — pass

Notes:
- Producer compression intentionally NOT enabled — Bitcoin tx data
  is high-entropy, compression would just burn CPU.
- AutoCommitMarks closes the gap where errored records were still
  committed; it does NOT close the gap where async handlers (txmeta
  worker pool from #834) return before the cache write completes.
  Per discussion this is acceptable: cache misses fall through to
  the UTXO store transparently.
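
To illustrate the keyed-producer change in item 1 of this commit message: hashing a uniformly distributed key onto a partition is deterministic per key and spreads load evenly. The sketch below is only a model. FNV-1a is a stand-in hash chosen for the example and is not claimed to be what the real partitioner uses, and `partitionFor`, `fakeTxHash`, and `spread` are hypothetical helpers.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key onto one of n partitions.
// FNV-1a is an illustrative stand-in, not the real partitioner's hash.
func partitionFor(key []byte, n int) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32() % uint32(n))
}

// fakeTxHash derives a synthetic, uniformly distributed 32-byte value
// from a counter. Real transaction hashes are computed differently.
func fakeTxHash(i uint64) [32]byte {
	var b [8]byte
	binary.LittleEndian.PutUint64(b[:], i)
	return sha256.Sum256(b[:])
}

// spread counts how many of the first `keys` synthetic hashes
// land on each of `partitions` partitions.
func spread(keys, partitions int) []int {
	counts := make([]int, partitions)
	for i := 0; i < keys; i++ {
		h := fakeTxHash(uint64(i))
		counts[partitionFor(h[:], partitions)]++
	}
	return counts
}

func main() {
	counts := spread(256_000, 256) // 256 partitions, as on the txmeta topic
	lo, hi := counts[0], counts[0]
	for _, c := range counts {
		if c < lo {
			lo = c
		}
		if c > hi {
			hi = c
		}
	}
	fmt.Printf("per-partition count: min=%d max=%d (ideal 1000)\n", lo, hi)
}
```

With a nil key there is nothing to hash, which is why the description above says batches stick to one partition until the linger or batch threshold trips.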
icellan added 2 commits May 8, 2026 15:18
CI on PR #828 caught three e2e tests panicking with "send on closed
channel" originating from txmetaHandler.go:148:

    legacy-sync       TestBIP68_TimeBased_Accept
    smoketest         TestBIP68_HeightBased_Accept
    prunertest        TestPrunerUnminedParentRetention

All three use the in-memory Kafka consumer which delivers a final
message AFTER Server.Stop() has returned from txmetaConsumerClient.
Close(). The handler running on that delivery race-loses with
stopTxmetaCacheWorkers's close(txmetaCacheJobCh) and panics.

Fix: a sync.RWMutex (txmetaCacheCloseMu) plus a closed flag
(txmetaCacheClosed) coordinate the two paths:

- Senders (txmetaHandler) take the read lock, check closed, send.
- Closer (stopTxmetaCacheWorkers) takes the write lock, flips the
  flag, closes the channel — all under the write lock.

Read-lock-then-check-then-send is atomic from the closer's POV: a
sender either sees closed=false and completes its send before the
write lock is granted, or sees closed=true and bails. No path can
land on close(ch) followed by send(ch).

Read locks are uncontended on the steady-state hot path and the
close runs once per Server lifetime, so overhead is negligible.
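
The coordination described in this commit message can be sketched as below. It is a minimal illustration with hypothetical names (`safeQueue`, `TrySend`, `Close`), not the code from txmetaHandler.go. The sketch assumes a consumer keeps draining the channel until it is closed, so a sender blocked on a full channel while holding the read lock always makes progress.

```go
package main

import (
	"fmt"
	"sync"
)

// safeQueue pairs a channel with a closed flag guarded by an RWMutex.
type safeQueue struct {
	mu     sync.RWMutex
	closed bool
	ch     chan int
}

func newSafeQueue(size int) *safeQueue {
	return &safeQueue{ch: make(chan int, size)}
}

// TrySend returns false instead of panicking once the queue is closed.
// The read lock is held across the check and the send, so Close cannot
// slip in between them.
func (q *safeQueue) TrySend(v int) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return false
	}
	q.ch <- v
	return true
}

// Close flips the flag and closes the channel under the write lock.
// It is safe to call more than once.
func (q *safeQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	q.closed = true
	close(q.ch)
}

func main() {
	q := newSafeQueue(4)

	consumed := 0
	done := make(chan struct{})
	go func() { // consumer drains until the channel is closed
		for range q.ch {
			consumed++
		}
		close(done)
	}()

	var wg sync.WaitGroup
	for s := 0; s < 16; s++ { // 16 senders racing against Close
		wg.Add(1)
		go func() {
			defer wg.Done()
			for q.TrySend(1) {
			}
		}()
	}

	q.Close()
	wg.Wait()
	<-done
	fmt.Println("no panic; items consumed:", consumed)
}
```

Running the demo under the race detector (`go run -race`) is a quick way to check a variant of this pattern.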

Test: TestServer_txmetaHandler_ShutdownRace stresses 16 sender
goroutines against a concurrent stop(); 10× -race iterations clean.

Verified:
- go build / go vet — clean
- go test -count=10 -race -run "TestServer_txmetaHandler|
  TestParseTxmetaBatch" ./services/subtreevalidation/ — pass
- Pre-commit hooks — pass
…er pool

The bounded worker pool added in #834 unblocked the consumer but capped
throughput at the worker count and bound the apply rate to the slowest
worker on every batch. Production never recovered the historical 2M+
ops/sec on this path.

Two changes:

  1. Drop the worker pool. txmetaHandler now spawns one goroutine per
     parsed Kafka message (fire-and-forget). Cache fill races subtree
     arrival; every ms of queueing matters because a stale cache forces
     fall-through to the UTXO store, which is far more expensive.

  2. Apply per entry, not per batch. SetCacheMulti's bucket fan-out
     takes each touched bucket's write lock for the duration of writing
     ALL keys mapped to that bucket (~1 ms hold for 1024-key batches).
     Under many concurrent writers, contenders queue up behind that hold
     and end-of-queue wait inflates with concurrency. Per-entry
     SetCacheFromBytes acquires the bucket lock once per key (~1 µs
     holds). Aggregate work is the same; the lock-contention queue is
     much shallower, which is what historically sustained 2M+ ops/sec.

Tests updated:
  - successful set test asserts SetCacheFromBytes (not SetCacheMulti)
  - new TestServer_txmetaHandler_PerEntrySetCacheFromBytes guards the
    per-entry strategy and explicitly AssertNotCalled on SetCacheMulti
  - TestServer_txmetaHandler_ShutdownRace renamed to ConcurrentCalls;
    the close-vs-send race it covered no longer exists (no channel)
icellan added a commit that referenced this pull request May 8, 2026 (commit message identical to the one above)
icellan added 3 commits May 8, 2026 21:15
Two-tier execution in the txmeta Kafka consumer path:

  ADDs   - each entry in a parsed Kafka batch spawns its own goroutine
           that does ONE SetCacheFromBytes and exits. No per-batch
           sequential walk, no waitgroup, no bound. Best-effort: a
           missed write falls through to the UTXO store on the next
           BatchDecorate, so we optimise for latency-to-cache, not for
           delivery guarantees. Spawning per-entry maximises bucket
           shard parallelism - each writer holds the bucket lock ~1 us,
           and across 8192 buckets the contention queue stays shallow
           even at multi-million ops/sec.

  DELETE - runs synchronously in the Kafka consumer goroutine. Delete
           must complete before the offset commits, otherwise stale
           metadata can survive a tx being conflicted/replaced and
           future BatchDecorate calls will return wrong results. On
           failure we surface the error so the consumer leaves the
           record uncommitted; on rebalance/restart the message is
           re-delivered and the delete retried.
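
The two-tier split can be sketched as follows. This is an illustration under assumptions, not the handler itself: `entry`, `memCache`, `handleBatch`, and `runBatch` are hypothetical, and the WaitGroup exists only so the demo can observe the asynchronous ADDs; the design described above uses no waitgroup.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type opKind int

const (
	opAdd opKind = iota
	opDelete
)

// entry is a hypothetical parsed batch entry.
type entry struct {
	op    opKind
	key   string
	value []byte
}

// memCache is a toy cache; failDelete simulates a failing delete.
type memCache struct {
	mu         sync.Mutex
	data       map[string][]byte
	failDelete bool
}

func newMemCache() *memCache { return &memCache{data: make(map[string][]byte)} }

func (c *memCache) Set(key string, value []byte) {
	c.mu.Lock()
	c.data[key] = value
	c.mu.Unlock()
}

func (c *memCache) Delete(key string) error {
	if c.failDelete {
		return errors.New("simulated delete failure")
	}
	c.mu.Lock()
	delete(c.data, key)
	c.mu.Unlock()
	return nil
}

func (c *memCache) Has(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.data[key]
	return ok
}

// handleBatch applies ADDs in fire-and-forget goroutines and DELETEs inline.
// A DELETE error is returned so the caller can leave the record uncommitted.
func handleBatch(c *memCache, batch []entry, adds *sync.WaitGroup) error {
	for _, e := range batch {
		switch e.op {
		case opAdd:
			adds.Add(1) // demo-only bookkeeping
			go func(e entry) {
				defer adds.Done()
				c.Set(e.key, e.value)
			}(e)
		case opDelete:
			if err := c.Delete(e.key); err != nil {
				return fmt.Errorf("delete %s: %w", e.key, err)
			}
		}
	}
	return nil
}

// runBatch wraps handleBatch and waits for the ADD goroutines (demo only).
func runBatch(c *memCache, batch []entry) error {
	var adds sync.WaitGroup
	err := handleBatch(c, batch, &adds)
	adds.Wait()
	return err
}

func main() {
	c := newMemCache()
	c.Set("old", []byte("stale"))
	err := runBatch(c, []entry{
		{opAdd, "a", []byte("1")},
		{opDelete, "old", nil},
		{opAdd, "b", []byte("2")},
	})
	fmt.Println(err, c.Has("a"), c.Has("old"), c.Has("b")) // <nil> true false true
}
```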

Tests updated to assert the synchronous-DELETE error propagation.
…iate goroutine

txmetaHandler runs inside the Kafka consumer's per-partition goroutine
already, so the previous design — parse into a job struct → spawn an
applyTxmetaCacheJob goroutine → that goroutine spawns N entry goroutines —
added one goroutine layer with no actual work, plus a job struct alloc
and 2N per-entry mallocs (key + value copies).

This commit flattens the call chain:

  - parseTxmetaBatch and applyTxmetaCacheJob removed.
  - txmetaHandler parses entries inline.
  - DELETEs run synchronously, return error on failure (offset stays
    uncommitted, message re-delivered on rebalance/restart).
  - ADDs each spawn one fire-and-forget goroutine that does a single
    SetCacheFromBytes and exits.

Memory: keys and values are SUBSLICES of msg.Value, not heap copies.
franz-go's Record.Value is a stable per-record buffer (not pooled or
reused), so the apply goroutines hold subslices and Go's GC keeps the
underlying buffer alive until they finish. The only per-entry copy is
the 32-byte chainhash.Hash for DELETEs (forced by the [32]byte array
type, value copy, no allocation).
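
The sub-slice versus copy distinction is plain Go slice behaviour and can be shown in a few lines. The layout below (a fixed-length key followed by the value) is a hypothetical simplification, not the real batch format, and `splitAlias` and `splitCopy` are illustrative names.

```go
package main

import "fmt"

// splitAlias returns sub-slices that share msg's backing array: no copies,
// but later writes to msg are visible through the results.
func splitAlias(msg []byte, keyLen int) (key, value []byte) {
	return msg[:keyLen], msg[keyLen:]
}

// splitCopy returns independent copies: two allocations, but the results
// are immune to later writes to msg.
func splitCopy(msg []byte, keyLen int) (key, value []byte) {
	key = append([]byte(nil), msg[:keyLen]...)
	value = append([]byte(nil), msg[keyLen:]...)
	return key, value
}

func main() {
	msg := []byte("KEYvalue")
	aliasKey, _ := splitAlias(msg, 3)
	copyKey, _ := splitCopy(msg, 3)

	msg[0] = 'X' // simulate the source buffer being reused

	fmt.Printf("alias=%s copy=%s\n", aliasKey, copyKey) // alias=XEY copy=KEY
}
```

Aliasing is therefore only safe when the source buffer is never rewritten, which is the property this commit message relies on for `Record.Value`.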

Allocations per Kafka message: 0 per entry + N goroutine stacks for the
ADD apply work. Down from 1 (job struct) + 2 (slice headers) + 2N
(per-entry key/value) for a 1024-entry batch.

Tests: TestParseTxmetaBatch removed (parser no longer exists).
Replaced with TestServer_txmetaHandler_MixedBatch (ADD+DELETE+ADD →
2x SetCacheFromBytes, 1x Delete) and
TestServer_txmetaHandler_TruncatedBatch (malformed ack, no panic).
…y goroutine

Per-entry ADD goroutines saturated the Go scheduler at scale. Production
metrics on dev-scale-1-scale-2 (256-partition txmeta topic, ~1.1M
entries/sec): scheduler queue ~89K deep, p50 enqueue->SetCacheFromBytes
latency ~70ms, p99 ~256ms — versus the actual cache write taking
microseconds. Spawn cost (~2us) + queue wait (80ms) dominated; the
per-entry parallelism we were paying for was queue depth, not real
concurrency.

Walk the 1024 entries serially inside the partition goroutine instead.
Bucket-shard parallelism is preserved across the 256 partition
goroutines (each handling its own records); one goroutine per partition,
one cache-write at a time within. Per-record runtime: ~5ms (1024 x
~5us). Goroutines spawned per second drops from ~1.1M to ~1075.

Aggregate throughput is the same. The visible chart oscillation should
flatten because completions are now evenly spaced, and the latency
histogram should drop to single-digit ms.
@freemans13 freemans13 removed their request for review May 12, 2026 10:09
@gokutheengineer gokutheengineer force-pushed the fix/subtree-validator-kafka-cache-throughput branch from 7af5bc0 to 60432ce Compare May 12, 2026 13:39
@sonarqubecloud


Quality Gate failed

Failed conditions
63.6% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube Cloud

@oskarszoon oskarszoon left a comment

Contributor

Approve. Good prod-driven iteration — the rewrite from worker-pool+channels to inline synchronous-DELETE + fire-and-forget-ADD is cleaner and addresses both of gokutheengineer's findings (ADD/DELETE reordering, shutdown race). Per-partition parallelism still comes from the 256 partition goroutines themselves, with two orders of magnitude fewer total goroutines than the pre-PR design. Good doc comments on the why-each-choice.

One ask: SetCacheMulti was added to the cache interface in the first design pass, but the final rewrite uses per-entry SetCacheFromBytes synchronously — is SetCacheMulti still needed for a planned use case, or vestigial after the redesign? If vestigial, worth removing to keep the surface clean; if planned, a one-line comment at the interface declaration would prevent it from being culled by a future reader.

@icellan icellan merged commit c11a28c into feat/teranode-native-ops May 15, 2026
50 checks passed
@icellan icellan deleted the fix/subtree-validator-kafka-cache-throughput branch May 15, 2026 15:31