Skip to content

fix(subtreevalidation): two-mode txmeta enqueue (block during backfill, drop+warn live)#891

Merged
icellan merged 2 commits into
bsv-blockchain:mainfrom
icellan:fix/txmeta-startup-mode-backpressure
May 19, 2026
Merged

fix(subtreevalidation): two-mode txmeta enqueue (block during backfill, drop+warn live)#891
icellan merged 2 commits into
bsv-blockchain:mainfrom
icellan:fix/txmeta-startup-mode-backpressure

Conversation

@icellan

@icellan icellan commented May 19, 2026

Copy link
Copy Markdown
Contributor

Summary

Replaces the unconditional drop-on-full behavior in the txmeta shard worker pool (introduced in #858) with a mode latched off the Kafka partition high water mark:

  • Startup (catching up to the live tail): blocking enqueue with ctx escape so no txmeta cache updates are dropped while the cache is cold. The Kafka consumer is naturally paced by the slowest shard worker.
  • Caught up (latched the first time msg.Offset+1 >= msg.HighWaterMark): non-blocking enqueue; a full shard queue is now logged at Warn instead of Error, and the remainder of the batch is abandoned (cache repopulates from Kafka on restart — best-effort by design).

The latch is one-way: once set it stays set, so a long re-catch-up later still operates in live (drop) mode.

Why

In production we see this Error log spamming during scale tests:

ERROR | [kafka_consumer] error processing kafka message on topic txmeta-... skipping: PROCESSING (4): [txmetaHandler] txmeta worker queue full for shard 155

This is the documented best-effort drop firing, so it shouldn't be logged at Error level. And during initial backfill, dropping txmeta updates leaves the cache cold for those entries — better to apply backpressure to Kafka until we're caught up.

Changes

  • util/kafka/kafka_consumer.go: added HighWaterMark int64 to KafkaMessage; switched the franz-go fetch loop from EachRecord to EachPartition to read p.HighWatermark; populated HWM in the in-memory consumer via claim.HighWaterMarkOffset(). Field is additive — other consumers ignore it.
  • services/subtreevalidation/Server.go: added txmetaCaughtUp atomic.Bool latch.
  • services/subtreevalidation/txmetaHandler.go: two-mode enqueueTxmetaWorkItem; maybeMarkTxmetaCaughtUp flips the latch on the partition tail.
  • services/subtreevalidation/txmetaHandler_test.go: replaced the now-invalid ReturnsErrorWhenQueueFull test with six tests covering caught-up drop, startup blocking, ctx-cancel unblock, latch flip on HWM, no flip when HWM unset, and one-way latch behavior.

Latch design (deliberate trade-offs)

Documented in code at maybeMarkTxmetaCaughtUp:

  • One-way. A node that catches up, runs for hours, then falls badly behind (extended pause) operates in drop mode through the re-catch-up. If you'd rather it re-arm on lag, that's a small follow-up.
  • Any-partition. For multi-partition txmeta topics, the latch flips as soon as ANY assigned partition reaches its tail. txmeta is sharded by tx hash and partitions are evenly loaded under normal traffic, so seeing one tail is a strong signal we're broadly caught up. A stricter per-partition gating would extend cold-cache blocking unnecessarily if one partition is temporarily empty or slow.
  • Fail-closed on HighWaterMark<=0. An unset HWM (in-memory consumer, hand-built messages) keeps the latch in startup (blocking) mode. Smoke tests are low-throughput, so the shard queues should never fill and the blocking mode is effectively a no-op there.
  • Abort-on-first-full-shard preserved. In drop mode the rest of the batch is still abandoned on first full shard. This matches the prior semantic; widening it to "skip full shards but enqueue to others" is a separate change.

Test plan

  • go build ./... clean
  • go vet ./services/subtreevalidation/... ./util/kafka/... clean
  • staticcheck ./services/subtreevalidation/... ./util/kafka/... clean
  • golangci-lint run ./services/subtreevalidation/... ./util/kafka/... clean (pre-existing prealloc warnings in unrelated test files)
  • go test -race -count=1 ./services/subtreevalidation/... ./util/kafka/... all pass
  • Verify in scale-test environment that the production Error log is gone and startup no longer drops txmeta updates while catching up.

…l, drop+warn live)

Replaces the unconditional drop-on-full behavior introduced in bsv-blockchain#858 with a
mode latched off the Kafka partition high water mark:

- Startup (catching up to live): blocking enqueue with ctx escape so no
  txmeta cache updates are dropped while the cache is cold. The Kafka
  consumer is naturally paced by the slowest shard worker.
- Caught up (latched once msg.Offset+1 >= msg.HighWaterMark): non-blocking
  enqueue; a full shard queue is logged at Warn instead of Error and the
  remainder of the batch is abandoned (cache repopulates from Kafka on
  restart, by design).

The latch is one-way: once flipped it stays set, so an extended re-catch-up
later operates in live (drop) mode.

Plumbs HighWaterMark through util/kafka.KafkaMessage by iterating
fetches.EachPartition in the franz-go consumer and reading
claim.HighWaterMarkOffset() in the in-memory consumer. Field is additive;
other consumers ignore it.
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete


Summary

Reviewed PR #891 implementing two-mode txmeta enqueue (blocking during backfill, drop-on-full when live).

Assessment: No issues found. The implementation successfully addresses the production error log spam while preventing cache cold-starts during backfill.


Copilot Feedback Review

Reviewed and addressed 4 existing Copilot comments:

  1. kafka_consumer.go - Comment about return vs continue: ✅ Already fixed (code uses continue)
  2. txmetaHandler.go:122 - Comment about log location: ✅ Code is correct (warning emitted in enqueueTxmetaWorkItem)
  3. txmetaHandler.go:166 - Any-partition latch concern: ✅ Intentional design choice, explicitly documented
  4. txmetaHandler.go:162 - HWM<=0 behavior: ✅ Code matches docs (fail-closed, not fail-open)

Added clarifying replies to each comment thread.


Code Quality

Strengths:

  • Comprehensive test coverage: 6 new tests covering all modes and edge cases
  • Excellent documentation: Design trade-offs explicitly documented in code comments
  • Thread safety: Proper atomic operations for latch, context cancellation support
  • Backward compatibility: Low-throughput scenarios (smoke tests) unaffected

Implementation Details Verified:

  • Latch is one-way (never reverts from caught-up to startup mode)
  • Blocking mode prevents drops during cold cache backfill
  • Drop mode uses Warn level logging (not Error) as intended
  • Context cancellation unblocks startup-mode sends for clean shutdown

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the subtreevalidation txmeta ingestion path to avoid dropping txmeta cache updates during startup backfill (by blocking enqueue) while preserving best-effort behavior once caught up to the Kafka partition tail (by dropping on full queues with a Warn log). It also extends the Kafka consumer message wrapper to include the partition high water mark so consumers can detect when they’ve reached the live tail.

Changes:

  • Add HighWaterMark to kafka.KafkaMessage and populate it from franz-go fetch partition metadata (and in-memory consumer claim).
  • Introduce a txmetaCaughtUp latch in subtreevalidation and switch enqueue behavior between blocking (startup) and drop+warn (caught-up).
  • Replace/expand txmeta handler tests to cover both modes and latch behavior.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
util/kafka/kafka_consumer.go Adds HighWaterMark to consumed messages and switches polling from per-record to per-partition to capture HWM.
services/subtreevalidation/Server.go Adds txmetaCaughtUp latch state to the server.
services/subtreevalidation/txmetaHandler.go Implements two-mode enqueue and a latch flip based on Offset+1 >= HighWaterMark.
services/subtreevalidation/txmetaHandler_test.go Updates test suite to validate startup blocking, caught-up drop behavior, and latch semantics.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread util/kafka/kafka_consumer.go Outdated
Comment thread services/subtreevalidation/txmetaHandler.go Outdated
Comment thread services/subtreevalidation/txmetaHandler.go
Comment thread services/subtreevalidation/txmetaHandler.go
- util/kafka/kafka_consumer.go: use `continue` instead of `return` after a
  failing consumerFn in the EachPartition loop. The refactor from
  EachRecord changed semantics — a single bad record was killing all
  subsequent records in the same partition.

- services/subtreevalidation/txmetaHandler.go: clarify the "queue full"
  comment to name the actual log site (enqueueTxmetaWorkItem). Document
  the deliberate one-way / any-partition / fail-closed-on-zero-HWM latch
  semantics so they are explicit in the source.
@github-actions

github-actions Bot commented May 19, 2026

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-891 (01dac56)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 142
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.898µ 1.661µ ~ 0.700
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 61.66n 61.58n ~ 0.400
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 61.68n 61.72n ~ 0.400
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 61.61n 61.58n ~ 0.700
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 29.15n 29.25n ~ 0.400
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 48.84n 49.53n ~ 0.600
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 107.0n 108.3n ~ 0.700
MiningCandidate_Stringify_Short-4 266.0n 267.5n ~ 0.700
MiningCandidate_Stringify_Long-4 1.849µ 1.849µ ~ 1.000
MiningSolution_Stringify-4 944.3n 943.4n ~ 0.300
BlockInfo_MarshalJSON-4 1.737µ 1.747µ ~ 0.200
NewFromBytes-4 123.3n 122.7n ~ 0.300
Mine_EasyDifficulty-4 52.02µ 52.99µ ~ 0.100
Mine_WithAddress-4 5.686µ 5.452µ ~ 0.400
BlockAssembler_AddTx-4 0.02710n 0.02966n ~ 0.400
AddNode-4 11.31 10.57 ~ 0.400
AddNodeWithMap-4 11.65 11.63 ~ 1.000
DirectSubtreeAdd/4_per_subtree-4 58.39n 61.36n ~ 0.400
DirectSubtreeAdd/64_per_subtree-4 28.72n 29.03n ~ 0.700
DirectSubtreeAdd/256_per_subtree-4 27.52n 27.64n ~ 0.400
DirectSubtreeAdd/1024_per_subtree-4 26.32n 26.50n ~ 0.100
DirectSubtreeAdd/2048_per_subtree-4 26.00n 26.07n ~ 0.200
SubtreeProcessorAdd/4_per_subtree-4 291.7n 295.9n ~ 0.400
SubtreeProcessorAdd/64_per_subtree-4 280.9n 298.6n ~ 0.100
SubtreeProcessorAdd/256_per_subtree-4 282.6n 294.3n ~ 0.700
SubtreeProcessorAdd/1024_per_subtree-4 271.1n 281.7n ~ 0.100
SubtreeProcessorAdd/2048_per_subtree-4 273.3n 275.8n ~ 0.700
SubtreeProcessorRotate/4_per_subtree-4 280.7n 290.8n ~ 0.700
SubtreeProcessorRotate/64_per_subtree-4 281.2n 281.0n ~ 1.000
SubtreeProcessorRotate/256_per_subtree-4 277.2n 284.6n ~ 0.100
SubtreeProcessorRotate/1024_per_subtree-4 289.2n 291.6n ~ 0.300
SubtreeNodeAddOnly/4_per_subtree-4 54.98n 54.84n ~ 0.700
SubtreeNodeAddOnly/64_per_subtree-4 34.50n 34.65n ~ 0.700
SubtreeNodeAddOnly/256_per_subtree-4 33.64n 33.52n ~ 0.700
SubtreeNodeAddOnly/1024_per_subtree-4 32.88n 32.85n ~ 0.800
SubtreeCreationOnly/4_per_subtree-4 115.6n 116.6n ~ 0.700
SubtreeCreationOnly/64_per_subtree-4 411.8n 417.7n ~ 0.200
SubtreeCreationOnly/256_per_subtree-4 1.399µ 1.413µ ~ 0.300
SubtreeCreationOnly/1024_per_subtree-4 4.493µ 4.516µ ~ 1.000
SubtreeCreationOnly/2048_per_subtree-4 8.364µ 8.499µ ~ 0.100
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 288.5n 282.0n ~ 1.000
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 293.8n 279.8n ~ 0.100
ParallelGetAndSetIfNotExists/1k_nodes-4 2.227m 2.259m ~ 0.100
ParallelGetAndSetIfNotExists/10k_nodes-4 5.460m 5.658m ~ 0.200
ParallelGetAndSetIfNotExists/50k_nodes-4 7.607m 7.618m ~ 0.700
ParallelGetAndSetIfNotExists/100k_nodes-4 10.41m 10.46m ~ 1.000
SequentialGetAndSetIfNotExists/1k_nodes-4 1.975m 1.945m ~ 0.100
SequentialGetAndSetIfNotExists/10k_nodes-4 4.424m 4.444m ~ 0.400
SequentialGetAndSetIfNotExists/50k_nodes-4 12.27m 12.97m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 22.14m 23.40m ~ 0.200
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 2.279m 2.296m ~ 0.400
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 8.581m 8.360m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 14.39m 13.48m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 2.034m 1.976m ~ 0.400
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 8.793m 7.806m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 42.86m 40.96m ~ 0.200
DiskTxMap_SetIfNotExists-4 3.874µ 4.096µ ~ 0.700
DiskTxMap_SetIfNotExists_Parallel-4 3.669µ 3.922µ ~ 0.100
DiskTxMap_ExistenceOnly-4 346.2n 419.7n ~ 0.100
Queue-4 185.8n 189.8n ~ 0.100
AtomicPointer-4 3.694n 3.657n ~ 0.400
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 808.0µ 799.8µ ~ 0.200
ReorgOptimizations/DedupFilterPipeline/New/10K-4 761.4µ 769.0µ ~ 0.200
ReorgOptimizations/AllMarkFalse/Old/10K-4 102.0µ 104.1µ ~ 0.100
ReorgOptimizations/AllMarkFalse/New/10K-4 63.92µ 64.33µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/10K-4 53.45µ 51.54µ ~ 1.000
ReorgOptimizations/HashSlicePool/New/10K-4 11.12µ 11.10µ ~ 1.000
ReorgOptimizations/NodeFlags/Old/10K-4 4.456µ 5.335µ ~ 0.100
ReorgOptimizations/NodeFlags/New/10K-4 1.505µ 2.333µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 9.235m 9.422m ~ 0.100
ReorgOptimizations/DedupFilterPipeline/New/100K-4 9.287m 10.122m ~ 0.100
ReorgOptimizations/AllMarkFalse/Old/100K-4 1.065m 1.076m ~ 0.200
ReorgOptimizations/AllMarkFalse/New/100K-4 707.3µ 714.5µ ~ 0.200
ReorgOptimizations/HashSlicePool/Old/100K-4 523.7µ 499.2µ ~ 0.700
ReorgOptimizations/HashSlicePool/New/100K-4 216.4µ 206.1µ ~ 0.700
ReorgOptimizations/NodeFlags/Old/100K-4 47.74µ 49.29µ ~ 0.100
ReorgOptimizations/NodeFlags/New/100K-4 16.74µ 17.19µ ~ 0.100
TxMapSetIfNotExists-4 46.45n 46.92n ~ 0.100
TxMapSetIfNotExistsDuplicate-4 38.59n 39.02n ~ 0.400
ChannelSendReceive-4 603.6n 592.7n ~ 0.200
CalcBlockWork-4 501.7n 502.5n ~ 0.700
CalculateWork-4 682.1n 685.2n ~ 0.200
BuildBlockLocatorString_Helpers/Size_10-4 1.336µ 1.347µ ~ 0.200
BuildBlockLocatorString_Helpers/Size_100-4 12.82µ 15.77µ ~ 0.200
BuildBlockLocatorString_Helpers/Size_1000-4 159.2µ 127.0µ ~ 0.100
CatchupWithHeaderCache-4 104.2m 104.4m ~ 0.400
_prepareTxsPerLevel-4 410.8m 413.7m ~ 0.700
_prepareTxsPerLevelOrdered-4 4.072m 3.672m ~ 0.100
_prepareTxsPerLevel_Comparison/Original-4 416.4m 414.0m ~ 1.000
_prepareTxsPerLevel_Comparison/Optimized-4 3.828m 3.837m ~ 0.700
_BufferPoolAllocation/16KB-4 4.848µ 3.903µ ~ 0.700
_BufferPoolAllocation/32KB-4 8.736µ 7.930µ ~ 0.200
_BufferPoolAllocation/64KB-4 16.27µ 17.73µ ~ 0.400
_BufferPoolAllocation/128KB-4 27.70µ 37.39µ ~ 0.400
_BufferPoolAllocation/512KB-4 103.51µ 95.63µ ~ 0.100
_BufferPoolConcurrent/32KB-4 18.54µ 18.78µ ~ 1.000
_BufferPoolConcurrent/64KB-4 29.09µ 29.35µ ~ 1.000
_BufferPoolConcurrent/512KB-4 143.5µ 145.6µ ~ 0.200
_SubtreeDeserializationWithBufferSizes/16KB-4 615.1µ 614.6µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/32KB-4 611.2µ 600.5µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/64KB-4 614.7µ 589.2µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 613.4µ 581.4µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/512KB-4 614.8µ 587.1µ ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/16KB-4 36.06m 35.81m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/32KB-4 36.04m 35.90m ~ 0.200
_SubtreeDataDeserializationWithBufferSizes/64KB-4 35.96m 35.86m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/128KB-4 35.58m 36.07m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/512KB-4 35.84m 35.93m ~ 1.000
_PooledVsNonPooled/Pooled-4 833.7n 834.2n ~ 0.400
_PooledVsNonPooled/NonPooled-4 8.283µ 7.576µ ~ 0.700
_MemoryFootprint/Current_512KB_32concurrent-4 6.641µ 6.718µ ~ 0.400
_MemoryFootprint/Proposed_32KB_32concurrent-4 9.215µ 9.341µ ~ 0.400
_MemoryFootprint/Alternative_64KB_32concurrent-4 8.709µ 9.124µ ~ 0.200
SubtreeSizes/10k_tx_4_per_subtree-4 1.013m 1.057m ~ 1.000
SubtreeSizes/10k_tx_16_per_subtree-4 238.4µ 245.0µ ~ 0.100
SubtreeSizes/10k_tx_64_per_subtree-4 57.53µ 58.01µ ~ 0.300
SubtreeSizes/10k_tx_256_per_subtree-4 14.36µ 14.46µ ~ 0.100
SubtreeSizes/10k_tx_512_per_subtree-4 7.160µ 7.122µ ~ 1.000
SubtreeSizes/10k_tx_1024_per_subtree-4 3.531µ 3.544µ ~ 1.000
SubtreeSizes/10k_tx_2k_per_subtree-4 1.765µ 1.766µ ~ 1.000
BlockSizeScaling/10k_tx_64_per_subtree-4 55.98µ 56.95µ ~ 0.100
BlockSizeScaling/10k_tx_256_per_subtree-4 14.16µ 14.07µ ~ 0.700
BlockSizeScaling/10k_tx_1024_per_subtree-4 3.482µ 3.513µ ~ 0.100
BlockSizeScaling/50k_tx_64_per_subtree-4 295.7µ 295.3µ ~ 1.000
BlockSizeScaling/50k_tx_256_per_subtree-4 70.08µ 69.57µ ~ 0.400
BlockSizeScaling/50k_tx_1024_per_subtree-4 17.55µ 17.21µ ~ 0.100
SubtreeAllocations/small_subtrees_exists_check-4 117.5µ 118.5µ ~ 1.000
SubtreeAllocations/small_subtrees_data_fetch-4 126.8µ 125.0µ ~ 0.100
SubtreeAllocations/small_subtrees_full_validation-4 245.5µ 243.6µ ~ 1.000
SubtreeAllocations/medium_subtrees_exists_check-4 7.184µ 7.130µ ~ 0.700
SubtreeAllocations/medium_subtrees_data_fetch-4 7.652µ 7.503µ ~ 0.400
SubtreeAllocations/medium_subtrees_full_validation-4 14.03µ 14.12µ ~ 1.000
SubtreeAllocations/large_subtrees_exists_check-4 1.699µ 1.708µ ~ 0.700
SubtreeAllocations/large_subtrees_data_fetch-4 1.799µ 1.793µ ~ 1.000
SubtreeAllocations/large_subtrees_full_validation-4 3.548µ 3.512µ ~ 0.200
StoreBlock_Sequential/BelowCSVHeight-4 330.1µ 327.8µ ~ 0.100
StoreBlock_Sequential/AboveCSVHeight-4 328.6µ 328.5µ ~ 0.700
GetUtxoHashes-4 270.4n 279.0n ~ 0.200
GetUtxoHashes_ManyOutputs-4 47.52µ 46.28µ ~ 0.700
_NewMetaDataFromBytes-4 231.1n 233.0n ~ 0.300
_Bytes-4 613.2n 609.7n ~ 0.200
_MetaBytes-4 570.7n 562.3n ~ 0.100

Threshold: >10% with p < 0.05 | Generated: 2026-05-19 09:46 UTC

@icellan

icellan commented May 19, 2026

Copy link
Copy Markdown
Contributor Author

Thanks for the review. Addressed in 8710171:

  1. kafka_consumer.go return vs continue — Good catch, this was an unintended behavior change from the EachRecordEachPartition refactor. Fixed: a failing record now continues to the next record in the partition, matching pre-refactor semantics.

  2. Comment about who logs the warning — Reworded to name enqueueTxmetaWorkItem as the log site.

  3. Per-partition vs. any-partition latch — Intentional and aligned with what the reviewer requested. Documented as a deliberate trade-off in the maybeMarkTxmetaCaughtUp doc comment, with the reasoning (txmeta is sharded by tx hash, partitions are evenly loaded, one tail is a strong "broadly caught up" signal). If the assumption ever fails in practice we can switch to per-partition.

  4. PR description vs. fail-closed code — The PR description was wrong; the code is fail-closed (an unset HighWaterMark<=0 stays in startup/blocking mode). PR description updated. The TestServer_txmetaHandler_LatchIgnoredWhenHighWaterMarkUnset test pins this behavior explicitly. Smoke tests using the in-memory consumer are low-throughput, so the blocking mode is effectively a no-op there.

@sonarqubecloud

Copy link
Copy Markdown

@icellan icellan merged commit e4e45f6 into bsv-blockchain:main May 19, 2026
25 checks passed
@icellan icellan self-assigned this May 19, 2026
icellan added a commit that referenced this pull request May 19, 2026
…heMulti

Replaces per-entry shard-worker dispatch with per-shard-batch dispatch.
For each Kafka message, entries are bucketed into per-shard slices in a
single parse pass and sent as one *txmetaShardBatch per non-empty shard.
The worker then calls SetTxMetaCacheMulti once per shard-batch instead of
SetTxMetaCacheFromBytes once per entry.

Two wins:

  - Channel hops drop from N (entries) to up-to-256 (shards) per Kafka
    message — eliminates per-entry channel send/recv on the hot path.

  - The cache acquires each touched bucket lock once per shard-batch
    instead of once per entry. Larger batches hit the existing
    smallSetMultiBatchThreshold fast-path in ImprovedCache.SetMulti and
    stay on the cheap per-key Set loop without spawning bucket goroutines.

Preserves the two-mode (startup-block / caught-up-drop) enqueue contract
from #891; the "abandon remainder of batch" semantic is now per-shard-
batch rather than per-entry. Hash-byte sharding still provides per-key
ordering. ADD content is still copied out of msg.Value at parse time.

Per-batch metrics observation replaces per-item observation — dashboards
that interpret one Observe() as one record will need adjusting.

Microbench (Apple M3 Max, counting cache, 30s window per batch size):

  Batch       tx/s          ns/tx     allocs/op
  1           2.5M          395       5
  100         1.5M          649       500
  1000        4.0M          249       2536
  10000       11.8M         84        12565

Includes a new txmetaHandler_bench_test.go to keep the throughput
measurement reproducible; uses a counting txMetaCacheOps to isolate
dispatch cost from cache write cost.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants