Skip to content

perf(propagation): process /txs batch concurrently with ordered errors#879

Merged
icellan merged 1 commit into
mainfrom
worktree-propagation-parallel-batch
May 18, 2026
Merged

perf(propagation): process /txs batch concurrently with ordered errors#879
icellan merged 1 commit into
mainfrom
worktree-propagation-parallel-batch

Conversation

@icellan

@icellan icellan commented May 15, 2026

Copy link
Copy Markdown
Contributor

Summary

  • handleMultipleTx fans transaction processing across goroutines gated by the existing batchWorkerPool semaphore (the same propagation_batchConcurrencyLimit knob that ProcessTransactionBatch already uses), replacing the prior single consumer goroutine that defeated the channel-based parallel facade.
  • Errors are written into pre-allocated per-submission slots and drained after wg.Wait, so the response error list preserves the caller's submission order regardless of which worker finishes first. Worker panics are recovered and reported in the offending tx's own slot.
  • Honours the existing caller contract: a batch must not contain both a parent and any of its children. The server still does not enforce this; parallel processing only makes a contract violation more likely to surface (as missing-parent errors) than the prior serial timing did.

Notable details

  • The previous chain-based Test_handleMultipleTx was itself a contract violation that only passed because the old handler ran serially. It is replaced with a sibling-based test plus a new Test_handleMultipleTx_ErrorOrderPreserved that submits 32 siblings all failing validation and asserts the txids appear in the response body in submission order.
  • MockTxStore was always racy on txIDs; parallel batch handling exposed it, so it now has the mutex it needed.
  • maxSubmissions = maxTransactionsPerRequest + 1 bounds the pre-allocated slot slice so producer-side parse-error accumulation cannot grow the slice (which would race on the header against in-flight worker writes).
  • propagation_batchConcurrencyLimit default of 0 (unlimited) is preserved; operators who already tuned it for gRPC now get HTTP-side admission control with no extra config.

Test plan

  • go vet ./services/propagation/...
  • golangci-lint run ./services/propagation/...
  • go test -race -tags testtxmetacache -count=1 -skip 'Aerospike|Postgres' ./services/propagation/...
  • CI race + lint pass on the PR
  • Smoke test (make smoketest) on a deployment exercising /txs

handleMultipleTx fans tx processing across goroutines gated by the existing
batchWorkerPool semaphore (shared with ProcessTransactionBatch), instead of
the prior single consumer goroutine that processed the channel serially.

Errors are written into per-submission slots and read after wg.Wait, so the
response error list preserves the caller's submission order regardless of
which worker finishes first. Worker panics are recovered and reported in the
same per-tx slot. Honours the existing batch contract: callers must not mix
a parent and its child in the same batch.

The Server_test chain-based test was a contract violation that happened to
pass under the prior serial behaviour; it is replaced with a sibling-based
batch plus a dedicated order-preservation test. MockTxStore is given the
mutex it always needed for parallel writers.
@github-actions

github-actions Bot commented May 15, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete


Critical Issue Found:

Semaphore leak on context cancellation (Server.go:798-807)

When a request context is cancelled after acquiring a semaphore slot from batchWorkerPool, the semaphore is never released. This permanently reduces available worker pool capacity by one slot per cancelled request, eventually exhausting the pool.

Root cause: Line 800 acquires ps.batchWorkerPool <- struct{}{}, but when ctx.Done() fires (line 801), the code sets cancelled = true and breaks the loop without releasing the semaphore. The release only happens in processOne defer (line 714), which is never called because no goroutine is spawned.

Impact: Under load with frequent client disconnections, the worker pool gradually degrades until all slots are leaked, blocking all future batch requests indefinitely.

Suggested fix: Release the semaphore immediately when context cancellation is detected, before breaking:

case <-ctx.Done():
    <-ps.batchWorkerPool  // Release the slot we just acquired
    errSlots[nextSlot] = errors.WrapPublic(ctx.Err())
    nextSlot++
    earlyExitMsg = "request context cancelled"
    cancelled = true

Positive observations:

  • Comprehensive test coverage with Test_handleMultipleTx_ErrorOrderPreserved validating concurrent error ordering
  • Fixed race condition in MockTxStore by adding proper mutex protection
  • Panic recovery correctly captures and reports errors in pre-allocated slots
  • Clear documentation of caller contract (no parent-child in same batch)

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates the HTTP /txs batch ingestion path in services/propagation to actually process transactions concurrently (using the existing server-wide batch semaphore) while preserving deterministic, submission-ordered error reporting. It also adjusts tests to reflect the documented “no parent+child in the same batch” contract and fixes a race in the HTTP test mock tx store exposed by parallelism.

Changes:

  • Make handleMultipleTx fan out per-tx processing across goroutines gated by batchWorkerPool, and aggregate errors in submission order via preallocated slots.
  • Replace the prior chain-based batch test with a sibling-based batch test and add an error-order-preservation test.
  • Add mutex protection to MockTxStore’s internal txIDs slice to make the mock concurrency-safe.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

File Description
services/propagation/Server.go Concurrent /txs processing with ordered error aggregation and semaphore gating.
services/propagation/Server_test.go Update batch test to siblings + add ordered-error test for concurrent handling.
services/propagation/http_handlers_test.go Make MockTxStore concurrency-safe with a mutex around txIDs.
Comments suppressed due to low confidence (1)

services/propagation/Server.go:786

  • maxDataPerRequest is enforced via totalBytesRead, but on parse errors you continue without adding bytesRead. If tx.ReadFrom consumes bytes before failing (common for malformed/partial txs), a client can exceed the intended 32MB limit while totalBytesRead remains low. Consider incrementing totalBytesRead by bytesRead even on non-EOF errors (and/or re-checking the limit after each read) so invalid submissions can't bypass the data cap.
			if err != nil {
				// End of stream is expected and not an error
				if err == io.EOF {
					break
				}

				// Record the parse error in submission order.
				errSlots[nextSlot] = err
				nextSlot++

				// if the error came from panic recovery, the stream is likely corrupted
				if terr, ok := err.(*errors.Error); ok && terr.Code() == errors.ERR_PROCESSING {
					ps.logger.Errorf("Stream corrupted after panic, stopping transaction processing")
					break
				}

				// skip counters and reading this tx if a non-EOF error occurred
				continue
			}

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@sonarqubecloud

Copy link
Copy Markdown

@github-actions

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-879 (330e9db)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 142
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.565µ 1.568µ ~ 0.700
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 71.12n 71.26n ~ 0.400
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 71.17n 71.26n ~ 1.000
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 71.09n 71.11n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 32.62n 31.99n ~ 0.100
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 53.06n 53.82n ~ 0.700
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 127.2n 129.3n ~ 0.400
MiningCandidate_Stringify_Short-4 225.7n 223.8n ~ 0.100
MiningCandidate_Stringify_Long-4 1.622µ 1.617µ ~ 1.000
MiningSolution_Stringify-4 847.9n 843.1n ~ 1.000
BlockInfo_MarshalJSON-4 1.720µ 1.727µ ~ 0.400
NewFromBytes-4 127.7n 126.6n ~ 0.700
Mine_EasyDifficulty-4 66.94µ 67.85µ ~ 0.100
Mine_WithAddress-4 7.026µ 7.038µ ~ 1.000
BlockAssembler_AddTx-4 0.02826n 0.03346n ~ 0.400
AddNode-4 10.53 10.31 ~ 0.700
AddNodeWithMap-4 10.85 11.06 ~ 0.700
DirectSubtreeAdd/4_per_subtree-4 61.68n 60.19n ~ 1.000
DirectSubtreeAdd/64_per_subtree-4 28.22n 28.52n ~ 0.100
DirectSubtreeAdd/256_per_subtree-4 26.87n 27.19n ~ 0.100
DirectSubtreeAdd/1024_per_subtree-4 25.90n 26.17n ~ 0.100
DirectSubtreeAdd/2048_per_subtree-4 25.55n 25.82n ~ 0.300
SubtreeProcessorAdd/4_per_subtree-4 283.6n 281.4n ~ 0.500
SubtreeProcessorAdd/64_per_subtree-4 275.6n 273.5n ~ 0.500
SubtreeProcessorAdd/256_per_subtree-4 277.9n 277.7n ~ 0.700
SubtreeProcessorAdd/1024_per_subtree-4 269.2n 269.0n ~ 1.000
SubtreeProcessorAdd/2048_per_subtree-4 269.1n 270.5n ~ 0.100
SubtreeProcessorRotate/4_per_subtree-4 274.7n 275.9n ~ 0.400
SubtreeProcessorRotate/64_per_subtree-4 273.9n 274.8n ~ 0.700
SubtreeProcessorRotate/256_per_subtree-4 273.4n 272.5n ~ 1.000
SubtreeProcessorRotate/1024_per_subtree-4 274.3n 279.0n ~ 0.100
SubtreeNodeAddOnly/4_per_subtree-4 54.32n 53.59n ~ 0.100
SubtreeNodeAddOnly/64_per_subtree-4 34.37n 34.21n ~ 0.700
SubtreeNodeAddOnly/256_per_subtree-4 33.32n 33.24n ~ 0.700
SubtreeNodeAddOnly/1024_per_subtree-4 32.77n 32.56n ~ 0.100
SubtreeCreationOnly/4_per_subtree-4 113.6n 112.5n ~ 0.100
SubtreeCreationOnly/64_per_subtree-4 397.9n 389.9n ~ 0.200
SubtreeCreationOnly/256_per_subtree-4 1.327µ 1.309µ ~ 0.100
SubtreeCreationOnly/1024_per_subtree-4 4.362µ 4.351µ ~ 0.800
SubtreeCreationOnly/2048_per_subtree-4 8.033µ 7.869µ ~ 0.100
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 272.0n 271.3n ~ 0.700
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 272.6n 270.8n ~ 0.100
ParallelGetAndSetIfNotExists/1k_nodes-4 573.3µ 792.6µ ~ 0.100
ParallelGetAndSetIfNotExists/10k_nodes-4 1.325m 1.595m ~ 0.100
ParallelGetAndSetIfNotExists/50k_nodes-4 6.693m 6.813m ~ 0.100
ParallelGetAndSetIfNotExists/100k_nodes-4 13.46m 13.55m ~ 1.000
SequentialGetAndSetIfNotExists/1k_nodes-4 651.3µ 667.9µ ~ 0.400
SequentialGetAndSetIfNotExists/10k_nodes-4 2.767m 2.810m ~ 0.700
SequentialGetAndSetIfNotExists/50k_nodes-4 10.44m 10.35m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 20.03m 19.86m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 632.1µ 827.6µ ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 4.166m 4.344m ~ 0.400
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 16.65m 16.38m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 690.5µ 676.7µ ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 5.621m 5.683m ~ 0.700
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 37.68m 37.83m ~ 0.700
DiskTxMap_SetIfNotExists-4 3.755µ 3.719µ ~ 0.400
DiskTxMap_SetIfNotExists_Parallel-4 3.485µ 3.568µ ~ 0.700
DiskTxMap_ExistenceOnly-4 339.4n 323.8n ~ 0.100
Queue-4 191.7n 187.4n ~ 0.100
AtomicPointer-4 4.551n 4.767n ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 871.5µ 850.2µ ~ 0.200
ReorgOptimizations/DedupFilterPipeline/New/10K-4 813.8µ 792.9µ ~ 0.100
ReorgOptimizations/AllMarkFalse/Old/10K-4 108.7µ 104.5µ ~ 0.100
ReorgOptimizations/AllMarkFalse/New/10K-4 62.68µ 62.48µ ~ 0.400
ReorgOptimizations/HashSlicePool/Old/10K-4 57.64µ 55.31µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/10K-4 11.78µ 12.20µ ~ 0.700
ReorgOptimizations/NodeFlags/Old/10K-4 4.987µ 4.620µ ~ 0.100
ReorgOptimizations/NodeFlags/New/10K-4 1.700µ 1.561µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 10.573m 9.613m ~ 0.100
ReorgOptimizations/DedupFilterPipeline/New/100K-4 10.99m 10.01m ~ 0.100
ReorgOptimizations/AllMarkFalse/Old/100K-4 1.141m 1.091m ~ 0.100
ReorgOptimizations/AllMarkFalse/New/100K-4 691.0µ 692.3µ ~ 0.700
ReorgOptimizations/HashSlicePool/Old/100K-4 544.6µ 549.7µ ~ 1.000
ReorgOptimizations/HashSlicePool/New/100K-4 341.1µ 338.1µ ~ 0.400
ReorgOptimizations/NodeFlags/Old/100K-4 51.07µ 48.81µ ~ 0.100
ReorgOptimizations/NodeFlags/New/100K-4 18.11µ 17.14µ ~ 0.100
TxMapSetIfNotExists-4 52.01n 51.49n ~ 0.100
TxMapSetIfNotExistsDuplicate-4 38.52n 38.30n ~ 0.500
ChannelSendReceive-4 610.1n 621.3n ~ 0.400
CalcBlockWork-4 471.2n 466.8n ~ 0.100
CalculateWork-4 629.1n 619.2n ~ 0.700
BuildBlockLocatorString_Helpers/Size_10-4 1.295µ 1.300µ ~ 0.300
BuildBlockLocatorString_Helpers/Size_100-4 12.61µ 12.37µ ~ 0.400
BuildBlockLocatorString_Helpers/Size_1000-4 122.2µ 123.4µ ~ 0.700
CatchupWithHeaderCache-4 104.3m 104.4m ~ 0.400
_BufferPoolAllocation/16KB-4 3.503µ 4.119µ ~ 0.700
_BufferPoolAllocation/32KB-4 9.177µ 7.125µ ~ 0.100
_BufferPoolAllocation/64KB-4 14.92µ 13.45µ ~ 0.100
_BufferPoolAllocation/128KB-4 27.58µ 25.34µ ~ 0.400
_BufferPoolAllocation/512KB-4 97.11µ 93.40µ ~ 1.000
_BufferPoolConcurrent/32KB-4 17.11µ 16.71µ ~ 1.000
_BufferPoolConcurrent/64KB-4 26.39µ 26.56µ ~ 1.000
_BufferPoolConcurrent/512KB-4 134.9µ 138.2µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/16KB-4 610.2µ 631.6µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/32KB-4 614.3µ 627.1µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/64KB-4 609.4µ 633.5µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 610.6µ 635.7µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/512KB-4 623.0µ 656.8µ ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/16KB-4 37.26m 37.39m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/32KB-4 37.29m 37.11m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/64KB-4 37.07m 37.24m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/128KB-4 37.41m 36.74m ~ 0.200
_SubtreeDataDeserializationWithBufferSizes/512KB-4 37.21m 36.83m ~ 0.100
_PooledVsNonPooled/Pooled-4 773.2n 772.1n ~ 1.000
_PooledVsNonPooled/NonPooled-4 6.414µ 7.132µ ~ 0.200
_MemoryFootprint/Current_512KB_32concurrent-4 6.582µ 6.718µ ~ 0.700
_MemoryFootprint/Proposed_32KB_32concurrent-4 9.798µ 9.281µ ~ 0.100
_MemoryFootprint/Alternative_64KB_32concurrent-4 8.818µ 8.852µ ~ 0.700
_prepareTxsPerLevel-4 415.2m 412.3m ~ 1.000
_prepareTxsPerLevelOrdered-4 3.786m 3.684m ~ 0.700
_prepareTxsPerLevel_Comparison/Original-4 421.2m 423.8m ~ 0.400
_prepareTxsPerLevel_Comparison/Optimized-4 3.741m 3.732m ~ 1.000
SubtreeSizes/10k_tx_4_per_subtree-4 1.326m 1.305m ~ 0.700
SubtreeSizes/10k_tx_16_per_subtree-4 310.5µ 307.5µ ~ 1.000
SubtreeSizes/10k_tx_64_per_subtree-4 73.96µ 73.71µ ~ 1.000
SubtreeSizes/10k_tx_256_per_subtree-4 18.50µ 18.29µ ~ 0.100
SubtreeSizes/10k_tx_512_per_subtree-4 9.209µ 9.098µ ~ 0.100
SubtreeSizes/10k_tx_1024_per_subtree-4 4.511µ 4.522µ ~ 1.000
SubtreeSizes/10k_tx_2k_per_subtree-4 2.283µ 2.264µ ~ 0.400
BlockSizeScaling/10k_tx_64_per_subtree-4 72.73µ 72.17µ ~ 0.700
BlockSizeScaling/10k_tx_256_per_subtree-4 18.42µ 18.24µ ~ 0.200
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.582µ 4.539µ ~ 0.100
BlockSizeScaling/50k_tx_64_per_subtree-4 383.3µ 383.9µ ~ 1.000
BlockSizeScaling/50k_tx_256_per_subtree-4 92.91µ 93.30µ ~ 1.000
BlockSizeScaling/50k_tx_1024_per_subtree-4 22.86µ 22.70µ ~ 0.700
SubtreeAllocations/small_subtrees_exists_check-4 161.0µ 161.6µ ~ 1.000
SubtreeAllocations/small_subtrees_data_fetch-4 159.9µ 160.7µ ~ 0.700
SubtreeAllocations/small_subtrees_full_validation-4 319.3µ 319.4µ ~ 0.700
SubtreeAllocations/medium_subtrees_exists_check-4 9.225µ 9.360µ ~ 0.600
SubtreeAllocations/medium_subtrees_data_fetch-4 9.388µ 9.369µ ~ 1.000
SubtreeAllocations/medium_subtrees_full_validation-4 18.75µ 18.82µ ~ 0.700
SubtreeAllocations/large_subtrees_exists_check-4 2.236µ 2.230µ ~ 0.200
SubtreeAllocations/large_subtrees_data_fetch-4 2.271µ 2.276µ ~ 0.100
SubtreeAllocations/large_subtrees_full_validation-4 4.660µ 4.673µ ~ 0.500
StoreBlock_Sequential/BelowCSVHeight-4 319.5µ 309.6µ ~ 0.400
StoreBlock_Sequential/AboveCSVHeight-4 313.9µ 312.6µ ~ 1.000
GetUtxoHashes-4 259.1n 260.3n ~ 1.000
GetUtxoHashes_ManyOutputs-4 42.54µ 42.88µ ~ 0.100
_NewMetaDataFromBytes-4 237.8n 237.1n ~ 0.400
_Bytes-4 617.4n 620.1n ~ 0.700
_MetaBytes-4 555.8n 562.4n ~ 0.100

Threshold: >10% with p < 0.05 | Generated: 2026-05-15 15:47 UTC

@oskarszoon oskarszoon left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified the concurrency aspects. claude[bot]'s CRITICAL semaphore-leak claim does not hold — the select has two mutually exclusive arms (pool-acquire + spawn-goroutine with defer release, vs ctx.Done + break). A slot cannot be acquired and then abandoned. The bot appears to have misread it as sequential.

  • errSlots index aliasing: each goroutine writes to its own index; wg.Wait establishes happens-before for the assembly walk
  • MockTxStore mutex covers txIDs append, Exists, Size — race-clean
  • wg.Wait ordering: slots indexed by submission position; assembly walks in order after Wait
  • maxSubmissions = maxTransactionsPerRequest + 1: the +1 is correct — ctx-cancel error is written at slot nextSlot == maxTransactionsPerRequest; without +1 that's OOB
  • Parent+child caller contract: acceptable to let the violation surface as missing-parent errors; enforcing in-batch ordering would require a dependency graph pass

go test -race -count=10 on order-preserved + parallel tests: 90 pass, race-clean. vet / golangci-lint clean.

Test coverage gaps worth following up (non-blocking)

  1. No HTTP-path ctx-cancellation mid-batch test. The exact test that would have refuted (or confirmed) the bot's claim is missing — adding one pins the no-leak guarantee in a regression test.
  2. No HTTP-path test with non-nil batchWorkerPool (only the gRPC path has TestProcessTransactionBatch_BatchConcurrencyLimit).
  3. No worker-panic mid-batch test — the panic recovery path in the worker goroutine is untested.

None of these are blockers; they'd close the coverage story.

@icellan icellan merged commit cfcb9ca into main May 18, 2026
34 checks passed
@icellan icellan deleted the worktree-propagation-parallel-batch branch May 18, 2026 09:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants