Skip to content

feature: add franz as Kafka library#611

Merged
gokutheengineer merged 13 commits into
bsv-blockchain:mainfrom
gokutheengineer:gokhan/new-kafka-lib
Mar 27, 2026
Merged

feature: add franz as Kafka library#611
gokutheengineer merged 13 commits into
bsv-blockchain:mainfrom
gokutheengineer:gokhan/new-kafka-lib

Conversation

@gokutheengineer

Copy link
Copy Markdown
Collaborator

No description provided.

@github-actions

github-actions Bot commented Mar 23, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete

Summary

This PR successfully migrates from IBM/sarama to twmb/franz-go as the Kafka client library. The refactor is well-executed with proper abstraction and backward compatibility via in-memory testing support.

Key Changes:

  • Migrated from sarama to franz-go for all producer/consumer operations
  • Refactored in-memory Kafka implementation to remove sarama dependency
  • Updated test files to use new KafkaMessage structure
  • Maintained API compatibility through clean interface design

Findings

[Minor] Leftover Dead Code in kafka_auth.go

The file util/kafka/kafka_auth.go (not modified in this PR) still imports sarama and defines TLS configuration functions (configureKafkaAuth, configureKafkaAuthFromFields) that appear to be dead code - only referenced in their own test file. Since this PR migrates to franz-go, consider removing kafka_auth.go and kafka_auth_test.go in a follow-up. The TLS configuration logic has been reimplemented in buildFranzTLSConfig() in kafka_producer_async.go:462-497.

[Info] Chaos Tests Still Use Sarama

The chaos test suite (test/chaos/*.go) still uses sarama directly. This appears intentional as these tests simulate external clients interacting with Kafka. No action needed unless you want chaos tests to also use franz-go.

History

  • ✅ Fixed: Previously reported issue about using standard library errors instead of teranode errors package (now resolved - all files use github.com/bsv-blockchain/teranode/errors)
  • ✅ Fixed: sarama.ByteEncoder compilation error (resolved in current version)

Comment thread util/kafka/kafka_producer_async.go Outdated
Comment thread util/kafka/in_memory_kafka/in_memory_kafka.go Outdated
@github-actions

github-actions Bot commented Mar 26, 2026

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-611 (52130bc)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 151
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.391µ 1.399µ ~ 0.400
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 59.17n 59.48n ~ 0.200
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 59.23n 59.30n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 59.54n 62.61n ~ 0.200
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 32.64n 32.70n ~ 0.500
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 57.44n 58.29n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 147.0n 151.2n ~ 0.100
MiningCandidate_Stringify_Short-4 255.3n 254.2n ~ 0.400
MiningCandidate_Stringify_Long-4 1.759µ 1.744µ ~ 0.800
MiningSolution_Stringify-4 912.4n 912.6n ~ 1.000
BlockInfo_MarshalJSON-4 1.706µ 1.712µ ~ 0.200
NewFromBytes-4 127.1n 124.8n ~ 0.700
Mine_EasyDifficulty-4 58.43µ 58.32µ ~ 1.000
Mine_WithAddress-4 4.661µ 4.890µ ~ 0.100
BlockAssembler_AddTx-4 0.03037n 0.02639n ~ 0.200
AddNode-4 11.45 11.31 ~ 0.400
AddNodeWithMap-4 11.16 11.36 ~ 1.000
DirectSubtreeAdd/4_per_subtree-4 63.67n 63.83n ~ 0.700
DirectSubtreeAdd/64_per_subtree-4 31.75n 29.78n ~ 0.100
DirectSubtreeAdd/256_per_subtree-4 30.55n 28.77n ~ 0.100
DirectSubtreeAdd/1024_per_subtree-4 29.33n 27.83n ~ 0.100
DirectSubtreeAdd/2048_per_subtree-4 28.97n 27.45n ~ 0.100
SubtreeProcessorAdd/4_per_subtree-4 294.4n 295.9n ~ 1.000
SubtreeProcessorAdd/64_per_subtree-4 301.6n 290.6n ~ 0.100
SubtreeProcessorAdd/256_per_subtree-4 296.4n 290.3n ~ 0.200
SubtreeProcessorAdd/1024_per_subtree-4 294.1n 289.1n ~ 0.100
SubtreeProcessorAdd/2048_per_subtree-4 293.6n 291.6n ~ 0.700
SubtreeProcessorRotate/4_per_subtree-4 300.5n 300.8n ~ 0.300
SubtreeProcessorRotate/64_per_subtree-4 298.1n 299.1n ~ 1.000
SubtreeProcessorRotate/256_per_subtree-4 299.6n 298.2n ~ 0.700
SubtreeProcessorRotate/1024_per_subtree-4 296.0n 297.2n ~ 0.700
SubtreeNodeAddOnly/4_per_subtree-4 62.42n 63.49n ~ 0.200
SubtreeNodeAddOnly/64_per_subtree-4 38.35n 38.69n ~ 0.200
SubtreeNodeAddOnly/256_per_subtree-4 37.16n 38.16n ~ 0.100
SubtreeNodeAddOnly/1024_per_subtree-4 36.55n 36.95n ~ 0.100
SubtreeCreationOnly/4_per_subtree-4 139.9n 142.1n ~ 0.200
SubtreeCreationOnly/64_per_subtree-4 608.4n 603.9n ~ 0.700
SubtreeCreationOnly/256_per_subtree-4 2.160µ 2.129µ ~ 0.100
SubtreeCreationOnly/1024_per_subtree-4 7.674µ 7.515µ ~ 0.200
SubtreeCreationOnly/2048_per_subtree-4 14.63µ 14.17µ ~ 0.100
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 294.4n 294.7n ~ 0.700
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 291.9n 298.7n ~ 0.100
ParallelGetAndSetIfNotExists/1k_nodes-4 935.2µ 922.8µ ~ 0.700
ParallelGetAndSetIfNotExists/10k_nodes-4 1.814m 1.847m ~ 0.100
ParallelGetAndSetIfNotExists/50k_nodes-4 7.922m 8.058m ~ 0.100
ParallelGetAndSetIfNotExists/100k_nodes-4 15.60m 15.96m ~ 0.100
SequentialGetAndSetIfNotExists/1k_nodes-4 756.8µ 754.6µ ~ 1.000
SequentialGetAndSetIfNotExists/10k_nodes-4 2.859m 2.959m ~ 0.100
SequentialGetAndSetIfNotExists/50k_nodes-4 10.46m 10.98m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 20.42m 20.80m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 962.0µ 969.0µ ~ 0.700
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 4.582m 4.651m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 18.61m 19.02m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 791.2µ 804.7µ ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 5.868m 5.966m ~ 0.200
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 39.75m 40.33m ~ 0.100
DiskTxMap_SetIfNotExists-4 4.134µ 3.830µ ~ 0.100
DiskTxMap_SetIfNotExists_Parallel-4 3.971µ 3.668µ ~ 0.700
DiskTxMap_ExistenceOnly-4 417.2n 406.3n ~ 0.400
Queue-4 209.0n 203.8n ~ 0.200
AtomicPointer-4 4.712n 3.800n ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 891.3µ 936.9µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/New/10K-4 863.9µ 886.4µ ~ 0.400
ReorgOptimizations/AllMarkFalse/Old/10K-4 115.2µ 112.8µ ~ 0.400
ReorgOptimizations/AllMarkFalse/New/10K-4 62.68µ 61.72µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/10K-4 64.00µ 66.36µ ~ 1.000
ReorgOptimizations/HashSlicePool/New/10K-4 11.61µ 11.16µ ~ 0.100
ReorgOptimizations/NodeFlags/Old/10K-4 5.723µ 6.557µ ~ 0.400
ReorgOptimizations/NodeFlags/New/10K-4 1.918µ 2.759µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 12.73m 11.88m ~ 0.700
ReorgOptimizations/DedupFilterPipeline/New/100K-4 12.06m 11.60m ~ 0.200
ReorgOptimizations/AllMarkFalse/Old/100K-4 1.206m 1.146m ~ 0.100
ReorgOptimizations/AllMarkFalse/New/100K-4 687.0µ 678.4µ ~ 0.400
ReorgOptimizations/HashSlicePool/Old/100K-4 642.2µ 690.8µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/100K-4 321.6µ 349.1µ ~ 0.400
ReorgOptimizations/NodeFlags/Old/100K-4 55.61µ 53.68µ ~ 0.400
ReorgOptimizations/NodeFlags/New/100K-4 18.57µ 17.94µ ~ 0.200
TxMapSetIfNotExists-4 51.66n 51.87n ~ 0.300
TxMapSetIfNotExistsDuplicate-4 37.75n 38.01n ~ 0.100
ChannelSendReceive-4 617.5n 606.8n ~ 0.100
CalcBlockWork-4 500.9n 504.4n ~ 0.400
CalculateWork-4 687.4n 684.5n ~ 0.400
BuildBlockLocatorString_Helpers/Size_10-4 1.409µ 1.422µ ~ 0.700
BuildBlockLocatorString_Helpers/Size_100-4 15.22µ 13.71µ ~ 0.100
BuildBlockLocatorString_Helpers/Size_1000-4 133.2µ 130.5µ ~ 0.700
CatchupWithHeaderCache-4 105.2m 105.2m ~ 0.700
_prepareTxsPerLevel-4 430.5m 422.1m ~ 0.700
_prepareTxsPerLevelOrdered-4 4.180m 4.016m ~ 0.700
_prepareTxsPerLevel_Comparison/Original-4 439.4m 430.0m ~ 0.200
_prepareTxsPerLevel_Comparison/Optimized-4 3.960m 3.767m ~ 0.100
SubtreeProcessor/100_tx_64_per_subtree-4 96.23m 94.00m ~ 0.700
SubtreeProcessor/500_tx_64_per_subtree-4 453.9m 453.5m ~ 1.000
SubtreeProcessor/500_tx_256_per_subtree-4 508.5m 475.6m ~ 0.100
SubtreeProcessor/1k_tx_64_per_subtree-4 895.4m 902.0m ~ 0.400
SubtreeProcessor/1k_tx_256_per_subtree-4 935.8m 922.2m ~ 0.200
StreamingProcessorPhases/FilterValidated/100_tx-4 3.265m 3.319m ~ 0.400
StreamingProcessorPhases/ClassifyProcess/100_tx-4 251.6µ 244.7µ ~ 0.100
StreamingProcessorPhases/FilterValidated/500_tx-4 15.49m 15.50m ~ 1.000
StreamingProcessorPhases/ClassifyProcess/500_tx-4 620.8µ 605.2µ ~ 0.100
StreamingProcessorPhases/FilterValidated/1k_tx-4 30.03m 31.34m ~ 0.100
StreamingProcessorPhases/ClassifyProcess/1k_tx-4 1.080m 1.059m ~ 0.100
SubtreeSizes/10k_tx_4_per_subtree-4 1.312m 1.312m ~ 1.000
SubtreeSizes/10k_tx_16_per_subtree-4 321.4µ 325.4µ ~ 0.700
SubtreeSizes/10k_tx_64_per_subtree-4 75.23µ 74.00µ ~ 0.100
SubtreeSizes/10k_tx_256_per_subtree-4 18.55µ 18.73µ ~ 0.200
SubtreeSizes/10k_tx_512_per_subtree-4 9.299µ 9.293µ ~ 1.000
SubtreeSizes/10k_tx_1024_per_subtree-4 4.583µ 4.600µ ~ 0.700
SubtreeSizes/10k_tx_2k_per_subtree-4 2.311µ 2.319µ ~ 0.700
BlockSizeScaling/10k_tx_64_per_subtree-4 72.66µ 74.74µ ~ 0.100
BlockSizeScaling/10k_tx_256_per_subtree-4 18.32µ 18.66µ ~ 0.100
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.596µ 4.693µ ~ 0.400
BlockSizeScaling/50k_tx_64_per_subtree-4 389.5µ 405.8µ ~ 0.700
BlockSizeScaling/50k_tx_256_per_subtree-4 92.15µ 93.84µ ~ 0.200
BlockSizeScaling/50k_tx_1024_per_subtree-4 22.45µ 23.21µ ~ 0.100
SubtreeAllocations/small_subtrees_exists_check-4 156.4µ 159.7µ ~ 0.400
SubtreeAllocations/small_subtrees_data_fetch-4 166.4µ 168.5µ ~ 1.000
SubtreeAllocations/small_subtrees_full_validation-4 320.8µ 324.5µ ~ 0.200
SubtreeAllocations/medium_subtrees_exists_check-4 9.111µ 9.212µ ~ 0.400
SubtreeAllocations/medium_subtrees_data_fetch-4 9.733µ 9.855µ ~ 0.400
SubtreeAllocations/medium_subtrees_full_validation-4 18.48µ 18.95µ ~ 0.100
SubtreeAllocations/large_subtrees_exists_check-4 2.202µ 2.207µ ~ 1.000
SubtreeAllocations/large_subtrees_data_fetch-4 2.362µ 2.403µ ~ 0.100
SubtreeAllocations/large_subtrees_full_validation-4 4.639µ 4.731µ ~ 0.100
_BufferPoolAllocation/16KB-4 4.546µ 3.395µ ~ 0.100
_BufferPoolAllocation/32KB-4 7.842µ 8.819µ ~ 0.700
_BufferPoolAllocation/64KB-4 15.86µ 14.62µ ~ 0.100
_BufferPoolAllocation/128KB-4 32.20µ 28.05µ ~ 0.100
_BufferPoolAllocation/512KB-4 111.1µ 104.2µ ~ 0.100
_BufferPoolConcurrent/32KB-4 18.04µ 18.03µ ~ 1.000
_BufferPoolConcurrent/64KB-4 26.25µ 28.37µ ~ 0.100
_BufferPoolConcurrent/512KB-4 140.8µ 144.8µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/16KB-4 640.7µ 660.7µ ~ 0.200
_SubtreeDeserializationWithBufferSizes/32KB-4 647.7µ 632.4µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/64KB-4 647.4µ 634.4µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 636.7µ 638.3µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/512KB-4 635.2µ 642.6µ ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/16KB-4 35.99m 35.96m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/32KB-4 36.04m 36.32m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/64KB-4 35.95m 35.93m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/128KB-4 35.93m 35.76m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/512KB-4 35.32m 36.16m ~ 0.100
_PooledVsNonPooled/Pooled-4 841.4n 829.3n ~ 0.100
_PooledVsNonPooled/NonPooled-4 7.481µ 6.917µ ~ 0.100
_MemoryFootprint/Current_512KB_32concurrent-4 8.150µ 6.945µ ~ 0.100
_MemoryFootprint/Proposed_32KB_32concurrent-4 11.319µ 9.409µ ~ 0.100
_MemoryFootprint/Alternative_64KB_32concurrent-4 10.624µ 9.105µ ~ 0.100
GetUtxoHashes-4 250.9n 257.0n ~ 0.200
GetUtxoHashes_ManyOutputs-4 42.00µ 42.37µ ~ 0.100
_NewMetaDataFromBytes-4 236.1n 236.5n ~ 0.100
_Bytes-4 617.0n 620.4n ~ 1.000
_MetaBytes-4 555.0n 564.2n ~ 0.700

Threshold: >10% with p < 0.05 | Generated: 2026-03-26 13:48 UTC

@gokutheengineer gokutheengineer requested a review from icellan March 26, 2026 13:41
@sonarqubecloud

Copy link
Copy Markdown

Quality Gate Failed Quality Gate failed

Failed conditions
49.4% Coverage on New Code (required ≥ 80%)
6.0% Duplication on New Code (required ≤ 3%)

See analysis details on SonarQube Cloud

@icellan icellan requested a review from freemans13 March 26, 2026 13:49

@freemans13 freemans13 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't say I'm familiar with Franz but everything makes sense. Lets do this!

@gokutheengineer gokutheengineer merged commit e3bfc0b into bsv-blockchain:main Mar 27, 2026
24 checks passed
oskarszoon added a commit to oskarszoon/teranode that referenced this pull request Apr 2, 2026
…GE_TOO_LARGE

The Sarama → franz-go migration (PR bsv-blockchain#611) mapped flush_bytes directly to
ProducerBatchMaxBytes. In Sarama, flush_bytes was a flush trigger threshold
("flush after N bytes accumulate"). In franz-go, ProducerBatchMaxBytes is a
hard limit on batch size.

With flush_bytes=64 (used by blocks-final, blocks), clampBatchMaxBytes set
ProducerBatchMaxBytes to 512 bytes. Redpanda rejects any message exceeding
this with MESSAGE_TOO_LARGE, silently dropping all blocks-final notifications.

This caused block 942978 to never propagate to SVNodes on a mainnet instance,
along with ~192K failed txmeta and legacy-inv messages in 2 days.

Fix: clampBatchMaxBytes now returns the franz-go default of 1 MiB for any
flush_bytes value <= 1 MiB (all existing configs). Only explicit values above
1 MiB are treated as batch size overrides. This matches the old Sarama
behavior where Producer.MaxMessageBytes defaulted to 1 MB independently of
Flush.Bytes.
icellan added a commit that referenced this pull request May 8, 2026
…ines

Production observation on dev-scale-2: subtree-validator's "Tx Meta read
from Kafka /second" Grafana panel oscillates between 0 and 2.4M tx/s
with mean ~1.5M, repeatedly dropping to zero. One month ago the same
metric was a steady ~900K/s with no zero-drops. Cache hit rate sits
at ~50% even for new (not-yet-mined) transactions, and subtree
validation takes 30-49s per subtree (target: ~2s).

Root cause is in services/subtreevalidation/txmetaHandler.go:

- Each Kafka message spawns a goroutine via `go func()` (unbounded
  fan-out, since 9f4f1e5 in Dec 2025).
- Each Kafka message carries a binary BATCH of N entries (since
  4dcf264 in Dec 2025), but the handler loops calling
  SetCacheFromBytes once per entry sequentially.
- Each SetCacheFromBytes call takes a per-bucket write lock in
  improved_cache.bucket.Set (improved_cache.go:799).

Under burst load the cumulative effect is thousands of goroutines
serializing through the cache's bucket locks. The cache's own
sharded-bucket parallelism (8192 buckets in SetMulti) is wasted
because each goroutine takes locks one at a time. Throughput
collapses to single-writer speed, recovers when contention drains,
collapses again — the visible 0/2.4M oscillation pattern.

The franz Kafka library switch in #611 (2026-03-27) is the most
likely trigger for the recent regression, but the underlying design
flaw predates that.

Fix:

- New txmetaCacheJob struct: parsed Kafka batch (deep-copied keys/
  values, separated ADD/DELETE).
- Bounded worker pool consumes parsed jobs from a buffered channel.
  Workers call SetCacheMulti ONCE per batch — letting the cache's
  bucket fan-out parallelize internally, taking each touched bucket
  lock once per Kafka message instead of once per entry.
- Handler now does cheap parsing on the Kafka consumer goroutine,
  then enqueues onto the work channel. Channel-full enqueue BLOCKS,
  applying backpressure to Kafka rather than letting goroutines
  pile up unboundedly.
- Shutdown is driven by closing the work channel (not ctx cancel)
  so workers drain queued items before exiting. sync.Once on the
  close keeps Stop() idempotent for tests with deferred cleanup.

Settings:
- subtreevalidation_txmetaCacheKafkaWorkers (default 8)
- subtreevalidation_txmetaCacheKafkaQueueSize (default 256)

Interface:
- SetCacheMulti added to txMetaCacheOps. TxMetaCache already has it.

Tests:
- Existing TestServer_txmetaHandler covers nil/short/ADD/DELETE
  paths; updated to assert SetCacheMulti (not SetCacheFromBytes)
  on ADD and run against the worker pool.
- New TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti
  guards the regression: 50 entries -> ONE SetCacheMulti call.
- New TestParseTxmetaBatch covers empty/zero-entry/multi-entry/
  truncated batches and verifies keys/values are deep-copied so
  later mutation of the source buffer doesn't corrupt parsed jobs.
- Race detector clean.

Note: this fix does NOT remove the cache writeback in
TxMetaCache.BatchDecorate (cache-aside on store fetches) — that
path is needed for nodes whose Kafka publishers don't carry every
transaction.

Verified: go build, go vet, go test -race for the changed packages
all green; pre-commit hooks pass.
icellan added a commit that referenced this pull request May 8, 2026
…ines

Production observation on dev-scale-2: subtree-validator's "Tx Meta read
from Kafka /second" Grafana panel oscillates between 0 and 2.4M tx/s
with mean ~1.5M, repeatedly dropping to zero. One month ago the same
metric was a steady ~900K/s with no zero-drops. Cache hit rate sits
at ~50% even for new (not-yet-mined) transactions, and subtree
validation takes 30-49s per subtree (target: ~2s).

Root cause is in services/subtreevalidation/txmetaHandler.go:

- Each Kafka message spawns a goroutine via `go func()` (unbounded
  fan-out, since 9f4f1e5 in Dec 2025).
- Each Kafka message carries a binary BATCH of N entries (since
  4dcf264 in Dec 2025), but the handler loops calling
  SetCacheFromBytes once per entry sequentially.
- Each SetCacheFromBytes call takes a per-bucket write lock in
  improved_cache.bucket.Set (improved_cache.go:799).

Under burst load the cumulative effect is thousands of goroutines
serializing through the cache's bucket locks. The cache's own
sharded-bucket parallelism (8192 buckets in SetMulti) is wasted
because each goroutine takes locks one at a time. Throughput
collapses to single-writer speed, recovers when contention drains,
collapses again — the visible 0/2.4M oscillation pattern.

The franz Kafka library switch in #611 (2026-03-27) is the most
likely trigger for the recent regression, but the underlying design
flaw predates that.

Fix:

- New txmetaCacheJob struct: parsed Kafka batch (deep-copied keys/
  values, separated ADD/DELETE).
- Bounded worker pool consumes parsed jobs from a buffered channel.
  Workers call SetCacheMulti ONCE per batch — letting the cache's
  bucket fan-out parallelize internally, taking each touched bucket
  lock once per Kafka message instead of once per entry.
- Handler now does cheap parsing on the Kafka consumer goroutine,
  then enqueues onto the work channel. Channel-full enqueue BLOCKS,
  applying backpressure to Kafka rather than letting goroutines
  pile up unboundedly.
- Shutdown is driven by closing the work channel (not ctx cancel)
  so workers drain queued items before exiting. sync.Once on the
  close keeps Stop() idempotent for tests with deferred cleanup.

Settings:
- subtreevalidation_txmetaCacheKafkaWorkers (default 8)
- subtreevalidation_txmetaCacheKafkaQueueSize (default 256)

Interface:
- SetCacheMulti added to txMetaCacheOps. TxMetaCache already has it.

Tests:
- Existing TestServer_txmetaHandler covers nil/short/ADD/DELETE
  paths; updated to assert SetCacheMulti (not SetCacheFromBytes)
  on ADD and run against the worker pool.
- New TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti
  guards the regression: 50 entries -> ONE SetCacheMulti call.
- New TestParseTxmetaBatch covers empty/zero-entry/multi-entry/
  truncated batches and verifies keys/values are deep-copied so
  later mutation of the source buffer doesn't corrupt parsed jobs.
- Race detector clean.

Note: this fix does NOT remove the cache writeback in
TxMetaCache.BatchDecorate (cache-aside on store fetches) — that
path is needed for nodes whose Kafka publishers don't carry every
transaction.

Verified: go build, go vet, go test -race for the changed packages
all green; pre-commit hooks pass.
icellan added a commit that referenced this pull request May 8, 2026
Restores the partition-level consume parallelism that the franz-go
switch (#611, 2026-03-27) silently removed, and fixes the producer-side
sticky-partitioner skew that produces bursty per-partition load.

Three changes, all on the txmeta hot path:

1) services/validator/Validator.go — Kafka producer now sets Key to a
   tx hash (the first hash in the batch for batched sends, the txn's
   own hash for single sends). Previously Key was nil, which under
   franz-go's default StickyKeyPartitioner is equivalent to a
   StickyPartitioner: every batch lands on the same partition until
   the linger/batch threshold trips, producing the bursty oscillation
   we observed in production.

   With a non-nil key, StickyKeyPartitioner hashes the key onto a
   partition deterministically. tx hashes are uniformly distributed,
   so partition usage is now uniform.

2) util/kafka/kafka_consumer.go (Start) — replaces the single-goroutine
   fetches.EachRecord(...) loop with fetches.EachPartition(...) +
   one goroutine per partition per fetch. Within a partition records
   are still processed sequentially so per-partition order is
   preserved; across partitions the work runs in parallel. partitionWg
   is awaited before the next PollFetches so in-flight goroutine count
   is bounded by partition count.

   This restores N-way concurrency that the previous Kafka library
   provided implicitly via per-partition consumer goroutines.

3) util/kafka/kafka_consumer.go (NewKafkaConsumerGroup) — when
   AutoCommitEnabled is true, registers kgo.AutoCommitMarks() and the
   per-partition loop calls client.MarkCommitRecords(record) only
   after consumerFn returns nil. With the previous default auto-commit
   any record that consumerFn errored on was still being committed by
   the auto-commit timer because franz-go tracks "iterated" not
   "succeeded". This change makes auto-commit consistent with the
   manual-commit path's semantics.

Verified:
- go build / go vet across changed packages — clean
- go test -short -race ./util/kafka/... — pass (perf tests are
  -short-skipped; they require a real broker)
- go test -race ./services/subtreevalidation/{TestServer_txmetaHandler,
  TestParseTxmetaBatch} — pass (worker pool from #834 still works
  under the new parallel feeder)
- Pre-commit hooks (gofmt / gci / golangci-lint) — pass

Notes:
- Producer compression intentionally NOT enabled — Bitcoin tx data
  is high-entropy, compression would just burn CPU.
- AutoCommitMarks closes the gap where errored records were still
  committed; it does NOT close the gap where async handlers (txmeta
  worker pool from #834) return before the cache write completes.
  Per discussion this is acceptable: cache misses fall through to
  the UTXO store transparently.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants