fix(subtreevalidation): replace unbounded goroutine per Kafka message with bounded shard worker pool #858
Merged
gokutheengineer merged 3 commits on May 14, 2026
🤖 Claude Code Review
Status: Complete
Current review: all previously reported issues have been resolved. No new issues found; the implementation looks solid.
Benchmark Comparison Report
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-13 13:22 UTC
7448b30 to 4f376db
liam approved these changes May 13, 2026
freemans13 approved these changes May 13, 2026
icellan added a commit that referenced this pull request May 18, 2026
Conflicts resolved:
- go.mod / go.sum: kept BSV aerospike-client fork + msgpack + go-tx-map pseudo-version from PR; took main's newer gobdk, go-chaincfg, go-wire, pgregory.net/rapid; dropped the upstream aerospike replace directive in favour of the BSV fork.
- services/asset/repository/GetLegacyBlock.go: pass streamCtx to writeChunkToWriter (PR side wired the parameter).
- services/blockassembly/subtreeprocessor/SubtreeProcessor.go: merged txMapPool/double-buffer fields from PR with main's clock field.
- services/subtreevalidation/txmetaHandler.go + _test.go: kept main's bounded shard-worker pool (#858) rather than the PR's inline walker; the worker queue fields are referenced from Server.go which was auto-merged from main.
- settings/asset_settings.go + settings/settings.go: kept PR's new ConcurrencySubtreeDataCreate field/binding.
- stores/blob/file/file.go + _test.go: took main's allowOverwrite/fsyncMode plumbing (renameTempFile/writeFileAtomically signatures gained allowOverwrite; syncAndCloseTempFile gated by fsyncMode).
- stores/blockchain/sql/GenesisHash_test.go: took main's explicit-params insertGenesisTransaction signature.
- stores/utxo/aerospike/spend.go: merged fmt/strings imports (PR) with runtime/debug (main).
- stores/utxo/aerospike/send_store_batch_test.go + stores/utxo/aerospike/pruner/pruned_set_skip_test.go: retargeted from upstream aerospike client to the BSV fork for consistency with the rest of the PR's import switch.
Review-comment fixes applied on top of the merge:
- stores/utxo/aerospike/native_op.go: executeTeranodeOp now reuses the caller's WritePolicy on the native path instead of synthesising a fresh one, matching the UDF path. Also clarified the probe comment: the probe sends a *valid* setLocked sub-op via TeranodeModifyOp and treats PARAMETER_ERROR as 'unsupported'.
- stores/utxo/aerospike/aerospike.go: PreserveTransactions now builds batchRecords via append (with a parallel recordTxIDs slice) so a failed NewKey can never leave a nil entry that BatchOperate would nil-deref on.
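The last fix above (building the batch with append plus a parallel ID slice) can be illustrated with a small, library-free sketch. Everything here is hypothetical: `record`, `newKey`, and `buildBatch` are stand-ins invented for illustration and are not the Aerospike client API or the actual PreserveTransactions code. The point is only that appending on success, rather than writing into a pre-sized slice by index, cannot leave a nil hole when key construction fails.

```go
package main

import "errors"

// record is a hypothetical stand-in for a batch record.
type record struct{ key string }

// newKey is a hypothetical key constructor that can fail,
// standing in for a real client's key-building call.
func newKey(txID string) (*record, error) {
	if txID == "" {
		return nil, errors.New("empty tx id")
	}
	return &record{key: "k:" + txID}, nil
}

// buildBatch appends a record only when its key was built successfully.
// recordTxIDs[i] always names the transaction behind records[i], so the
// caller can map batch results back to IDs without any nil entries.
func buildBatch(txIDs []string) (records []*record, recordTxIDs []string, failed int) {
	records = make([]*record, 0, len(txIDs))
	recordTxIDs = make([]string, 0, len(txIDs))

	for _, id := range txIDs {
		r, err := newKey(id)
		if err != nil {
			failed++ // skip: nothing is stored for this ID
			continue
		}
		records = append(records, r)
		recordTxIDs = append(recordTxIDs, id)
	}

	return records, recordTxIDs, failed
}
```

With an index-based version (`records := make([]*record, len(txIDs))` and `records[i] = r`), a `continue` on error would leave `records[i]` nil, which is the failure mode the commit message describes.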



Summary
The txmeta Kafka handler was spawning an unbounded goroutine per message, which could exhaust memory under sustained load. This PR replaces that pattern with a 256-shard bounded worker pool and fixes the invalid-subtree producer lifecycle.
Changes
txmeta handler (txmetaHandler.go)
- Replaces the go func() goroutine-per-message pattern
- Shard workers (txmetaWorkerShardCount) keyed by hash[0], preserving per-key ordering while allowing parallelism across different keys
- Bounded per-shard queue (txmetaWorkerQueueSize = 256); a full queue returns an error so the Kafka consumer can apply backpressure instead of silently dropping work
- Workers managed with sync.Once and cleanly shut down in Stop()
Invalid subtree producer (Server.go)
- Producer channel (invalidSubtreeProducerChannel) so Start and Publish always share the same channel lifecycle
- Stop() now explicitly calls invalidSubtreeKafkaProducer.Stop() to drain inflight messages on shutdown
Tests
- TestServer_txmetaHandler_PreservesPerKeyOrdering: verifies ADD then DELETE on the same key are processed in order
- TestServer_txmetaHandler_ReturnsErrorWhenQueueFull: verifies backpressure error is returned when shard queue is full
- TestPublishInvalidSubtree_EndToEndMemoryKafka: end-to-end round-trip through the in-memory Kafka broker verifying message delivery
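
For readers who have not seen the pattern, the sharded bounded pool described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code merged in this PR: `ShardPool`, `Submit`, `ErrQueueFull`, and the lowercase constants are hypothetical names, the 32-byte key type is assumed, and starting workers through sync.Once is one plausible reading of the description. Only the shard count of 256, the queue size of 256, and routing by the first hash byte come from the text.

```go
package main

import (
	"errors"
	"sync"
)

const (
	shardCount = 256 // assumed to mirror txmetaWorkerShardCount
	queueSize  = 256 // assumed to mirror txmetaWorkerQueueSize
)

// ErrQueueFull is a hypothetical error returned when a shard's queue is full,
// so the caller (for example a Kafka consumer) can retry or back off.
var ErrQueueFull = errors.New("shard queue full")

// ShardPool runs exactly one worker goroutine per shard. Jobs for the same
// key always land on the same shard, so they execute in submission order.
type ShardPool struct {
	queues    [shardCount]chan func()
	wg        sync.WaitGroup
	startOnce sync.Once
	stopOnce  sync.Once
}

func NewShardPool() *ShardPool {
	p := &ShardPool{}
	for i := range p.queues {
		p.queues[i] = make(chan func(), queueSize)
	}
	return p
}

// Start launches the workers; repeated calls are no-ops.
func (p *ShardPool) Start() {
	p.startOnce.Do(func() {
		for i := range p.queues {
			p.wg.Add(1)
			go func(q <-chan func()) {
				defer p.wg.Done()
				for job := range q { // exits when the queue is closed and drained
					job()
				}
			}(p.queues[i])
		}
	})
}

// Submit routes the job by the first byte of the key and never blocks:
// a full queue yields ErrQueueFull instead of spawning a goroutine or
// dropping the work silently. Submit must not be called after Stop.
func (p *ShardPool) Submit(key [32]byte, job func()) error {
	select {
	case p.queues[key[0]] <- job:
		return nil
	default:
		return ErrQueueFull
	}
}

// Stop closes every queue and waits for workers to finish queued jobs.
func (p *ShardPool) Stop() {
	p.stopOnce.Do(func() {
		for i := range p.queues {
			close(p.queues[i])
		}
		p.wg.Wait()
	})
}
```

Because a byte indexes the 256-element array directly, no modulo is needed, and the goroutine count stays fixed at 256 regardless of message rate. The trade-off is that two hot keys sharing a first byte serialize behind each other.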