perf(txmeta): v2 wire format + partition-aware receiver + pooled buffers#912
Conversation
End-to-end optimisation of the txmeta path between the validator (producer)
and the subtree-validation receiver. Bench delivers 11.1M tx/s through the
full producer→receiver pipeline at batch=1000 with 32 partitions on M3 Max
against a real production-config Native cache, up from 749K-1.14M on the
shard-fanout baseline.
What's in this change:
1. Wire format v2 (services/validator/Validator.go,
services/subtreevalidation/txmetaHandler.go). Backwards-compatible
alongside v1; receiver auto-detects via magic byte 0xFF.
[1 byte] magic = 0xFF
[1 byte] version = 0x02
[2 bytes] reserved
[4 bytes] entry count
per entry:
[8 bytes] xxhash(tx hash)
[32 bytes] tx hash
[1 byte] action (0=ADD, 1=DELETE)
[4 bytes] content length
[N bytes] content
Producer groups items by partition using
bucketIdx = xxhash(hash) % BucketsCount
bucketsPerPartition = BucketsCount / NumPartitions
partition = bucketIdx / bucketsPerPartition
so each Kafka partition's records map to a disjoint contiguous range
of receiver cache buckets. Concurrent per-partition receiver writers
touch disjoint buckets and run lock-free relative to each other.
Producer setting validator_txmeta_wireFormat (v1|v2, default v1) and
validator_txmeta_numPartitions (default 32, must divide BucketsCount).
2. Receiver rewrite (txmetaHandler.go). Drops the 256-way shard
fan-out + worker pool + caught-up latch. Parses v1 and v2 inline,
dispatches all ADDs in one SetCacheMultiSequentialWithHashes call
(or SetCacheMultiSequential on v1) per Kafka message, DELETEs
inline. Parallelism comes from the per-partition Kafka consumer
goroutines (one per partition per fetch from util/kafka/
kafka_consumer.go), not from internal fan-out.
3. Cache: SetMultiSequential family (stores/txmetacache/
improved_cache.go + txmetacache.go). Partition-aware twin of
SetMulti — same per-bucket grouping but no errgroup fan-out. The
With-Hashes variant takes caller-supplied xxhash values so the
receiver passes the on-wire v2 hash straight through.
4. Cache: inner-shard collapse (improved_cache.go). bucketNative
and bucketTrimmed maps were constructed with
NewNativeSplitLockFreeMapUint64(cap, BucketsCount), meaning every
bucket map had 8192 inner sub-shards. 8192 outer * 8192 inner ~
67M sub-maps per cache instance. cleanLockedMap rebuilt the
inner-sharded map on every chunk wrap; that pattern dominated CPU
(24% in NewNativeSplitLockFreeMapUint64) and heap (34.5 GB / 64%
of bench allocations; 80% of in-use memory on the scaling-2 prod
pod).
Outer ImprovedCache already shards across BucketsCount outer
buckets via h%BucketsCount, and b.mu serialises writes to one
bucket. Inner shard count is now 1. deleteFromNativeSplitMapShard
and deleteFromSwissSplitMapShard updated in lockstep — they
previously routed via h%BucketsCount, which no longer matches the
map's internal hash%nrOfBuckets dispatch with nrOfBuckets=1.
Eviction behaviour unchanged: the cache's gen+idx-based rule
operates on map values regardless of inner shard count; IterAll
visits every entry the same way.
5. TxMetaCacheBucketType setting (settings/
subtreevalidation_settings.go, services/subtreevalidation/
Server.go). Bucket type was hard-coded to Unallocated; now
selectable via subtreevalidation_txMetaCacheBucketType
(unallocated|preallocated|trimmed|native, default unallocated).
Unknown values fall back to Unallocated with a Warn log so a typo
never silently changes the allocator. Production default is
unchanged; canary one pod on "native" with production
TxMetaCacheMaxMB before rolling out.
Also exposes subtreevalidation_txMetaCacheTrimRatio as a typed
setting (currently only affects Preallocated; gocore key was
already read inside the cache constructor).
6. Pooled producer + receiver buffers (validator.go,
txmetaHandler.go). txmetaParseBuffers (receiver) pools
keys/values/hashes/deletes slice headers plus single-arena
keysBuf and contentBuf so per-entry hash/content allocations
collapse to one append into a pre-sized buffer.
txmetaPartitionsScratch (validator) pools the partitions outer
slice + per-partition inner slices for sendTxMetaBatchV2. The
byte buffer published to franz-go is NOT pooled: no Publish
callback hook yet for safe return.
7. Kafka producer manual partitioning (util/kafka/
kafka_producer_async.go). New Message.Partition field +
KafkaProducerConfig.ManualPartitioning. When true, registers
kgo.ManualPartitioner and flushBuffered honours
record.Partition. Off by default — existing
StickyKeyPartitioner behaviour preserved. New
WithManualPartitioning ProducerOption; daemon/daemon_kafka.go
passes it for the txmeta producer when wireFormat=v2.
8. In-memory broker test surface (util/kafka/in_memory_kafka/
in_memory_kafka.go). Adds InMemoryBroker.HasConsumer and
DropTopic so e2e tests can wait for consumer-group registration
(the broker drops messages produced to a topic with no
consumers) and release retained-messages state at teardown.
TruncateTopic also added for long bench runs that would
otherwise pin gigabytes in the broker's per-topic message slice.
9. Tests (Validator_test.go, txmetaHandler_test.go,
Server_coverage_test.go, txmeta_e2e_test.go). Unit coverage for
v2 serialization (byte layout), partition routing rule,
v2-format receiver parse, mixed ADD/DELETE handling, bucket
type resolver, end-to-end pipeline via the in-memory broker.
Bench numbers (M3 Max, 32 partitions, 256-byte payload, real
production-config Native cache, sync.Once-shared across bench
invocations):
BenchmarkTxmetaProducerReceiver_V2_Batch100 3.96M tx/s 33 allocs/op
BenchmarkTxmetaProducerReceiver_V2_Batch1000 11.10M tx/s 70 allocs/op
Bench is now scheduler-bound (~60% pthread_cond_wait,
~20% pthread_cond_signal). Cache writes, parse, and serialize
allocations no longer appear in the top consumers.
Operational notes for production rollout:
- Default behaviour unchanged. Existing deploys keep
bucketUnallocated, wireFormat=v1, no manual partitioning. New
setting opt-in.
- To benefit from #1+#2 in prod: set validator_txmeta_wireFormat=v2
and validator_txmeta_numPartitions to match the actual txmeta
topic partition count.
- To benefit from #4 in prod: set
subtreevalidation_txMetaCacheBucketType=native. Canary first;
the deployed pod's bucketUnallocated path is unaffected.
- TxMetaCacheMaxMB=73728, txMetaCacheTrimRatio=5 settings are
honoured as today.
|
🤖 Claude Code Review Status: Complete This is a well-engineered performance optimization PR targeting 10M+ tx/s throughput for the txmeta cache pipeline. The changes are substantial but carefully structured with strong test coverage and backward compatibility. Architecture Changes:
Code Quality:
Safety & Rollout:
Performance Results:
No critical issues found. The implementation follows project conventions in CLAUDE.md and includes proper error handling, input validation, and defensive programming practices. |
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-20 13:45 UTC |
…ers (#912) Cherry-pick of PR #912 onto this branch. This carries the txmeta hot-path work that was developed on top of #828 and shipped as a separate PR against main; landing it here keeps #828's branch in sync so the merged form is internally consistent and a no-op vs main once #912 is merged upstream. End-to-end optimisation of the txmeta path between the validator (producer) and the subtree-validation receiver. Bench delivers 11.1M tx/s through the full producer→receiver pipeline at batch=1000 with 32 partitions on M3 Max against a real production-config Native cache, up from 749K-1.14M on the shard-fanout baseline. What's in this change: 1. Wire format v2 (services/validator/Validator.go, services/subtreevalidation/txmetaHandler.go). Backwards-compatible alongside v1; receiver auto-detects via magic byte 0xFF. [1 byte] magic = 0xFF [1 byte] version = 0x02 [2 bytes] reserved [4 bytes] entry count per entry: [8 bytes] xxhash(tx hash) [32 bytes] tx hash [1 byte] action (0=ADD, 1=DELETE) [4 bytes] content length [N bytes] content Producer groups items by partition using bucketIdx = xxhash(hash) % BucketsCount bucketsPerPartition = BucketsCount / NumPartitions partition = bucketIdx / bucketsPerPartition so each Kafka partition's records map to a disjoint contiguous range of receiver cache buckets. Concurrent per-partition receiver writers touch disjoint buckets and run lock-free relative to each other. Producer setting validator_txmeta_wireFormat (v1|v2, default v1) and validator_txmeta_numPartitions (default 32, must divide BucketsCount). 2. Receiver rewrite (txmetaHandler.go). Drops the 256-way shard fan-out + worker pool + caught-up latch. Parses v1 and v2 inline, dispatches all ADDs in one SetCacheMultiSequentialWithHashes call (or SetCacheMultiSequential on v1) per Kafka message, DELETEs inline. Parallelism comes from the per-partition Kafka consumer goroutines, not from internal fan-out. 3. Cache: SetMultiSequential family (stores/txmetacache/ improved_cache.go + txmetacache.go). Partition-aware twin of SetMulti — same per-bucket grouping but no errgroup fan-out. The With-Hashes variant takes caller-supplied xxhash values so the receiver passes the on-wire v2 hash straight through. 4. Cache: inner-shard collapse (improved_cache.go). bucketNative and bucketTrimmed maps were constructed with NewNativeSplitLockFreeMapUint64(cap, BucketsCount), meaning every bucket map had 8192 inner sub-shards. cleanLockedMap rebuilt the inner-sharded map on every chunk wrap (24% CPU, 64% of bench allocations, 80% of in-use memory on the scaling-2 prod pod). Outer ImprovedCache already shards via h%BucketsCount and b.mu serialises writes — inner shard count is now 1. deleteFromNativeSplitMapShard / deleteFromSwissSplitMapShard updated in lockstep to route via shard 0. Eviction unchanged: cache's gen+idx rule operates on map values regardless of inner shard count. bucketUnallocated and bucketPreallocated use plain Go maps; unaffected. 5. TxMetaCacheBucketType setting (settings/ subtreevalidation_settings.go, services/subtreevalidation/ Server.go). Bucket type was hard-coded to Unallocated; now selectable via subtreevalidation_txMetaCacheBucketType (unallocated|preallocated|trimmed|native, default unallocated). Unknown values fall back to Unallocated with a Warn log. 6. Pooled producer + receiver buffers. txmetaParseBuffers (receiver) pools keys/values/hashes/deletes slice headers plus single-arena keysBuf and contentBuf. txmetaPartitionsScratch (validator) pools the partitions outer slice + per-partition inner slices. 7. Kafka producer manual partitioning (util/kafka/ kafka_producer_async.go). Message.Partition field + KafkaProducerConfig.ManualPartitioning + WithManualPartitioning option. daemon/daemon_kafka.go passes it for the txmeta producer when wireFormat=v2. 8. In-memory broker test surface. InMemoryBroker.HasConsumer, DropTopic, TruncateTopic for e2e tests + long bench runs. 9. Tests. v2 serialization byte layout, partition routing, v2-format receiver parse, mixed ADD/DELETE, bucket-type resolver, end-to-end pipeline via in-memory broker (txmeta_e2e_test.go). Bench numbers (M3 Max, 32 partitions, 256-byte payload, real production-config Native cache): BenchmarkTxmetaProducerReceiver_V2_Batch100 3.96M tx/s 33 allocs/op BenchmarkTxmetaProducerReceiver_V2_Batch1000 11.10M tx/s 70 allocs/op Bench is scheduler-bound after this change (~60% pthread_cond_wait, ~20% pthread_cond_signal). Cache writes, parse, and serialize allocations no longer appear in the top consumers.
The CI test job builds with the testtxmetacache tag where BucketsCount=8. Test_sendTxMetaBatchV2_PartitionRouting hard-coded numPartitions=32 and TestTxmetaE2E_V2_RoundTrip drove publishV2/serializeV2Partitioned with numPartitions=32, both of which then computed BucketsCount/numPartitions without the clamp the production sendTxMetaBatchV2 already has, panicking with integer divide by zero. - publishV2 and serializeV2Partitioned: mirror the production clamp (numPartitions>=1, bucketsPerPartition>=1, p capped at numPartitions-1) so the test math matches the routing code under both BucketsCount=8192 and BucketsCount=8 builds. - Test_sendTxMetaBatchV2_PartitionRouting: cap numPartitions to BucketsCount. With more partitions than buckets the production code clamps bucketsPerPartition to 1 and the test's "find two hashes in different partitions" loop would not terminate meaningfully. Also drop a stray blank line at the end of the Server struct so gci stops flagging Server.go.
CI runs with the testtxmetacache build tag which sets BucketsCount=8. The v2 partition-routing tests + bench helpers used numPartitions=32 hard-coded, so bucketsPerPartition = 8/32 = 0, then bucket/0 panics. - services/validator/Validator_test.go: Test_sendTxMetaBatchV2_PartitionRouting caps numPartitions to BucketsCount and skips when BucketsCount < 2. - services/subtreevalidation/txmeta_e2e_test.go: publishV2 and serializeV2Partitioned now apply the same cap that validator.sendTxMetaBatchV2 already had. Under testtxmetacache they collapse to a few partitions; under the production build they remain 32.
|
PR bsv-blockchain#912 (perf(txmeta): v2 wire format + partition-aware receiver) landed on main while this PR was open and added new uses of appendHeightToValue plus two new SetCacheMultiSequential* methods that call ImprovedCache's new SetMultiSequential / SetMultiSequentialWithHashes. My earlier "dead code removal" was wrong by the time it landed — height encoding is part of the active contract, not legacy. Changes: - Restore appendHeightToValue method, hitOldTx metric (declaration + atomic counter + Prometheus gauge), and noOfBlocksToKeepInTxMetaCache field + init. Functionally the height bytes are still ignored on read in the byte backend, but they're now load-bearing for the v2 receiver in bsv-blockchain#912 and the metric is still wired up. - Re-introduce the height suffix on SetCacheFromBytes and SetCacheMulti. - Add SetMultiSequential and SetMultiSequentialWithHashes to the cacheBackend interface; improvedCacheBackend forwards to the underlying *ImprovedCache; PointerCache shares its implementation with SetMultiFromBytes (the per-bucket goroutine fan-out doesn't exist there, so the partition-aware variant is identical for free). - Fix the empty-value SetCacheMulti subtest assertion: stored entry is again 4 bytes (the height suffix), not 0.
PR bsv-blockchain#912 (perf(txmeta): v2 wire format + partition-aware receiver) landed on main while this PR was open and added new uses of appendHeightToValue plus two new SetCacheMultiSequential* methods that call ImprovedCache's new SetMultiSequential / SetMultiSequentialWithHashes. My earlier "dead code removal" was wrong by the time it landed — height encoding is part of the active contract, not legacy. Changes: - Restore appendHeightToValue method, hitOldTx metric (declaration + atomic counter + Prometheus gauge), and noOfBlocksToKeepInTxMetaCache field + init. Functionally the height bytes are still ignored on read in the byte backend, but they're now load-bearing for the v2 receiver in bsv-blockchain#912 and the metric is still wired up. - Re-introduce the height suffix on SetCacheFromBytes and SetCacheMulti. - Add SetMultiSequential and SetMultiSequentialWithHashes to the cacheBackend interface; improvedCacheBackend forwards to the underlying *ImprovedCache; PointerCache shares its implementation with SetMultiFromBytes (the per-bucket goroutine fan-out doesn't exist there, so the partition-aware variant is identical for free). - Fix the empty-value SetCacheMulti subtest assertion: stored entry is again 4 bytes (the height suffix), not 0.
The v2 magic-byte detection was unsafe: v1 messages with entry counts 255, 511, 767, ... begin with byte 0xFF (the LE-uint32 low byte), so the naive `data[0] == 0xFF` check misclassified them. Count 767 in particular aliases the entire 4-byte v2 header `0xFF 0x02 0x00 0x00` exactly. Both legacy/netsync (this PR) and subtreevalidation (PR bsv-blockchain#912) had the bug, silently dropping the netsync announce or garbling the txmeta cache write. Detection now requires the full 4-byte header signature plus a plausibility check on the resulting v2 entry count (entries * minEntrySize must fit in the remaining buffer); any failure falls through to v1 parsing instead of dropping the message. Move the wire-format constants (action bytes, magic, version, header length, min entry size) into a new stores/txmetacache/wire.go so the producer (validator) and all consumers (subtreevalidation, legacy/netsync) reference one source of truth. Drops three local copies and prevents future drift. Regression tests cover the count=255 and count=767 collisions in both the netsync consumer and the subtreevalidation handler, asserting that the v1 dispatch path runs and the v2 path does not. Addresses Copilot review feedback on PR bsv-blockchain#954.




Summary
End-to-end optimisation of the txmeta path between the validator (producer) and the subtree-validation receiver. Targeting 10M tx/s/pod for the txmeta consume path.
Bench delivers 11.1M tx/s through the full producer→receiver pipeline at batch=1000 with 32 partitions on M3 Max against a real production-config
Nativecache, up from 749K-1.14M on the shard-fanout baseline.What's in this PR
Wire format v2 + partition-aware producer
xxhashper entry so the receiver skips its own xxhash on the hot path.partition = (xxhash(hash) % BucketsCount) / (BucketsCount / NumPartitions). Each Kafka partition now maps to a disjoint contiguous range of receiver cache buckets — receivers writing concurrently across partitions take no cross-partition cache-bucket locks.validator_txmeta_wireFormat(v1|v2, default v1) andvalidator_txmeta_numPartitions(default 32, must divide BucketsCount).Receiver rewrite
txmetaHandler.go.SetCacheMultiSequentialWithHashescall per Kafka message; DELETEs inline.util/kafka/kafka_consumer.go), not from internal fan-out.Cache: new sequential paths + inner-shard collapse
SetMultiSequential/SetMultiSequentialWithHashesonImprovedCache— partition-aware twins ofSetMulti. Same per-bucket grouping but noerrgroupfan-out (the caller already has parallelism via the Kafka per-partition consumer goroutines).bucketNativeandbucketTrimmedper-bucket maps were previously constructed withNewNativeSplitLockFreeMapUint64(cap, BucketsCount)→ 8192 outer × 8192 inner sub-maps ≈ 67M sub-maps per cache instance.cleanLockedMaprebuilt that on every chunk wrap. Inner shard count is now1.deleteFromNativeSplitMapShard/deleteFromSwissSplitMapShardupdated in lockstep.gen+idx-based rule operates on map values regardless of inner shard count.bucketUnallocatedandbucketPreallocateduse plain Go maps without inner sharding — unaffected. Production today runsbucketUnallocated, so this win lands only when the bucket type is switched (see next).TxMetaCacheBucketType setting
subtreevalidation_txMetaCacheBucketTypesetting (unallocated|preallocated|trimmed|native, default unallocated = current prod behaviour).unallocatedwith a Warn log so a typo never silently changes the deployed allocator.subtreevalidation_txMetaCacheTrimRatioalso exposed as a typed setting (currently only affects Preallocated).Pooled buffers
txmetaParseBuffers(receiver) poolskeys/values/hashes/deletesslice headers plus single-arenakeysBufandcontentBuf— per-entry hash/content allocations collapse to one append into a pre-sized arena.txmetaPartitionsScratch(validator) pools the partitions outer slice + per-partition inner slices forsendTxMetaBatchV2.Kafka producer manual partitioning
Message.Partitionfield +KafkaProducerConfig.ManualPartitioning+WithManualPartitioningoption. When enabled, registerskgo.ManualPartitionerandflushBufferedhonoursrecord.Partition. Off by default; existingStickyKeyPartitionerbehaviour preserved.daemon/daemon_kafka.gowires it for the txmeta producer whenwireFormat=v2.In-memory broker test surface
InMemoryBroker.HasConsumerandDropTopicso e2e tests can wait for consumer-group registration and release retained-messages state at teardown.TruncateTopicalso added for long bench runs.Tests
txmeta_e2e_test.go).Bench numbers
M3 Max, 32 partitions, 256-byte payload, real production-config Native cache (
sync.Once-shared across bench invocations):BenchmarkTxmetaProducerReceiver_V2_Batch100BenchmarkTxmetaProducerReceiver_V2_Batch1000For context, baseline at start of optimisation work was 749K (batch=100) / 1.14M (batch=1000) tx/s with ~2400 allocs/op.
The bench is now scheduler-bound (~60%
pthread_cond_wait, ~20%pthread_cond_signal). Cache writes, parse, and serialize allocations no longer appear in the top consumers.Production rollout
Default behaviour is unchanged. Existing deploys keep
bucketUnallocated,wireFormat=v1, no manual partitioning. New settings are opt-in.To unlock the wins in prod (recommended sequence, one knob per canary):
validator_txmeta_wireFormat=v2andvalidator_txmeta_numPartitionsto match the actual txmeta topic partition count (must divideBucketsCount=8192). Confirm receiver lag stays flat.subtreevalidation_txMetaCacheBucketType=nativeon one canary subtree-validation pod with productionTxMetaCacheMaxMB. Watch RSS,cleanLockedMapallocation rate (heap pprof),subtreevalidation_set_tx_meta_cache_kafkahistogram. Roll out once a chunk-rotation cycle is complete with healthy memory.Settings honoured as today:
TxMetaCacheMaxMB,txMetaCacheTrimRatio(still only affectsPreallocated),TxMetaCacheEnabled.Test plan
go build ./...cleango vetclean across touched packagestesttxmetacachebuild tagsubtreevalidation_txMetaCacheBucketType=nativeat production cache sizevalidator_txmeta_wireFormat=v2subtreevalidation_set_tx_meta_cache_kafkahistogram + RSS in the canaryKnown follow-ups (separate PR)
bucketUnallocated.cleanLockedMapstill allocates a freshmap[uint64]uint64on every chunk wrap (held 17 GB in-use on scaling-2). Worth async.Pool+clear(m)pass for the prod default bucket type.kafka_consumer.go'sEachPartitioncould be replaced with persistent partition workers fed by channels. Bench is scheduler-bound; production isn't (fetch rate is ~30/sec vs bench ~13k iter/sec), so this is a "bank headroom" change.