Skip to content

fix(kafka): per-partition fan-out + sticky-partitioner producer keys#895

Merged
icellan merged 2 commits into
bsv-blockchain:mainfrom
icellan:feat/kafka-consumer-perpartition
May 19, 2026
Merged

fix(kafka): per-partition fan-out + sticky-partitioner producer keys#895
icellan merged 2 commits into
bsv-blockchain:mainfrom
icellan:feat/kafka-consumer-perpartition

Conversation

@icellan

@icellan icellan commented May 19, 2026

Copy link
Copy Markdown
Contributor

Split out of #828 so it can be reviewed independently.

Consumer (util/kafka/kafka_consumer.go)

  • Dispatches each fetch's per-partition records to a goroutine without a between-fetch barrier (no partitionWg.Wait between PollFetches). franz-go's local buffer keeps draining while one slow partition is in flight.
  • Goroutines drain into a shared uncommittedRecords slice under a mutex.
  • Shutdown waits on a single partitionWg.Wait so the final commitRecords captures every record that completed processing — fixes a window where a goroutine mid-consumerFn appended after the puller had committed and returned.
  • AutoCommitEnabled uses MarkCommitRecords (lock-internal in franz-go, safe from many goroutines concurrently).
  • Threads the partition's HighWaterMark (added upstream in PR fix(subtreevalidation): two-mode txmeta enqueue (block during backfill, drop+warn live) #891) into each KafkaMessage so handlers can detect "caught up to live tail."

Producer (services/validator/Validator.go)

  • sendTxMetaToKafka / sendTxMetaBatch set the message Key to the (first) tx hash. With franz-go's default StickyKeyPartitioner this hashes onto a single partition deterministically:
    • Distributes traffic evenly across partitions (tx hashes are uniform).
    • Keeps every record from one batch on the same partition (preserves intra-batch ordering).
  • Nil keys previously degraded StickyKeyPartitioner to a StickyPartitioner, bunching consecutive batches onto the same partition until linger expired. Symptom on the consumer side was bursty per-partition read throughput.

txmetaHandler (services/subtreevalidation/txmetaHandler.go)

  • Shards Kafka ADD/DELETE entries by the first byte of the tx hash across 256 worker goroutines with bounded per-shard queues. Per-key ordering preserved (same hash → same shard → arrival order).
  • Switches between two modes on a one-way latch (txmetaCaughtUp): blocking-on-full during startup (cold cache rebuild), drop-on-full once any assigned partition reaches its tail.
  • Truncated messages are logged and acked to avoid infinite redelivery on corrupt input.

Cache interface (services/subtreevalidation/SubtreeValidation.go)

  • Adds SetCacheMulti(keys, values [][]byte) error to txMetaCacheOps and a corresponding SetTxMetaCacheMulti server method. Not yet called from the handler — kept for a future batched fan-out optimisation.

Test plan

  • go build ./... clean
  • go vet ./util/kafka/... ./services/validator/... ./services/subtreevalidation/... clean
  • make lint-new clean
  • Soak test against a saturated txmeta topic: consumer p99 should not spike when one partition is slower than others; producer load should distribute across all partitions instead of bunching.

Consumer: pull loop dispatches each fetch's per-partition records to
a goroutine without a between-fetch barrier (no partitionWg.Wait
between PollFetches), so franz-go's local buffer keeps draining
while one slow partition is in flight. Goroutines drain into a
shared uncommittedRecords slice under a mutex. Shutdown waits on
a single partitionWg.Wait so the final commitRecords captures every
in-flight record. AutoCommitEnabled uses MarkCommitRecords (lock-
internal in franz-go, safe from many goroutines).

Producer: sendTxMetaToKafka / sendTxMetaBatch now set the message
Key to the (first) tx hash. With franz-go's default
StickyKeyPartitioner this hashes onto a single partition
deterministically, distributing traffic evenly across partitions
(tx hashes are uniform) and keeping every record from one batch on
the same partition. Nil keys previously degraded the
StickyKeyPartitioner to a StickyPartitioner which bunched batches
until linger expired.

