perf(propagation): process /txs batch concurrently with ordered errors#879
Conversation
handleMultipleTx fans tx processing across goroutines gated by the existing batchWorkerPool semaphore (shared with ProcessTransactionBatch), instead of the prior single consumer goroutine that processed the channel serially. Errors are written into per-submission slots and read after wg.Wait, so the response error list preserves the caller's submission order regardless of which worker finishes first. Worker panics are recovered and reported in the same per-tx slot. Honours the existing batch contract: callers must not mix a parent and its child in the same batch. The Server_test chain-based test was a contract violation that happened to pass under the prior serial behaviour; it is replaced with a sibling-based batch plus a dedicated order-preservation test. MockTxStore is given the mutex it always needed for parallel writers.
|
🤖 Claude Code Review Status: Complete Critical Issue Found: Semaphore leak on context cancellation (Server.go:798-807) When a request context is cancelled after acquiring a semaphore slot from Root cause: Line 800 acquires Impact: Under load with frequent client disconnections, the worker pool gradually degrades until all slots are leaked, blocking all future batch requests indefinitely. Suggested fix: Release the semaphore immediately when context cancellation is detected, before breaking: case <-ctx.Done():
<-ps.batchWorkerPool // Release the slot we just acquired
errSlots[nextSlot] = errors.WrapPublic(ctx.Err())
nextSlot++
earlyExitMsg = "request context cancelled"
cancelled = truePositive observations:
|
There was a problem hiding this comment.
Pull request overview
This PR updates the HTTP /txs batch ingestion path in services/propagation to actually process transactions concurrently (using the existing server-wide batch semaphore) while preserving deterministic, submission-ordered error reporting. It also adjusts tests to reflect the documented “no parent+child in the same batch” contract and fixes a race in the HTTP test mock tx store exposed by parallelism.
Changes:
- Make
handleMultipleTxfan out per-tx processing across goroutines gated bybatchWorkerPool, and aggregate errors in submission order via preallocated slots. - Replace the prior chain-based batch test with a sibling-based batch test and add an error-order-preservation test.
- Add mutex protection to
MockTxStore’s internaltxIDsslice to make the mock concurrency-safe.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| services/propagation/Server.go | Concurrent /txs processing with ordered error aggregation and semaphore gating. |
| services/propagation/Server_test.go | Update batch test to siblings + add ordered-error test for concurrent handling. |
| services/propagation/http_handlers_test.go | Make MockTxStore concurrency-safe with a mutex around txIDs. |
Comments suppressed due to low confidence (1)
services/propagation/Server.go:786
maxDataPerRequestis enforced viatotalBytesRead, but on parse errors youcontinuewithout addingbytesRead. Iftx.ReadFromconsumes bytes before failing (common for malformed/partial txs), a client can exceed the intended 32MB limit whiletotalBytesReadremains low. Consider incrementingtotalBytesReadbybytesReadeven on non-EOF errors (and/or re-checking the limit after each read) so invalid submissions can't bypass the data cap.
if err != nil {
// End of stream is expected and not an error
if err == io.EOF {
break
}
// Record the parse error in submission order.
errSlots[nextSlot] = err
nextSlot++
// if the error came from panic recovery, the stream is likely corrupted
if terr, ok := err.(*errors.Error); ok && terr.Code() == errors.ERR_PROCESSING {
ps.logger.Errorf("Stream corrupted after panic, stopping transaction processing")
break
}
// skip counters and reading this tx if a non-EOF error occurred
continue
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Benchmark Comparison ReportBaseline: Current: Summary
All benchmark results (sec/op)
Threshold: >10% with p < 0.05 | Generated: 2026-05-15 15:47 UTC |
oskarszoon
left a comment
There was a problem hiding this comment.
Verified the concurrency aspects. claude[bot]'s CRITICAL semaphore-leak claim does not hold — the select has two mutually exclusive arms (pool-acquire + spawn-goroutine with defer release, vs ctx.Done + break). A slot cannot be acquired and then abandoned. The bot appears to have misread it as sequential.
errSlotsindex aliasing: each goroutine writes to its own index;wg.Waitestablishes happens-before for the assembly walkMockTxStoremutex coverstxIDsappend,Exists,Size— race-cleanwg.Waitordering: slots indexed by submission position; assembly walks in order after WaitmaxSubmissions = maxTransactionsPerRequest + 1: the +1 is correct — ctx-cancel error is written at slotnextSlot == maxTransactionsPerRequest; without +1 that's OOB- Parent+child caller contract: acceptable to let the violation surface as missing-parent errors; enforcing in-batch ordering would require a dependency graph pass
go test -race -count=10 on order-preserved + parallel tests: 90 pass, race-clean. vet / golangci-lint clean.
Test coverage gaps worth following up (non-blocking)
- No HTTP-path ctx-cancellation mid-batch test. The exact test that would have refuted (or confirmed) the bot's claim is missing — adding one pins the no-leak guarantee in a regression test.
- No HTTP-path test with non-nil
batchWorkerPool(only the gRPC path hasTestProcessTransactionBatch_BatchConcurrencyLimit). - No worker-panic mid-batch test — the panic recovery path in the worker goroutine is untested.
None of these are blockers; they'd close the coverage story.



Summary
handleMultipleTxfans transaction processing across goroutines gated by the existingbatchWorkerPoolsemaphore (the samepropagation_batchConcurrencyLimitknob thatProcessTransactionBatchalready uses), replacing the prior single consumer goroutine that defeated the channel-based parallel facade.wg.Wait, so the response error list preserves the caller's submission order regardless of which worker finishes first. Worker panics are recovered and reported in the offending tx's own slot.Notable details
Test_handleMultipleTxwas itself a contract violation that only passed because the old handler ran serially. It is replaced with a sibling-based test plus a newTest_handleMultipleTx_ErrorOrderPreservedthat submits 32 siblings all failing validation and asserts the txids appear in the response body in submission order.MockTxStorewas always racy ontxIDs; parallel batch handling exposed it, so it now has the mutex it needed.maxSubmissions = maxTransactionsPerRequest + 1bounds the pre-allocated slot slice so producer-side parse-error accumulation cannot grow the slice (which would race on the header against in-flight worker writes).propagation_batchConcurrencyLimitdefault of 0 (unlimited) is preserved; operators who already tuned it for gRPC now get HTTP-side admission control with no extra config.Test plan
go vet ./services/propagation/...golangci-lint run ./services/propagation/...go test -race -tags testtxmetacache -count=1 -skip 'Aerospike|Postgres' ./services/propagation/...make smoketest) on a deployment exercising/txs