fix(kafka): per-partition fan-out + sticky-partitioner producer keys#895
Conversation
Consumer: pull loop dispatches each fetch's per-partition records to a goroutine without a between-fetch barrier (no partitionWg.Wait between PollFetches), so franz-go's local buffer keeps draining while one slow partition is in flight. Goroutines drain into a shared uncommittedRecords slice under a mutex. Shutdown waits on a single partitionWg.Wait so the final commitRecords captures every in-flight record. AutoCommitEnabled uses MarkCommitRecords (lock- internal in franz-go, safe from many goroutines). Producer: sendTxMetaToKafka / sendTxMetaBatch now set the message Key to the (first) tx hash. With franz-go's default StickyKeyPartitioner this hashes onto a single partition deterministically, distributing traffic evenly across partitions (tx hashes are uniform) and keeping every record from one batch on the same partition. Nil keys previously degraded the StickyKeyPartitioner to a StickyPartitioner which bunched batches until linger expired. txmetaHandler: shards Kafka ADD/DELETE entries by hash byte across 256 worker goroutines with bounded per-shard queues. Preserves per-key ordering. Switches from blocking-on-full to drop-on-full once any partition reaches its tail (txmetaCaughtUp latch is one-way). Adds SetCacheMulti to the txMetaCacheOps interface and a SetTxMetaCacheMulti server method for future batch-cache fan-out. Surfaced as a follow-up split out of PR bsv-blockchain#828; reviewable on its own.
|
🤖 Claude Code Review Status: Complete All previously reported issues have been addressed in commit fe137a4. Current Review:
History:
|
There was a problem hiding this comment.
Pull request overview
This PR improves Kafka txmeta throughput by parallelizing consumer partition handling, using deterministic producer keys for partition distribution, and preparing a batched txmeta cache API.
Changes:
- Updates franz-go consumer setup and per-partition fetch processing.
- Sets txmeta Kafka producer keys from transaction hashes.
- Adds documentation for txmeta sharded handling and introduces
SetTxMetaCacheMulti.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
util/kafka/kafka_consumer.go |
Adds AutoCommitMarks and per-partition goroutine fan-out in the Kafka consumer loop. |
services/validator/Validator.go |
Uses tx hashes as Kafka message keys for txmeta publishing. |
services/subtreevalidation/txmetaHandler.go |
Expands txmeta handler comments describing sharding, queueing, and error semantics. |
services/subtreevalidation/SubtreeValidation.go |
Extends txmeta cache interface/server methods with a multi-set cache API. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-19 13:58 UTC |
…own path Two correctness gaps from the Copilot review: 1. Per-partition concurrency across consecutive fetches could lose records. PollFetches returns immediately, so a second batch from partition P could enter its own goroutine and Mark/append a later offset before the previous batch had finished — a subsequent commitRecords would advance the partition past records the earlier batch had not yet processed, and if that batch later failed on an earlier offset the broker never re-delivered them. Add a per- partition mutex (lazily allocated via sync.Map) so batches from the same partition serialise; different partitions stay parallel. 2. The ErrClientClosed / context.Canceled branch in the error-handling path returned directly without partitionWg.Wait + final commitRecords. Records processed after the last ticker commit were left uncommitted despite successful processing. Factor the cleanup into a shutdownDrain closure and call it from both shutdown paths. Plus fix the inaccurate SetTxMetaCacheMulti doc comment: the method exists for a future batched fan-out, but the current shard-worker txmetaHandler applies entries one at a time via SetTxMetaCacheFromBytes and does NOT call SetTxMetaCacheMulti.
|
Brings in #895 (kafka per-partition fan-out + sticky-partitioner producer keys). Clean auto-merge — the relevant content was already on this branch via the earlier forward-port commit 673ecf9, so git recognised the trees as identical. Once a future rebase shrinks this branch, 673ecf9 will drop out naturally since main now carries the same code.



Split out of #828 so it can be reviewed independently.
Consumer (util/kafka/kafka_consumer.go)
partitionWg.WaitbetweenPollFetches). franz-go's local buffer keeps draining while one slow partition is in flight.uncommittedRecordsslice under a mutex.partitionWg.Waitso the finalcommitRecordscaptures every record that completed processing — fixes a window where a goroutine mid-consumerFnappended after the puller had committed and returned.AutoCommitEnabledusesMarkCommitRecords(lock-internal in franz-go, safe from many goroutines concurrently).HighWaterMark(added upstream in PR fix(subtreevalidation): two-mode txmeta enqueue (block during backfill, drop+warn live) #891) into eachKafkaMessageso handlers can detect "caught up to live tail."Producer (services/validator/Validator.go)
sendTxMetaToKafka/sendTxMetaBatchset the messageKeyto the (first) tx hash. With franz-go's defaultStickyKeyPartitionerthis hashes onto a single partition deterministically:StickyKeyPartitionerto aStickyPartitioner, bunching consecutive batches onto the same partition until linger expired. Symptom on the consumer side was bursty per-partition read throughput.txmetaHandler (services/subtreevalidation/txmetaHandler.go)
txmetaCaughtUp): blocking-on-full during startup (cold cache rebuild), drop-on-full once any assigned partition reaches its tail.Cache interface (services/subtreevalidation/SubtreeValidation.go)
SetCacheMulti(keys, values [][]byte) errortotxMetaCacheOpsand a correspondingSetTxMetaCacheMultiserver method. Not yet called from the handler — kept for a future batched fan-out optimisation.Test plan
go build ./...cleango vet ./util/kafka/... ./services/validator/... ./services/subtreevalidation/...cleanmake lint-newclean