fix(subtreevalidation): two-mode txmeta enqueue (block during backfill, drop+warn live)#891
Conversation
…l, drop+warn live) Replaces the unconditional drop-on-full behavior introduced in bsv-blockchain#858 with a mode latched off the Kafka partition high water mark: - Startup (catching up to live): blocking enqueue with ctx escape so no txmeta cache updates are dropped while the cache is cold. The Kafka consumer is naturally paced by the slowest shard worker. - Caught up (latched once msg.Offset+1 >= msg.HighWaterMark): non-blocking enqueue; a full shard queue is logged at Warn instead of Error and the remainder of the batch is abandoned (cache repopulates from Kafka on restart, by design). The latch is one-way: once flipped it stays set, so an extended re-catch-up later operates in live (drop) mode. Plumbs HighWaterMark through util/kafka.KafkaMessage by iterating fetches.EachPartition in the franz-go consumer and reading claim.HighWaterMarkOffset() in the in-memory consumer. Field is additive; other consumers ignore it.
|
🤖 Claude Code Review Status: Complete SummaryReviewed PR #891 implementing two-mode txmeta enqueue (blocking during backfill, drop-on-full when live). Assessment: No issues found. The implementation successfully addresses the production error log spam while preventing cache cold-starts during backfill. Copilot Feedback ReviewReviewed and addressed 4 existing Copilot comments:
Added clarifying replies to each comment thread. Code QualityStrengths:
Implementation Details Verified:
|
There was a problem hiding this comment.
Pull request overview
This PR updates the subtreevalidation txmeta ingestion path to avoid dropping txmeta cache updates during startup backfill (by blocking enqueue) while preserving best-effort behavior once caught up to the Kafka partition tail (by dropping on full queues with a Warn log). It also extends the Kafka consumer message wrapper to include the partition high water mark so consumers can detect when they’ve reached the live tail.
Changes:
- Add
HighWaterMarktokafka.KafkaMessageand populate it from franz-go fetch partition metadata (and in-memory consumer claim). - Introduce a
txmetaCaughtUplatch in subtreevalidation and switch enqueue behavior between blocking (startup) and drop+warn (caught-up). - Replace/expand txmeta handler tests to cover both modes and latch behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| util/kafka/kafka_consumer.go | Adds HighWaterMark to consumed messages and switches polling from per-record to per-partition to capture HWM. |
| services/subtreevalidation/Server.go | Adds txmetaCaughtUp latch state to the server. |
| services/subtreevalidation/txmetaHandler.go | Implements two-mode enqueue and a latch flip based on Offset+1 >= HighWaterMark. |
| services/subtreevalidation/txmetaHandler_test.go | Updates test suite to validate startup blocking, caught-up drop behavior, and latch semantics. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
- util/kafka/kafka_consumer.go: use `continue` instead of `return` after a failing consumerFn in the EachPartition loop. The refactor from EachRecord changed semantics — a single bad record was killing all subsequent records in the same partition. - services/subtreevalidation/txmetaHandler.go: clarify the "queue full" comment to name the actual log site (enqueueTxmetaWorkItem). Document the deliberate one-way / any-partition / fail-closed-on-zero-HWM latch semantics so they are explicit in the source.
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-19 09:46 UTC |
|
Thanks for the review. Addressed in 8710171:
|
|
…heMulti
Replaces per-entry shard-worker dispatch with per-shard-batch dispatch.
For each Kafka message, entries are bucketed into per-shard slices in a
single parse pass and sent as one *txmetaShardBatch per non-empty shard.
The worker then calls SetTxMetaCacheMulti once per shard-batch instead of
SetTxMetaCacheFromBytes once per entry.
Two wins:
- Channel hops drop from N (entries) to up-to-256 (shards) per Kafka
message — eliminates per-entry channel send/recv on the hot path.
- The cache acquires each touched bucket lock once per shard-batch
instead of once per entry. Larger batches hit the existing
smallSetMultiBatchThreshold fast-path in ImprovedCache.SetMulti and
stay on the cheap per-key Set loop without spawning bucket goroutines.
Preserves the two-mode (startup-block / caught-up-drop) enqueue contract
from #891; the "abandon remainder of batch" semantic is now per-shard-
batch rather than per-entry. Hash-byte sharding still provides per-key
ordering. ADD content is still copied out of msg.Value at parse time.
Per-batch metrics observation replaces per-item observation — dashboards
that interpret one Observe() as one record will need adjusting.
Microbench (Apple M3 Max, counting cache, 30s window per batch size):
Batch tx/s ns/tx allocs/op
1 2.5M 395 5
100 1.5M 649 500
1000 4.0M 249 2536
10000 11.8M 84 12565
Includes a new txmetaHandler_bench_test.go to keep the throughput
measurement reproducible; uses a counting txMetaCacheOps to isolate
dispatch cost from cache write cost.



Summary
Replaces the unconditional drop-on-full behavior in the txmeta shard worker pool (introduced in #858) with a mode latched off the Kafka partition high water mark:
msg.Offset+1 >= msg.HighWaterMark): non-blocking enqueue; a full shard queue is now logged at Warn instead of Error, and the remainder of the batch is abandoned (cache repopulates from Kafka on restart — best-effort by design).The latch is one-way: once set it stays set, so a long re-catch-up later still operates in live (drop) mode.
Why
In production we see this Error log spamming during scale tests:
This is the documented best-effort drop firing, so it shouldn't be logged at Error level. And during initial backfill, dropping txmeta updates leaves the cache cold for those entries — better to apply backpressure to Kafka until we're caught up.
Changes
util/kafka/kafka_consumer.go: addedHighWaterMark int64toKafkaMessage; switched the franz-go fetch loop fromEachRecordtoEachPartitionto readp.HighWatermark; populated HWM in the in-memory consumer viaclaim.HighWaterMarkOffset(). Field is additive — other consumers ignore it.services/subtreevalidation/Server.go: addedtxmetaCaughtUp atomic.Boollatch.services/subtreevalidation/txmetaHandler.go: two-modeenqueueTxmetaWorkItem;maybeMarkTxmetaCaughtUpflips the latch on the partition tail.services/subtreevalidation/txmetaHandler_test.go: replaced the now-invalidReturnsErrorWhenQueueFulltest with six tests covering caught-up drop, startup blocking, ctx-cancel unblock, latch flip on HWM, no flip when HWM unset, and one-way latch behavior.Latch design (deliberate trade-offs)
Documented in code at
maybeMarkTxmetaCaughtUp:HighWaterMark<=0. An unset HWM (in-memory consumer, hand-built messages) keeps the latch in startup (blocking) mode. Smoke tests are low-throughput, so the shard queues should never fill and the blocking mode is effectively a no-op there.Test plan
go build ./...cleango vet ./services/subtreevalidation/... ./util/kafka/...cleanstaticcheck ./services/subtreevalidation/... ./util/kafka/...cleangolangci-lint run ./services/subtreevalidation/... ./util/kafka/...clean (pre-existing prealloc warnings in unrelated test files)go test -race -count=1 ./services/subtreevalidation/... ./util/kafka/...all pass