
fix(subtreevalidation): replace unbounded goroutine per Kafka message with bounded shard worker pool #858

Merged: gokutheengineer merged 3 commits into bsv-blockchain:main from gokutheengineer:gokhan/fix-subtreevaliation-kafka on May 14, 2026


gokutheengineer (Collaborator) commented on May 13, 2026

Summary

The txmeta Kafka handler was spawning an unbounded goroutine per message, which could exhaust memory under sustained load. This PR replaces that pattern with a 256-shard bounded worker pool and fixes the invalid-subtree producer lifecycle.

Changes

txmeta handler (txmetaHandler.go)

  • Removed the go func() goroutine-per-message pattern
  • Introduced a 256-shard worker pool (txmetaWorkerShardCount) keyed by hash[0], preserving per-key ordering while allowing parallelism across different keys
  • Each shard has a fixed-size queue (txmetaWorkerQueueSize = 256); a full queue returns an error so the Kafka consumer can apply backpressure instead of silently dropping work
  • Workers are lazily initialised once via sync.Once and cleanly shut down in Stop()
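
The bullets above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from txmetaHandler.go: the names `pool`, `submit`, `stop`, and `errQueueFull` are hypothetical, and the two constants only stand in for txmetaWorkerShardCount and txmetaWorkerQueueSize. It assumes a 32-byte hash key and one worker goroutine per shard.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

const (
	shardCount = 256 // stands in for txmetaWorkerShardCount
	queueSize  = 256 // stands in for txmetaWorkerQueueSize
)

// errQueueFull is a hypothetical sentinel error for this sketch.
var errQueueFull = errors.New("txmeta shard queue full")

// pool owns one bounded queue and one worker goroutine per shard.
type pool struct {
	once   sync.Once
	queues [shardCount]chan func()
	wg     sync.WaitGroup
}

// start lazily creates the queues and workers exactly once.
func (p *pool) start() {
	p.once.Do(func() {
		for i := range p.queues {
			q := make(chan func(), queueSize)
			p.queues[i] = q
			p.wg.Add(1)
			go func() {
				defer p.wg.Done()
				for task := range q { // FIFO per shard keeps same-key order
					task()
				}
			}()
		}
	})
}

// submit enqueues task on the shard selected by the first hash byte.
// It never blocks: a full queue is reported to the caller instead.
func (p *pool) submit(hash [32]byte, task func()) error {
	p.start()
	select {
	case p.queues[hash[0]] <- task:
		return nil
	default:
		return errQueueFull
	}
}

// stop closes every queue and waits for already queued tasks to finish.
// Callers must not call submit after stop (a send on a closed channel panics).
func (p *pool) stop() {
	p.start()
	for _, q := range p.queues {
		close(q)
	}
	p.wg.Wait()
}

// orderingDemo submits ADD then DELETE for one key and returns the processing order.
func orderingDemo() []string {
	var p pool
	var seen []string
	key := [32]byte{0x2a}
	for _, action := range []string{"ADD", "DELETE"} {
		action := action // per-iteration copy (needed before Go 1.22)
		_ = p.submit(key, func() { seen = append(seen, action) })
	}
	p.stop() // wg.Wait orders the appends before the read below
	return seen
}

// backpressureDemo blocks one shard's worker, fills its queue, and returns
// the error from the first submit that no longer fits.
func backpressureDemo() error {
	var p pool
	key := [32]byte{0x01}
	started, release := make(chan struct{}), make(chan struct{})
	_ = p.submit(key, func() { close(started); <-release })
	<-started // the worker is now busy, so the queue itself is empty
	for i := 0; i < queueSize; i++ {
		_ = p.submit(key, func() {})
	}
	err := p.submit(key, func() {}) // queue already holds queueSize tasks
	close(release)
	p.stop()
	return err
}

func main() {
	fmt.Println(orderingDemo())     // [ADD DELETE]
	fmt.Println(backpressureDemo()) // txmeta shard queue full
}
```

Because all messages for one key land in the same FIFO queue, a single worker sees them in arrival order, while the non-blocking `select` turns a full queue into an error the caller can act on rather than a new goroutine.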

Invalid subtree producer (Server.go)

  • The producer channel is now server-owned (invalidSubtreeProducerChannel) so Start and Publish always share the same channel lifecycle
  • Stop() now explicitly calls invalidSubtreeKafkaProducer.Stop() to drain in-flight messages on shutdown
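
A rough sketch of the lifecycle these two bullets describe, under stated assumptions: the `producer` and `server` types below are hypothetical stand-ins (the real Kafka producer interface is not shown on this page), messages are plain strings, and the server is assumed to close the channel it owns before waiting for the producer to drain.

```go
package main

import (
	"fmt"
	"sync"
)

// producer is a hypothetical stand-in for the invalid-subtree Kafka producer.
type producer struct {
	wg   sync.WaitGroup
	sent []string
}

// Start consumes messages from ch in the background until ch is closed.
func (p *producer) Start(ch <-chan string) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for m := range ch {
			p.sent = append(p.sent, m) // a real producer would write to Kafka here
		}
	}()
}

// Stop blocks until every message received so far has been handled.
func (p *producer) Stop() { p.wg.Wait() }

// server owns the channel, so start, publish, and stop all see the same one.
type server struct {
	ch       chan string
	producer *producer
}

func newServer() *server {
	s := &server{ch: make(chan string, 16), producer: &producer{}}
	s.producer.Start(s.ch)
	return s
}

// publish hands a message to the producer through the server-owned channel.
func (s *server) publish(msg string) { s.ch <- msg }

// stop closes the channel, waits for the producer to drain it, and returns
// what was delivered (returned here only so the sketch can be checked).
func (s *server) stop() []string {
	close(s.ch)
	s.producer.Stop()
	return s.producer.sent
}

func main() {
	s := newServer()
	s.publish("subtree-a")
	s.publish("subtree-b")
	fmt.Println(s.stop()) // [subtree-a subtree-b]
}
```

Keeping the channel on the server means there is exactly one place that creates and closes it, which is what lets shutdown wait for queued messages instead of abandoning them.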

Tests

  • TestServer_txmetaHandler_PreservesPerKeyOrdering — verifies ADD then DELETE on the same key are processed in order
  • TestServer_txmetaHandler_ReturnsErrorWhenQueueFull — verifies backpressure error is returned when shard queue is full
  • TestPublishInvalidSubtree_EndToEndMemoryKafka — end-to-end round-trip through the in-memory Kafka broker verifying message delivery

github-actions (bot, Contributor) commented on May 13, 2026

🤖 Claude Code Review

Status: Complete


Current Review:

All previously reported issues have been resolved:

  • Test comment documentation updated to reflect worker pool behavior
  • Worker shutdown behavior documented (lines 146-148 in txmetaHandler.go)
  • Unused invalidSubtreeProducerChannel field removed

No new issues found. The implementation looks solid:

  • Bounded worker pool correctly prevents unbounded goroutine growth
  • Per-key ordering preserved via shard selection (hash[0] % 256)
  • Proper backpressure mechanism when queues are full
  • Clean shutdown with proper synchronization
  • Comprehensive test coverage including ordering tests and backpressure verification

History:

  • ✅ Fixed: Test comment updated to reflect new worker pool behavior (was: handler processes messages asynchronously)
  • ✅ Fixed: Worker shutdown behavior documented with rationale
  • ✅ Fixed: Removed unused invalidSubtreeProducerChannel field

gokutheengineer changed the title from "fix: validate subtree" to "fix(subtreevalidation): replace unbounded goroutine per Kafka message with bounded shard worker pool" on May 13, 2026
Comment thread services/subtreevalidation/txmetaHandler_test.go Outdated
Comment thread services/subtreevalidation/txmetaHandler.go
Comment thread services/subtreevalidation/Server.go Outdated

github-actions (bot, Contributor) commented on May 13, 2026

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-858 (cbd7f07)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 142
  • Significance level: p < 0.05

All benchmark results (sec/op)

| Benchmark | Baseline | Current | Change | p-value |
|---|---|---|---|---|
| _NewBlockFromBytes-4 | 1.698µ | 1.688µ | ~ | 0.100 |
| SplitSyncedParentMap_SetIfNotExists/256_buckets-4 | 61.67n | 61.58n | ~ | 0.700 |
| SplitSyncedParentMap_SetIfNotExists/16_buckets-4 | 61.68n | 61.56n | ~ | 0.400 |
| SplitSyncedParentMap_SetIfNotExists/1_bucket-4 | 61.79n | 61.81n | ~ | 0.600 |
| SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... | 31.30n | 31.30n | ~ | 0.800 |
| SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... | 54.76n | 54.59n | ~ | 0.400 |
| SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... | 113.2n | 123.9n | ~ | 0.100 |
| MiningCandidate_Stringify_Short-4 | 264.5n | 264.6n | ~ | 1.000 |
| MiningCandidate_Stringify_Long-4 | 1.914µ | 1.925µ | ~ | 0.200 |
| MiningSolution_Stringify-4 | 1000.0n | 994.5n | ~ | 0.700 |
| BlockInfo_MarshalJSON-4 | 1.799µ | 1.803µ | ~ | 0.300 |
| NewFromBytes-4 | 128.1n | 129.4n | ~ | 0.400 |
| Mine_EasyDifficulty-4 | 60.13µ | 61.03µ | ~ | 0.200 |
| Mine_WithAddress-4 | 7.501µ | 6.710µ | ~ | 0.400 |
| BlockAssembler_AddTx-4 | 0.02795n | 0.02819n | ~ | 1.000 |
| AddNode-4 | 10.67 | 11.13 | ~ | 0.200 |
| AddNodeWithMap-4 | 11.10 | 11.37 | ~ | 0.700 |
| DirectSubtreeAdd/4_per_subtree-4 | 61.40n | 59.13n | ~ | 0.400 |
| DirectSubtreeAdd/64_per_subtree-4 | 31.66n | 32.06n | ~ | 1.000 |
| DirectSubtreeAdd/256_per_subtree-4 | 30.36n | 30.35n | ~ | 1.000 |
| DirectSubtreeAdd/1024_per_subtree-4 | 29.14n | 29.13n | ~ | 1.000 |
| DirectSubtreeAdd/2048_per_subtree-4 | 28.71n | 28.65n | ~ | 1.000 |
| SubtreeProcessorAdd/4_per_subtree-4 | 278.4n | 280.5n | ~ | 1.000 |
| SubtreeProcessorAdd/64_per_subtree-4 | 271.8n | 271.9n | ~ | 1.000 |
| SubtreeProcessorAdd/256_per_subtree-4 | 273.5n | 274.7n | ~ | 0.100 |
| SubtreeProcessorAdd/1024_per_subtree-4 | 266.6n | 267.3n | ~ | 1.000 |
| SubtreeProcessorAdd/2048_per_subtree-4 | 266.1n | 266.7n | ~ | 0.200 |
| SubtreeProcessorRotate/4_per_subtree-4 | 273.4n | 273.4n | ~ | 0.800 |
| SubtreeProcessorRotate/64_per_subtree-4 | 270.6n | 269.7n | ~ | 0.400 |
| SubtreeProcessorRotate/256_per_subtree-4 | 272.9n | 272.0n | ~ | 0.400 |
| SubtreeProcessorRotate/1024_per_subtree-4 | 272.9n | 269.9n | ~ | 0.100 |
| SubtreeNodeAddOnly/4_per_subtree-4 | 54.24n | 54.17n | ~ | 1.000 |
| SubtreeNodeAddOnly/64_per_subtree-4 | 34.35n | 34.42n | ~ | 0.100 |
| SubtreeNodeAddOnly/256_per_subtree-4 | 33.36n | 33.46n | ~ | 0.600 |
| SubtreeNodeAddOnly/1024_per_subtree-4 | 32.60n | 32.81n | ~ | 0.100 |
| SubtreeCreationOnly/4_per_subtree-4 | 112.8n | 111.7n | ~ | 0.700 |
| SubtreeCreationOnly/64_per_subtree-4 | 419.8n | 397.0n | ~ | 0.200 |
| SubtreeCreationOnly/256_per_subtree-4 | 1.359µ | 1.323µ | ~ | 0.100 |
| SubtreeCreationOnly/1024_per_subtree-4 | 4.402µ | 4.427µ | ~ | 0.200 |
| SubtreeCreationOnly/2048_per_subtree-4 | 8.061µ | 8.044µ | ~ | 0.500 |
| SubtreeProcessorOverheadBreakdown/64_per_subtree-4 | 270.0n | 270.3n | ~ | 0.700 |
| SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 | 268.5n | 270.6n | ~ | 0.100 |
| ParallelGetAndSetIfNotExists/1k_nodes-4 | 583.4µ | 597.4µ | ~ | 0.100 |
| ParallelGetAndSetIfNotExists/10k_nodes-4 | 1.313m | 1.378m | ~ | 0.100 |
| ParallelGetAndSetIfNotExists/50k_nodes-4 | 6.559m | 6.546m | ~ | 1.000 |
| ParallelGetAndSetIfNotExists/100k_nodes-4 | 13.30m | 13.19m | ~ | 0.700 |
| SequentialGetAndSetIfNotExists/1k_nodes-4 | 656.0µ | 654.2µ | ~ | 0.700 |
| SequentialGetAndSetIfNotExists/10k_nodes-4 | 2.764m | 2.738m | ~ | 0.200 |
| SequentialGetAndSetIfNotExists/50k_nodes-4 | 10.52m | 10.48m | ~ | 0.400 |
| SequentialGetAndSetIfNotExists/100k_nodes-4 | 20.13m | 19.75m | ~ | 0.100 |
| ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 | 640.4µ | 630.6µ | ~ | 0.100 |
| ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 | 4.230m | 4.219m | ~ | 1.000 |
| ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 | 16.67m | 16.33m | ~ | 0.100 |
| ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 | 698.2µ | 696.6µ | ~ | 1.000 |
| ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 | 5.927m | 5.876m | ~ | 0.200 |
| ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 | 37.77m | 37.62m | ~ | 0.700 |
| DiskTxMap_SetIfNotExists-4 | 4.041µ | 4.058µ | ~ | 0.100 |
| DiskTxMap_SetIfNotExists_Parallel-4 | 3.904µ | 3.856µ | ~ | 0.400 |
| DiskTxMap_ExistenceOnly-4 | 334.4n | 320.4n | ~ | 0.100 |
| Queue-4 | 208.1n | 204.4n | ~ | 0.100 |
| AtomicPointer-4 | 8.135n | 8.126n | ~ | 0.200 |
| ReorgOptimizations/DedupFilterPipeline/Old/10K-4 | 822.4µ | 815.7µ | ~ | 0.700 |
| ReorgOptimizations/DedupFilterPipeline/New/10K-4 | 774.5µ | 815.3µ | ~ | 0.100 |
| ReorgOptimizations/AllMarkFalse/Old/10K-4 | 119.8µ | 114.5µ | ~ | 0.200 |
| ReorgOptimizations/AllMarkFalse/New/10K-4 | 58.30µ | 58.20µ | ~ | 1.000 |
| ReorgOptimizations/HashSlicePool/Old/10K-4 | 65.81µ | 76.04µ | ~ | 0.400 |
| ReorgOptimizations/HashSlicePool/New/10K-4 | 11.78µ | 11.81µ | ~ | 0.100 |
| ReorgOptimizations/NodeFlags/Old/10K-4 | 6.421µ | 6.040µ | ~ | 0.100 |
| ReorgOptimizations/NodeFlags/New/10K-4 | 2.712µ | 2.287µ | ~ | 0.100 |
| ReorgOptimizations/DedupFilterPipeline/Old/100K-4 | 10.84m | 11.11m | ~ | 1.000 |
| ReorgOptimizations/DedupFilterPipeline/New/100K-4 | 10.62m | 10.97m | ~ | 0.100 |
| ReorgOptimizations/AllMarkFalse/Old/100K-4 | 1.187m | 1.154m | ~ | 0.100 |
| ReorgOptimizations/AllMarkFalse/New/100K-4 | 735.2µ | 737.9µ | ~ | 0.700 |
| ReorgOptimizations/HashSlicePool/Old/100K-4 | 592.0µ | 600.9µ | ~ | 0.700 |
| ReorgOptimizations/HashSlicePool/New/100K-4 | 330.0µ | 334.0µ | ~ | 0.700 |
| ReorgOptimizations/NodeFlags/Old/100K-4 | 55.43µ | 57.73µ | ~ | 0.100 |
| ReorgOptimizations/NodeFlags/New/100K-4 | 19.04µ | 19.66µ | ~ | 0.100 |
| TxMapSetIfNotExists-4 | 50.75n | 49.82n | ~ | 0.700 |
| TxMapSetIfNotExistsDuplicate-4 | 43.21n | 43.42n | ~ | 0.300 |
| ChannelSendReceive-4 | 673.3n | 683.0n | ~ | 0.100 |
| CalcBlockWork-4 | 498.5n | 529.2n | ~ | 0.700 |
| CalculateWork-4 | 676.8n | 679.8n | ~ | 0.700 |
| BuildBlockLocatorString_Helpers/Size_10-4 | 1.378µ | 1.319µ | ~ | 0.400 |
| BuildBlockLocatorString_Helpers/Size_100-4 | 12.52µ | 14.10µ | ~ | 0.100 |
| BuildBlockLocatorString_Helpers/Size_1000-4 | 123.6µ | 125.0µ | ~ | 0.700 |
| CatchupWithHeaderCache-4 | 104.5m | 104.4m | ~ | 1.000 |
| _prepareTxsPerLevel-4 | 424.7m | 424.1m | ~ | 0.700 |
| _prepareTxsPerLevelOrdered-4 | 3.803m | 4.075m | ~ | 0.400 |
| _prepareTxsPerLevel_Comparison/Original-4 | 426.5m | 423.7m | ~ | 0.700 |
| _prepareTxsPerLevel_Comparison/Optimized-4 | 3.791m | 3.803m | ~ | 0.700 |
| _BufferPoolAllocation/16KB-4 | 3.578µ | 3.281µ | ~ | 0.100 |
| _BufferPoolAllocation/32KB-4 | 9.218µ | 7.342µ | ~ | 0.700 |
| _BufferPoolAllocation/64KB-4 | 15.04µ | 18.95µ | ~ | 0.700 |
| _BufferPoolAllocation/128KB-4 | 30.65µ | 31.54µ | ~ | 0.200 |
| _BufferPoolAllocation/512KB-4 | 100.1µ | 109.4µ | ~ | 0.200 |
| _BufferPoolConcurrent/32KB-4 | 17.24µ | 19.11µ | ~ | 0.100 |
| _BufferPoolConcurrent/64KB-4 | 27.62µ | 30.03µ | ~ | 0.100 |
| _BufferPoolConcurrent/512KB-4 | 140.9µ | 147.0µ | ~ | 1.000 |
| _SubtreeDeserializationWithBufferSizes/16KB-4 | 646.3µ | 612.6µ | ~ | 0.100 |
| _SubtreeDeserializationWithBufferSizes/32KB-4 | 630.7µ | 616.9µ | ~ | 0.700 |
| _SubtreeDeserializationWithBufferSizes/64KB-4 | 628.3µ | 609.8µ | ~ | 1.000 |
| _SubtreeDeserializationWithBufferSizes/128KB-4 | 652.3µ | 621.2µ | ~ | 0.100 |
| _SubtreeDeserializationWithBufferSizes/512KB-4 | 641.9µ | 633.8µ | ~ | 0.400 |
| _SubtreeDataDeserializationWithBufferSizes/16KB-4 | 35.90m | 35.61m | ~ | 0.100 |
| _SubtreeDataDeserializationWithBufferSizes/32KB-4 | 35.63m | 35.34m | ~ | 0.100 |
| _SubtreeDataDeserializationWithBufferSizes/64KB-4 | 36.22m | 35.26m | ~ | 0.200 |
| _SubtreeDataDeserializationWithBufferSizes/128KB-4 | 35.18m | 35.90m | ~ | 0.100 |
| _SubtreeDataDeserializationWithBufferSizes/512KB-4 | 35.58m | 35.22m | ~ | 0.200 |
| _PooledVsNonPooled/Pooled-4 | 830.4n | 829.1n | ~ | 0.500 |
| _PooledVsNonPooled/NonPooled-4 | 6.910µ | 7.538µ | ~ | 0.700 |
| _MemoryFootprint/Current_512KB_32concurrent-4 | 7.686µ | 7.459µ | ~ | 0.100 |
| _MemoryFootprint/Proposed_32KB_32concurrent-4 | 9.607µ | 9.402µ | ~ | 0.700 |
| _MemoryFootprint/Alternative_64KB_32concurrent-4 | 9.148µ | 9.327µ | ~ | 0.200 |
| SubtreeSizes/10k_tx_4_per_subtree-4 | 1.447m | 1.404m | ~ | 0.700 |
| SubtreeSizes/10k_tx_16_per_subtree-4 | 337.7µ | 327.7µ | ~ | 0.100 |
| SubtreeSizes/10k_tx_64_per_subtree-4 | 83.10µ | 80.11µ | ~ | 0.100 |
| SubtreeSizes/10k_tx_256_per_subtree-4 | 20.79µ | 20.26µ | ~ | 0.100 |
| SubtreeSizes/10k_tx_512_per_subtree-4 | 10.24µ | 10.01µ | ~ | 0.100 |
| SubtreeSizes/10k_tx_1024_per_subtree-4 | 5.042µ | 4.922µ | ~ | 0.100 |
| SubtreeSizes/10k_tx_2k_per_subtree-4 | 2.564µ | 2.449µ | ~ | 0.100 |
| BlockSizeScaling/10k_tx_64_per_subtree-4 | 80.68µ | 77.83µ | ~ | 0.100 |
| BlockSizeScaling/10k_tx_256_per_subtree-4 | 20.50µ | 19.82µ | ~ | 0.100 |
| BlockSizeScaling/10k_tx_1024_per_subtree-4 | 5.069µ | 4.948µ | ~ | 0.100 |
| BlockSizeScaling/50k_tx_64_per_subtree-4 | 401.5µ | 396.8µ | ~ | 0.700 |
| BlockSizeScaling/50k_tx_256_per_subtree-4 | 101.26µ | 98.12µ | ~ | 0.100 |
| BlockSizeScaling/50k_tx_1024_per_subtree-4 | 25.07µ | 24.55µ | ~ | 0.100 |
| SubtreeAllocations/small_subtrees_exists_check-4 | 164.1µ | 156.3µ | ~ | 0.100 |
| SubtreeAllocations/small_subtrees_data_fetch-4 | 176.1µ | 166.9µ | ~ | 0.100 |
| SubtreeAllocations/small_subtrees_full_validation-4 | 335.3µ | 326.3µ | ~ | 0.400 |
| SubtreeAllocations/medium_subtrees_exists_check-4 | 10.089µ | 9.509µ | ~ | 0.100 |
| SubtreeAllocations/medium_subtrees_data_fetch-4 | 10.90µ | 10.22µ | ~ | 0.100 |
| SubtreeAllocations/medium_subtrees_full_validation-4 | 20.84µ | 20.03µ | ~ | 0.100 |
| SubtreeAllocations/large_subtrees_exists_check-4 | 2.469µ | 2.337µ | ~ | 0.100 |
| SubtreeAllocations/large_subtrees_data_fetch-4 | 2.675µ | 2.530µ | ~ | 0.100 |
| SubtreeAllocations/large_subtrees_full_validation-4 | 5.230µ | 5.153µ | ~ | 0.100 |
| StoreBlock_Sequential/BelowCSVHeight-4 | 334.3µ | 329.4µ | ~ | 0.100 |
| StoreBlock_Sequential/AboveCSVHeight-4 | 320.0µ | 330.3µ | ~ | 1.000 |
| GetUtxoHashes-4 | 263.2n | 255.9n | ~ | 0.400 |
| GetUtxoHashes_ManyOutputs-4 | 44.27µ | 43.54µ | ~ | 0.100 |
| _NewMetaDataFromBytes-4 | 238.2n | 236.4n | ~ | 0.100 |
| _Bytes-4 | 624.0n | 617.1n | ~ | 0.100 |
| _MetaBytes-4 | 573.4n | 566.3n | ~ | 0.700 |

Threshold: >10% with p < 0.05 | Generated: 2026-05-13 13:22 UTC
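
The threshold line reads as a two-part rule: a result counts as a regression or improvement only when the change exceeds 10% and the p-value is below 0.05. The sketch below is an assumption about how such a rule could be applied, since the report generator's code is not shown here. The `classify` function, its percent-relative-to-baseline formula, and the "lower sec/op is better" direction are hypothetical. It illustrates why `_BufferPoolAllocation/64KB-4` (15.04µ to 18.95µ, p = 0.700) is still counted as unchanged.

```go
package main

import (
	"fmt"
	"math"
)

// classify applies a ">10% with p < 0.05" rule to a sec/op pair.
// Hypothetical helper: lower sec/op is treated as better.
func classify(baseline, current, p float64) string {
	change := (current - baseline) / baseline
	if p >= 0.05 || math.Abs(change) <= 0.10 {
		return "unchanged" // not significant, or too small to matter
	}
	if change > 0 {
		return "regression"
	}
	return "improvement"
}

func main() {
	// About +26%, but p = 0.700, so the difference is not significant.
	fmt.Println(classify(15.04, 18.95, 0.700)) // unchanged
	// About +9.5% with p = 0.100: fails both conditions.
	fmt.Println(classify(113.2, 123.9, 0.100)) // unchanged
	// Made-up numbers, only to show the other branch.
	fmt.Println(classify(100, 120, 0.01)) // regression
}
```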

gokutheengineer force-pushed the gokhan/fix-subtreevaliation-kafka branch from 7448b30 to 4f376db on May 13, 2026 13:07

gokutheengineer merged commit 6eff0ab into bsv-blockchain:main on May 14, 2026
25 checks passed
icellan added a commit that referenced this pull request May 18, 2026
Conflicts resolved:
- go.mod / go.sum: kept BSV aerospike-client fork + msgpack + go-tx-map
  pseudo-version from PR; took main's newer gobdk, go-chaincfg, go-wire,
  pgregory.net/rapid; dropped the upstream aerospike replace directive in
  favour of the BSV fork.
- services/asset/repository/GetLegacyBlock.go: pass streamCtx to
  writeChunkToWriter (PR side wired the parameter).
- services/blockassembly/subtreeprocessor/SubtreeProcessor.go: merged
  txMapPool/double-buffer fields from PR with main's clock field.
- services/subtreevalidation/txmetaHandler.go + _test.go: kept main's
  bounded shard-worker pool (#858) rather than the PR's inline walker;
  the worker queue fields are referenced from Server.go which was
  auto-merged from main.
- settings/asset_settings.go + settings/settings.go: kept PR's new
  ConcurrencySubtreeDataCreate field/binding.
- stores/blob/file/file.go + _test.go: took main's allowOverwrite/
  fsyncMode plumbing (renameTempFile/writeFileAtomically signatures
  gained allowOverwrite; syncAndCloseTempFile gated by fsyncMode).
- stores/blockchain/sql/GenesisHash_test.go: took main's explicit-params
  insertGenesisTransaction signature.
- stores/utxo/aerospike/spend.go: merged fmt/strings imports (PR) with
  runtime/debug (main).
- stores/utxo/aerospike/send_store_batch_test.go +
  stores/utxo/aerospike/pruner/pruned_set_skip_test.go: retargeted from
  upstream aerospike client to the BSV fork for consistency with the
  rest of the PR's import switch.

Review-comment fixes applied on top of the merge:
- stores/utxo/aerospike/native_op.go: executeTeranodeOp now reuses the
  caller's WritePolicy on the native path instead of synthesising a
  fresh one, matching the UDF path. Also clarified the probe comment:
  the probe sends a *valid* setLocked sub-op via TeranodeModifyOp and
  treats PARAMETER_ERROR as 'unsupported'.
- stores/utxo/aerospike/aerospike.go: PreserveTransactions now builds
  batchRecords via append (with a parallel recordTxIDs slice) so a
  failed NewKey can never leave a nil entry that BatchOperate would
  nil-deref on.

3 participants