txmetaHandler: shards Kafka ADD/DELETE entries by hash byte across
256 worker goroutines with bounded per-shard queues. Preserves
per-key ordering. Switches from blocking-on-full to drop-on-full
once any partition reaches its tail (txmetaCaughtUp latch is
one-way).

Adds SetCacheMulti to the txMetaCacheOps interface and a
SetTxMetaCacheMulti server method for future batch-cache fan-out.

Surfaced as a follow-up split out of PR bsv-blockchain#828; reviewable on its own.
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete

All previously reported issues have been addressed in commit fe137a4.

Current Review:
No new issues found. The changes correctly implement:

  • Per-partition fan-out with ordering guarantees via per-partition mutexes (kafka_consumer.go:491-501)
  • Proper shutdown drain on all exit paths (kafka_consumer.go:503-513, 517-530)
  • Sticky partitioner producer keys for even load distribution (Validator.go:969, 999)
  • Sharded txmeta handler with bounded queues (txmetaHandler.go:55-76)

History:

  • ✅ Fixed: Per-partition race condition (now uses sync.Map for per-partition locks)
  • ✅ Fixed: Shutdown path data loss (now uses shutdownDrain closure on all paths)
  • ✅ Fixed: Documentation accuracy (SetTxMetaCacheMulti clearly marked as future-only)

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR improves Kafka txmeta throughput by parallelizing consumer partition handling, using deterministic producer keys for partition distribution, and preparing a batched txmeta cache API.

Changes:

  • Updates franz-go consumer setup and per-partition fetch processing.
  • Sets txmeta Kafka producer keys from transaction hashes.
  • Adds documentation for txmeta sharded handling and introduces SetTxMetaCacheMulti.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File Description
util/kafka/kafka_consumer.go Adds AutoCommitMarks and per-partition goroutine fan-out in the Kafka consumer loop.
services/validator/Validator.go Uses tx hashes as Kafka message keys for txmeta publishing.
services/subtreevalidation/txmetaHandler.go Expands txmeta handler comments describing sharding, queueing, and error semantics.
services/subtreevalidation/SubtreeValidation.go Extends txmeta cache interface/server methods with a multi-set cache API.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread util/kafka/kafka_consumer.go Outdated
Comment thread util/kafka/kafka_consumer.go
Comment thread services/subtreevalidation/SubtreeValidation.go Outdated
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-895 (a5411a0)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 144
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.197µ 1.211µ ~ 0.100
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 54.96n 54.95n ~ 1.000
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 54.91n 54.96n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 54.96n 55.07n ~ 0.500
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 25.01n 24.94n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 41.58n 41.69n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 97.07n 95.80n ~ 0.400
MiningCandidate_Stringify_Short-4 176.7n 174.2n ~ 0.100
MiningCandidate_Stringify_Long-4 1.236µ 1.225µ ~ 0.100
MiningSolution_Stringify-4 640.7n 638.7n ~ 0.100
BlockInfo_MarshalJSON-4 1.313µ 1.309µ ~ 0.300
NewFromBytes-4 127.6n 128.0n ~ 0.400
AddTxBatchColumnar_Validation-4 2.491µ 2.660µ ~ 0.100
OffsetValidationLoop-4 638.2n 638.4n ~ 1.000
Mine_EasyDifficulty-4 65.15µ 66.30µ ~ 0.700
Mine_WithAddress-4 6.819µ 6.953µ ~ 0.100
BlockAssembler_AddTx-4 0.03224n 0.02447n ~ 0.100
AddNode-4 11.17 11.48 ~ 0.400
AddNodeWithMap-4 11.72 11.95 ~ 0.100
DiskTxMap_SetIfNotExists-4 3.348µ 3.958µ ~ 0.100
DiskTxMap_SetIfNotExists_Parallel-4 3.215µ 3.590µ ~ 0.400
DiskTxMap_ExistenceOnly-4 307.7n 319.2n ~ 1.000
Queue-4 149.1n 151.3n ~ 0.700
AtomicPointer-4 2.519n 2.516n ~ 1.000
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 627.9µ 642.7µ ~ 0.200
ReorgOptimizations/DedupFilterPipeline/New/10K-4 585.9µ 611.7µ ~ 0.100
ReorgOptimizations/AllMarkFalse/Old/10K-4 80.55µ 82.70µ ~ 0.400
ReorgOptimizations/AllMarkFalse/New/10K-4 49.99µ 49.67µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/10K-4 37.79µ 39.61µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/10K-4 8.749µ 8.437µ ~ 0.100
ReorgOptimizations/NodeFlags/Old/10K-4 3.413µ 3.353µ ~ 1.000
ReorgOptimizations/NodeFlags/New/10K-4 1.069µ 1.127µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 7.451m 7.847m ~ 0.400
ReorgOptimizations/DedupFilterPipeline/New/100K-4 7.949m 8.099m ~ 1.000
ReorgOptimizations/AllMarkFalse/Old/100K-4 837.4µ 846.3µ ~ 0.100
ReorgOptimizations/AllMarkFalse/New/100K-4 546.6µ 547.1µ ~ 0.700
ReorgOptimizations/HashSlicePool/Old/100K-4 391.1µ 383.9µ ~ 0.200
ReorgOptimizations/HashSlicePool/New/100K-4 200.6µ 201.4µ ~ 1.000
ReorgOptimizations/NodeFlags/Old/100K-4 35.65µ 38.69µ ~ 0.100
ReorgOptimizations/NodeFlags/New/100K-4 12.37µ 13.39µ ~ 0.100
TxMapSetIfNotExists-4 38.25n 38.13n ~ 0.300
TxMapSetIfNotExistsDuplicate-4 31.90n 32.00n ~ 0.500
ChannelSendReceive-4 461.2n 454.3n ~ 0.400
DirectSubtreeAdd/4_per_subtree-4 79.79n 80.95n ~ 1.000
DirectSubtreeAdd/64_per_subtree-4 43.33n 43.50n ~ 1.000
DirectSubtreeAdd/256_per_subtree-4 42.06n 41.25n ~ 0.700
DirectSubtreeAdd/1024_per_subtree-4 40.42n 39.74n ~ 0.100
DirectSubtreeAdd/2048_per_subtree-4 39.89n 39.08n ~ 0.100
SubtreeProcessorAdd/4_per_subtree-4 503.1n 443.9n ~ 0.100
SubtreeProcessorAdd/64_per_subtree-4 510.2n 429.3n ~ 0.100
SubtreeProcessorAdd/256_per_subtree-4 486.2n 443.4n ~ 0.100
SubtreeProcessorAdd/1024_per_subtree-4 492.7n 446.4n ~ 0.100
SubtreeProcessorAdd/2048_per_subtree-4 507.5n 436.6n ~ 0.100
SubtreeProcessorRotate/4_per_subtree-4 507.1n 436.9n ~ 0.100
SubtreeProcessorRotate/64_per_subtree-4 476.5n 460.4n ~ 0.100
SubtreeProcessorRotate/256_per_subtree-4 447.6n 463.2n ~ 0.100
SubtreeProcessorRotate/1024_per_subtree-4 441.2n 480.8n ~ 0.100
SubtreeNodeAddOnly/4_per_subtree-4 91.22n 90.45n ~ 0.200
SubtreeNodeAddOnly/64_per_subtree-4 66.34n 66.04n ~ 0.200
SubtreeNodeAddOnly/256_per_subtree-4 65.05n 65.13n ~ 0.800
SubtreeNodeAddOnly/1024_per_subtree-4 64.52n 64.65n ~ 0.800
SubtreeCreationOnly/4_per_subtree-4 153.0n 154.4n ~ 1.000
SubtreeCreationOnly/64_per_subtree-4 562.6n 569.9n ~ 0.200
SubtreeCreationOnly/256_per_subtree-4 2.122µ 2.075µ ~ 0.200
SubtreeCreationOnly/1024_per_subtree-4 6.532µ 6.440µ ~ 0.700
SubtreeCreationOnly/2048_per_subtree-4 12.40µ 12.37µ ~ 0.700
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 483.4n 446.8n ~ 0.100
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 460.5n 466.2n ~ 1.000
ParallelGetAndSetIfNotExists/1k_nodes-4 2.586m 2.545m ~ 0.400
ParallelGetAndSetIfNotExists/10k_nodes-4 7.311m 7.315m ~ 1.000
ParallelGetAndSetIfNotExists/50k_nodes-4 9.673m 10.105m ~ 0.200
ParallelGetAndSetIfNotExists/100k_nodes-4 13.68m 13.76m ~ 0.700
SequentialGetAndSetIfNotExists/1k_nodes-4 2.128m 2.146m ~ 0.700
SequentialGetAndSetIfNotExists/10k_nodes-4 8.055m 8.296m ~ 0.100
SequentialGetAndSetIfNotExists/50k_nodes-4 26.40m 30.24m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 53.43m 62.38m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 2.596m 2.703m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 10.24m 10.59m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 16.88m 17.65m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 2.193m 2.374m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 11.48m 13.02m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 83.36m 100.54m ~ 0.100
CalcBlockWork-4 501.0n 512.2n ~ 0.100
CalculateWork-4 697.6n 717.4n ~ 0.600
BuildBlockLocatorString_Helpers/Size_10-4 1.352µ 1.620µ ~ 0.600
BuildBlockLocatorString_Helpers/Size_100-4 12.78µ 13.07µ ~ 0.100
BuildBlockLocatorString_Helpers/Size_1000-4 145.2µ 126.7µ ~ 0.100
CatchupWithHeaderCache-4 104.6m 104.4m ~ 0.700
_BufferPoolAllocation/16KB-4 3.860µ 3.793µ ~ 0.100
_BufferPoolAllocation/32KB-4 7.703µ 7.369µ ~ 0.100
_BufferPoolAllocation/64KB-4 16.28µ 15.17µ ~ 0.700
_BufferPoolAllocation/128KB-4 29.95µ 27.99µ ~ 0.700
_BufferPoolAllocation/512KB-4 115.2µ 116.6µ ~ 0.400
_BufferPoolConcurrent/32KB-4 19.53µ 19.35µ ~ 0.700
_BufferPoolConcurrent/64KB-4 31.21µ 30.30µ ~ 0.700
_BufferPoolConcurrent/512KB-4 146.3µ 145.9µ ~ 1.000
_SubtreeDeserializationWithBufferSizes/16KB-4 649.0µ 658.4µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/32KB-4 656.7µ 659.9µ ~ 1.000
_SubtreeDeserializationWithBufferSizes/64KB-4 727.6µ 654.8µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 703.6µ 632.7µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/512KB-4 666.2µ 652.9µ ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/16KB-4 36.73m 36.58m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/32KB-4 36.88m 36.40m ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/64KB-4 36.83m 36.67m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/128KB-4 36.58m 36.67m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/512KB-4 36.81m 36.23m ~ 0.100
_PooledVsNonPooled/Pooled-4 741.7n 744.0n ~ 0.700
_PooledVsNonPooled/NonPooled-4 8.299µ 7.571µ ~ 0.700
_MemoryFootprint/Current_512KB_32concurrent-4 7.246µ 6.978µ ~ 0.200
_MemoryFootprint/Proposed_32KB_32concurrent-4 9.952µ 9.994µ ~ 0.400
_MemoryFootprint/Alternative_64KB_32concurrent-4 9.511µ 9.643µ ~ 0.400
_prepareTxsPerLevel-4 414.3m 427.2m ~ 0.100
_prepareTxsPerLevelOrdered-4 4.100m 4.220m ~ 0.400
_prepareTxsPerLevel_Comparison/Original-4 414.1m 415.5m ~ 0.700
_prepareTxsPerLevel_Comparison/Optimized-4 3.747m 3.630m ~ 0.400
SubtreeSizes/10k_tx_4_per_subtree-4 1.378m 1.398m ~ 0.700
SubtreeSizes/10k_tx_16_per_subtree-4 327.2µ 333.1µ ~ 0.400
SubtreeSizes/10k_tx_64_per_subtree-4 78.55µ 78.48µ ~ 1.000
SubtreeSizes/10k_tx_256_per_subtree-4 19.82µ 19.51µ ~ 0.700
SubtreeSizes/10k_tx_512_per_subtree-4 9.758µ 9.723µ ~ 0.400
SubtreeSizes/10k_tx_1024_per_subtree-4 4.804µ 4.861µ ~ 1.000
SubtreeSizes/10k_tx_2k_per_subtree-4 2.457µ 2.404µ ~ 0.100
BlockSizeScaling/10k_tx_64_per_subtree-4 77.27µ 76.32µ ~ 0.100
BlockSizeScaling/10k_tx_256_per_subtree-4 19.40µ 19.28µ ~ 0.400
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.798µ 4.753µ ~ 0.100
BlockSizeScaling/50k_tx_64_per_subtree-4 407.0µ 407.2µ ~ 1.000
BlockSizeScaling/50k_tx_256_per_subtree-4 97.26µ 96.86µ ~ 1.000
BlockSizeScaling/50k_tx_1024_per_subtree-4 24.14µ 23.81µ ~ 0.700
SubtreeAllocations/small_subtrees_exists_check-4 168.0µ 157.9µ ~ 0.100
SubtreeAllocations/small_subtrees_data_fetch-4 175.6µ 173.8µ ~ 0.200
SubtreeAllocations/small_subtrees_full_validation-4 337.7µ 334.7µ ~ 0.400
SubtreeAllocations/medium_subtrees_exists_check-4 9.956µ 9.360µ ~ 0.100
SubtreeAllocations/medium_subtrees_data_fetch-4 10.44µ 10.14µ ~ 0.100
SubtreeAllocations/medium_subtrees_full_validation-4 19.58µ 19.49µ ~ 1.000
SubtreeAllocations/large_subtrees_exists_check-4 2.336µ 2.231µ ~ 0.100
SubtreeAllocations/large_subtrees_data_fetch-4 2.516µ 2.462µ ~ 0.400
SubtreeAllocations/large_subtrees_full_validation-4 4.883µ 4.871µ ~ 1.000
StoreBlock_Sequential/BelowCSVHeight-4 335.9µ 340.6µ ~ 0.200
StoreBlock_Sequential/AboveCSVHeight-4 341.5µ 333.9µ ~ 0.200
GetUtxoHashes-4 272.5n 272.9n ~ 1.000
GetUtxoHashes_ManyOutputs-4 45.14µ 45.01µ ~ 1.000
_NewMetaDataFromBytes-4 215.8n 215.2n ~ 1.000
_Bytes-4 401.3n 399.8n ~ 1.000
_MetaBytes-4 342.6n 339.0n ~ 0.200

Threshold: >10% with p < 0.05 | Generated: 2026-05-19 13:58 UTC

…own path

Two correctness gaps from the Copilot review:

1. Per-partition concurrency across consecutive fetches could lose
   records. PollFetches returns immediately, so a second batch from
   partition P could enter its own goroutine and Mark/append a later
   offset before the previous batch had finished — a subsequent
   commitRecords would advance the partition past records the earlier
   batch had not yet processed, and if that batch later failed on an
   earlier offset the broker never re-delivered them. Add a per-
   partition mutex (lazily allocated via sync.Map) so batches from the
   same partition serialise; different partitions stay parallel.

2. The ErrClientClosed / context.Canceled branch in the error-handling
   path returned directly without partitionWg.Wait + final commitRecords.
   Records processed after the last ticker commit were left
   uncommitted despite successful processing. Factor the cleanup into
   a shutdownDrain closure and call it from both shutdown paths.

Plus fix the inaccurate SetTxMetaCacheMulti doc comment: the method
exists for a future batched fan-out, but the current shard-worker
txmetaHandler applies entries one at a time via
SetTxMetaCacheFromBytes and does NOT call SetTxMetaCacheMulti.
@sonarqubecloud

Copy link
Copy Markdown

@icellan icellan merged commit f1cf28a into bsv-blockchain:main May 19, 2026
25 checks passed
icellan added a commit that referenced this pull request May 19, 2026
Brings in #895 (kafka per-partition fan-out + sticky-partitioner producer
keys). Clean auto-merge — the relevant content was already on this
branch via the earlier forward-port commit 673ecf9, so git recognised
the trees as identical.

Once a future rebase shrinks this branch, 673ecf9 will drop out
naturally since main now carries the same code.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants