feature: add franz as Kafka library #611
🤖 Claude Code Review

Status: Complete

Summary

This PR successfully migrates from IBM/sarama to twmb/franz-go as the Kafka client library. The refactor is well-executed with proper abstraction and backward compatibility via in-memory testing support. Key Changes:

Findings

[Minor] Leftover Dead Code in kafka_auth.go

The file util/kafka/kafka_auth.go (not modified in this PR) still imports sarama and defines TLS configuration functions (configureKafkaAuth, configureKafkaAuthFromFields) that appear to be dead code: they are only referenced in their own test file. Since this PR migrates to franz-go, consider removing kafka_auth.go and kafka_auth_test.go in a follow-up. The TLS configuration logic has been reimplemented in buildFranzTLSConfig() in kafka_producer_async.go:462-497.

[Info] Chaos Tests Still Use Sarama

The chaos test suite (test/chaos/*.go) still uses sarama directly. This appears intentional, as these tests simulate external clients interacting with Kafka. No action needed unless you want chaos tests to also use franz-go.
Benchmark Comparison Report

All benchmark results (sec/op)

Threshold: >10% with p < 0.05 | Generated: 2026-03-26 13:48 UTC
freemans13 left a comment:

I can't say I'm familiar with Franz, but everything makes sense. Let's do this!
…GE_TOO_LARGE

The Sarama → franz-go migration (PR bsv-blockchain#611) mapped flush_bytes directly to ProducerBatchMaxBytes. In Sarama, flush_bytes was a flush trigger threshold ("flush after N bytes accumulate"). In franz-go, ProducerBatchMaxBytes is a hard limit on batch size.

With flush_bytes=64 (used by blocks-final, blocks), clampBatchMaxBytes set ProducerBatchMaxBytes to 512 bytes. Redpanda rejects any message exceeding this with MESSAGE_TOO_LARGE, silently dropping all blocks-final notifications. This caused block 942978 to never propagate to SVNodes on a mainnet instance, along with ~192K failed txmeta and legacy-inv messages in 2 days.

Fix: clampBatchMaxBytes now returns the franz-go default of 1 MiB for any flush_bytes value <= 1 MiB (all existing configs). Only explicit values above 1 MiB are treated as batch size overrides. This matches the old Sarama behavior where Producer.MaxMessageBytes defaulted to 1 MB independently of Flush.Bytes.
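The clamping rule described in the fix can be sketched as a small pure function. This is a minimal illustration, not the repository's code: the constant name defaultBatchMaxBytes, the int parameter and return types, and the main function are assumptions; only the name clampBatchMaxBytes and the 1 MiB rule come from the commit message.

```go
package main

import "fmt"

// defaultBatchMaxBytes is the 1 MiB default named in the commit message.
// The constant name is hypothetical.
const defaultBatchMaxBytes = 1 << 20

// clampBatchMaxBytes sketches the fixed behavior: any flush_bytes value at or
// below 1 MiB is treated as a flush threshold inherited from the old config,
// not as a batch size limit, so the default is returned. Only values above
// 1 MiB are passed through as explicit overrides.
// The signature is an assumption; the real function may differ.
func clampBatchMaxBytes(flushBytes int) int {
	if flushBytes <= defaultBatchMaxBytes {
		return defaultBatchMaxBytes
	}
	return flushBytes
}

func main() {
	for _, fb := range []int{64, 1 << 20, 4 << 20} {
		fmt.Printf("flush_bytes=%d -> batch max %d bytes\n", fb, clampBatchMaxBytes(fb))
	}
}
```

Under this rule a config such as flush_bytes=64 no longer produces a batch limit smaller than a typical message.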
…ines

Production observation on dev-scale-2: subtree-validator's "Tx Meta read from Kafka /second" Grafana panel oscillates between 0 and 2.4M tx/s with mean ~1.5M, repeatedly dropping to zero. One month ago the same metric was a steady ~900K/s with no zero-drops. Cache hit rate sits at ~50% even for new (not-yet-mined) transactions, and subtree validation takes 30-49s per subtree (target: ~2s).

Root cause is in services/subtreevalidation/txmetaHandler.go:
- Each Kafka message spawns a goroutine via `go func()` (unbounded fan-out, since 9f4f1e5 in Dec 2025).
- Each Kafka message carries a binary BATCH of N entries (since 4dcf264 in Dec 2025), but the handler loops calling SetCacheFromBytes once per entry sequentially.
- Each SetCacheFromBytes call takes a per-bucket write lock in improved_cache.bucket.Set (improved_cache.go:799).

Under burst load the cumulative effect is thousands of goroutines serializing through the cache's bucket locks. The cache's own sharded-bucket parallelism (8192 buckets in SetMulti) is wasted because each goroutine takes locks one at a time. Throughput collapses to single-writer speed, recovers when contention drains, then collapses again: the visible 0/2.4M oscillation pattern.

The franz Kafka library switch in #611 (2026-03-27) is the most likely trigger for the recent regression, but the underlying design flaw predates that.

Fix:
- New txmetaCacheJob struct: parsed Kafka batch (deep-copied keys/values, separated ADD/DELETE).
- Bounded worker pool consumes parsed jobs from a buffered channel. Workers call SetCacheMulti ONCE per batch, letting the cache's bucket fan-out parallelize internally and taking each touched bucket lock once per Kafka message instead of once per entry.
- Handler now does cheap parsing on the Kafka consumer goroutine, then enqueues onto the work channel. Channel-full enqueue BLOCKS, applying backpressure to Kafka rather than letting goroutines pile up unboundedly.
- Shutdown is driven by closing the work channel (not ctx cancel) so workers drain queued items before exiting. sync.Once on the close keeps Stop() idempotent for tests with deferred cleanup.

Settings:
- subtreevalidation_txmetaCacheKafkaWorkers (default 8)
- subtreevalidation_txmetaCacheKafkaQueueSize (default 256)

Interface:
- SetCacheMulti added to txMetaCacheOps. TxMetaCache already has it.

Tests:
- Existing TestServer_txmetaHandler covers nil/short/ADD/DELETE paths; updated to assert SetCacheMulti (not SetCacheFromBytes) on ADD and run against the worker pool.
- New TestServer_txmetaHandler_BatchesIntoSingleSetCacheMulti guards the regression: 50 entries -> ONE SetCacheMulti call.
- New TestParseTxmetaBatch covers empty/zero-entry/multi-entry/truncated batches and verifies keys/values are deep-copied so later mutation of the source buffer doesn't corrupt parsed jobs.
- Race detector clean.

Note: this fix does NOT remove the cache writeback in TxMetaCache.BatchDecorate (cache-aside on store fetches). That path is needed for nodes whose Kafka publishers don't carry every transaction.

Verified: go build, go vet, go test -race for the changed packages all green; pre-commit hooks pass.
Restores the partition-level consume parallelism that the franz-go switch (#611, 2026-03-27) silently removed, and fixes the producer-side sticky-partitioner skew that produces bursty per-partition load.

Three changes, all on the txmeta hot path:

1) services/validator/Validator.go: Kafka producer now sets Key to a tx hash (the first hash in the batch for batched sends, the txn's own hash for single sends). Previously Key was nil, which under franz-go's default StickyKeyPartitioner is equivalent to a StickyPartitioner: every batch lands on the same partition until the linger/batch threshold trips, producing the bursty oscillation we observed in production. With a non-nil key, StickyKeyPartitioner hashes the key onto a partition deterministically. tx hashes are uniformly distributed, so partition usage is now uniform.

2) util/kafka/kafka_consumer.go (Start): replaces the single-goroutine fetches.EachRecord(...) loop with fetches.EachPartition(...) + one goroutine per partition per fetch. Within a partition records are still processed sequentially so per-partition order is preserved; across partitions the work runs in parallel. partitionWg is awaited before the next PollFetches so in-flight goroutine count is bounded by partition count. This restores N-way concurrency that the previous Kafka library provided implicitly via per-partition consumer goroutines.

3) util/kafka/kafka_consumer.go (NewKafkaConsumerGroup): when AutoCommitEnabled is true, registers kgo.AutoCommitMarks() and the per-partition loop calls client.MarkCommitRecords(record) only after consumerFn returns nil. With the previous default auto-commit any record that consumerFn errored on was still being committed by the auto-commit timer because franz-go tracks "iterated" not "succeeded". This change makes auto-commit consistent with the manual-commit path's semantics.

Verified:
- go build / go vet across changed packages: clean
- go test -short -race ./util/kafka/... passes (perf tests are -short-skipped; they require a real broker)
- go test -race ./services/subtreevalidation/{TestServer_txmetaHandler, TestParseTxmetaBatch} passes (worker pool from #834 still works under the new parallel feeder)
- Pre-commit hooks (gofmt / gci / golangci-lint): pass

Notes:
- Producer compression intentionally NOT enabled: Bitcoin tx data is high-entropy, so compression would just burn CPU.
- AutoCommitMarks closes the gap where errored records were still committed; it does NOT close the gap where async handlers (txmeta worker pool from #834) return before the cache write completes. Per discussion this is acceptable: cache misses fall through to the UTXO store transparently.


No description provided.