Skip to content

fix(propagation): correct HTTP status mapping for missing-parent and duplicate-tx#870

Merged
icellan merged 2 commits into
bsv-blockchain:mainfrom
shruggr:fix/audit-tx-error-status-codes
May 15, 2026
Merged

fix(propagation): correct HTTP status mapping for missing-parent and duplicate-tx#870
icellan merged 2 commits into
bsv-blockchain:mainfrom
shruggr:fix/audit-tx-error-status-codes

Conversation

@shruggr

@shruggr shruggr commented May 14, 2026

Copy link
Copy Markdown
Contributor

Summary

Audit of httpStatusForTxError in services/propagation/Server.go. Two transaction-related errors were falling through to the default 500 case, conflating validation conditions with infrastructure failures and causing downstream consumers (e.g., Arcade) to retry txs that should be terminal or accepted.

  • ErrTxMissingParent422 Unprocessable Entity. The tx is structurally valid but its parent isn't in the UTXO store yet. A client-actionable validation-time condition, distinct from 400 (malformed) and 409 (conflict).
  • ErrTxExists200 OK. Fires when the UTXO store reports the txid is already committed. Semantically the resource is already in the desired state, so this is idempotent success, not a conflict. The "creation in progress" edge case at aerospike/create.go#L1078 is handled by the validator's CreateInUtxoStore path before it reaches the HTTP boundary.

Other errors deliberately left at 500 because they indicate genuine infra / data-integrity failures: ErrUtxoHashMismatch, ErrUtxoError, ErrStorageError, ErrServiceError, generic ErrProcessing, ErrTxNotFound (translated upstream to ErrTxMissingParent).

Test plan

  • go test -count=1 ./services/propagation/... — pass
  • go vet ./services/propagation/... — clean
  • golangci-lint run ./services/propagation/... — 0 issues
  • Added table cases in TestHandleSingleTx for both new mappings, including a NewProcessingError wrapper case to verify error-chain walking

shruggr added 2 commits May 14, 2026 16:55
…duplicate-tx

httpStatusForTxError previously returned 500 for both ErrTxMissingParent
and ErrTxExists, causing downstream clients to retry validation-time
conditions as if they were infrastructure failures.

- ErrTxMissingParent -> 422 Unprocessable Entity (tx valid but parent
  not yet available; client can resubmit after parent lands)
- ErrTxExists -> 200 OK (UTXO store reports the tx is already accepted;
  duplicate submission of a known-good tx is idempotent success, not
  a conflict)

Adds handler tests covering both new mappings plus the wrapped chain
case for ErrTxMissingParent.
ErrTxExists maps to 200 OK but the response body was still using the
"Failed to process transaction" prefix. Use "OK" body for 2xx returns.
@shruggr shruggr marked this pull request as ready for review May 14, 2026 21:09
shruggr added a commit to bsv-blockchain/arcade that referenced this pull request May 14, 2026
Introduces the single-goroutine dispatcher described in the plan. The
dispatcher reads transactions from an incoming channel, decides whether
to admit each to a pending broadcast batch or hold it as a waiter on
in-flight parents, and emits flushed batches to a downstream worker
channel. Status flips coming back (from broadcast workers and the
merkle-service callback path) drive waiter release, recursive
cascade-reject, retry queueing, and offset terminalization.

Components:

  - dispatcher.go: types (dispatcherMsg, inFlightEntry, statusFlip),
    the Dispatcher struct with its dep-index/retry-queue state, the
    main select loop, and handlers for incoming messages, status
    flips, and timer-driven batch flushes / retry wake-ups.

  - offset_tracker.go: a min-heap with lazy deletion behind the
    offsetTracker interface. Add records an in-flight offset, Done
    marks one terminalized, LowestUnfinished gives the commit point.

  - dispatcher_test.go: unit tests covering the baseline admit path,
    parent-in-flight hold, cascade-reject for REJECTED parents, the
    multi-parent case, retryable failure requeue with backoff, and
    retry exhaustion. Plus an OffsetTracker test for the lazy-delete
    semantics.

Not wired up to anything yet — the dispatcher stands alone. The next
commit will connect it to a Kafka consumer feeding incomingMsgs and a
broadcast-worker pool consuming outgoingBatch. Existing propagator,
tx_validator, and reaper code paths are untouched.

Retry classification uses HTTP status codes (422 for ErrTxMissingParent,
0 for no-response) rather than the existing IsRetryableError text match
— the latter is removed in a follow-up commit alongside the wiring
change. The dependency on Teranode returning 422 is satisfied by
bsv-blockchain/teranode#870.

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues
@github-actions

github-actions Bot commented May 15, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete


Review Summary:
This PR correctly fixes HTTP status code mapping for two transaction errors. The changes are well-reasoned, properly tested, and align with REST semantics.

Code Quality: No issues found. The implementation correctly uses errors.Is() for error chain walking, includes comprehensive test coverage, and follows the existing error handling patterns.

Verification:

  • Logic verified against actual ErrTxExists usage in utxo stores (aerospike/create.go:1078, sql/sql.go:460)
  • Validator gracefully handles ErrTxExists at services/validator/Validator.go:708-713
  • Test coverage includes both direct and wrapped error cases
  • PR description correctly references the validator's handling of "creation in progress" edge case

No documentation updates needed: The HTTP endpoints are internal/alternative submission paths. User-facing docs focus on gRPC API, which is appropriate.

@sonarqubecloud

Copy link
Copy Markdown

@github-actions

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-870 (403cdde)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 142
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.701µ 1.686µ ~ 0.500
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 61.65n 61.67n ~ 1.000
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 61.63n 61.82n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 61.51n 61.77n ~ 0.400
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 29.91n 29.90n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 51.64n 51.03n ~ 0.100
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 111.0n 110.0n ~ 1.000
MiningCandidate_Stringify_Short-4 264.5n 262.8n ~ 0.200
MiningCandidate_Stringify_Long-4 1.915µ 1.913µ ~ 0.700
MiningSolution_Stringify-4 993.5n 989.4n ~ 0.100
BlockInfo_MarshalJSON-4 1.805µ 1.797µ ~ 0.900
NewFromBytes-4 127.8n 127.7n ~ 1.000
Mine_EasyDifficulty-4 60.63µ 60.82µ ~ 0.400
Mine_WithAddress-4 7.094µ 6.771µ ~ 0.200
DiskTxMap_SetIfNotExists-4 3.546µ 3.414µ ~ 0.700
DiskTxMap_SetIfNotExists_Parallel-4 3.245µ 3.293µ ~ 1.000
DiskTxMap_ExistenceOnly-4 288.5n 287.8n ~ 0.400
Queue-4 186.3n 187.1n ~ 0.200
AtomicPointer-4 4.903n 4.712n ~ 0.400
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 827.7µ 844.7µ ~ 0.700
ReorgOptimizations/DedupFilterPipeline/New/10K-4 781.5µ 785.7µ ~ 0.400
ReorgOptimizations/AllMarkFalse/Old/10K-4 100.3µ 102.3µ ~ 0.100
ReorgOptimizations/AllMarkFalse/New/10K-4 62.27µ 63.13µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/10K-4 51.03µ 53.09µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/10K-4 11.43µ 11.16µ ~ 0.100
ReorgOptimizations/NodeFlags/Old/10K-4 4.391µ 4.744µ ~ 0.100
ReorgOptimizations/NodeFlags/New/10K-4 1.486µ 1.688µ ~ 0.100
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 9.153m 9.261m ~ 0.400
ReorgOptimizations/DedupFilterPipeline/New/100K-4 9.461m 9.172m ~ 0.100
ReorgOptimizations/AllMarkFalse/Old/100K-4 1.042m 1.046m ~ 0.100
ReorgOptimizations/AllMarkFalse/New/100K-4 683.3µ 677.4µ ~ 0.700
ReorgOptimizations/HashSlicePool/Old/100K-4 496.5µ 533.5µ ~ 0.100
ReorgOptimizations/HashSlicePool/New/100K-4 312.5µ 284.0µ ~ 0.400
ReorgOptimizations/NodeFlags/Old/100K-4 44.43µ 45.48µ ~ 0.100
ReorgOptimizations/NodeFlags/New/100K-4 12.27µ 15.58µ ~ 0.100
TxMapSetIfNotExists-4 51.09n 51.09n ~ 1.000
TxMapSetIfNotExistsDuplicate-4 38.38n 38.22n ~ 0.800
ChannelSendReceive-4 619.9n 621.6n ~ 0.700
BlockAssembler_AddTx-4 0.03011n 0.02630n ~ 0.700
AddNode-4 11.37 11.20 ~ 0.400
AddNodeWithMap-4 10.52 11.21 ~ 0.200
DirectSubtreeAdd/4_per_subtree-4 62.85n 56.60n ~ 0.400
DirectSubtreeAdd/64_per_subtree-4 28.90n 29.08n ~ 0.400
DirectSubtreeAdd/256_per_subtree-4 27.62n 27.81n ~ 0.100
DirectSubtreeAdd/1024_per_subtree-4 26.43n 26.49n ~ 0.400
DirectSubtreeAdd/2048_per_subtree-4 26.04n 26.13n ~ 0.700
SubtreeProcessorAdd/4_per_subtree-4 286.0n 297.9n ~ 0.200
SubtreeProcessorAdd/64_per_subtree-4 278.9n 284.4n ~ 0.100
SubtreeProcessorAdd/256_per_subtree-4 280.0n 281.4n ~ 0.100
SubtreeProcessorAdd/1024_per_subtree-4 271.4n 274.6n ~ 0.100
SubtreeProcessorAdd/2048_per_subtree-4 276.9n 275.2n ~ 0.600
SubtreeProcessorRotate/4_per_subtree-4 276.9n 287.0n ~ 0.100
SubtreeProcessorRotate/64_per_subtree-4 273.0n 276.0n ~ 0.100
SubtreeProcessorRotate/256_per_subtree-4 276.5n 277.0n ~ 0.400
SubtreeProcessorRotate/1024_per_subtree-4 277.9n 279.8n ~ 0.400
SubtreeNodeAddOnly/4_per_subtree-4 54.87n 55.34n ~ 0.200
SubtreeNodeAddOnly/64_per_subtree-4 36.12n 36.11n ~ 1.000
SubtreeNodeAddOnly/256_per_subtree-4 35.02n 35.08n ~ 0.200
SubtreeNodeAddOnly/1024_per_subtree-4 34.45n 34.56n ~ 0.400
SubtreeCreationOnly/4_per_subtree-4 110.9n 111.3n ~ 0.800
SubtreeCreationOnly/64_per_subtree-4 362.2n 352.5n ~ 0.100
SubtreeCreationOnly/256_per_subtree-4 1.245µ 1.241µ ~ 1.000
SubtreeCreationOnly/1024_per_subtree-4 3.778µ 3.816µ ~ 0.300
SubtreeCreationOnly/2048_per_subtree-4 6.822µ 6.854µ ~ 0.600
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 272.6n 274.5n ~ 0.400
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 271.4n 278.6n ~ 0.100
ParallelGetAndSetIfNotExists/1k_nodes-4 554.2µ 580.7µ ~ 0.100
ParallelGetAndSetIfNotExists/10k_nodes-4 1.347m 1.476m ~ 0.100
ParallelGetAndSetIfNotExists/50k_nodes-4 6.823m 6.943m ~ 0.100
ParallelGetAndSetIfNotExists/100k_nodes-4 13.58m 13.92m ~ 0.100
SequentialGetAndSetIfNotExists/1k_nodes-4 624.7µ 630.7µ ~ 0.200
SequentialGetAndSetIfNotExists/10k_nodes-4 2.872m 2.887m ~ 1.000
SequentialGetAndSetIfNotExists/50k_nodes-4 11.79m 11.02m ~ 0.100
SequentialGetAndSetIfNotExists/100k_nodes-4 22.37m 21.20m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 600.0µ 623.4µ ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 4.671m 4.711m ~ 0.700
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 17.16m 17.53m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 669.7µ 673.1µ ~ 0.400
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 6.294m 6.605m ~ 0.100
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 39.12m 41.27m ~ 0.100
CalcBlockWork-4 469.8n 475.4n ~ 0.700
CalculateWork-4 624.5n 626.6n ~ 0.100
BuildBlockLocatorString_Helpers/Size_10-4 1.271µ 1.571µ ~ 0.400
BuildBlockLocatorString_Helpers/Size_100-4 12.13µ 12.12µ ~ 1.000
BuildBlockLocatorString_Helpers/Size_1000-4 120.9µ 120.4µ ~ 1.000
CatchupWithHeaderCache-4 104.3m 104.1m ~ 0.700
_BufferPoolAllocation/16KB-4 3.528µ 4.011µ ~ 1.000
_BufferPoolAllocation/32KB-4 8.601µ 6.327µ ~ 0.100
_BufferPoolAllocation/64KB-4 16.95µ 16.73µ ~ 0.100
_BufferPoolAllocation/128KB-4 32.39µ 24.12µ ~ 0.100
_BufferPoolAllocation/512KB-4 99.89µ 89.23µ ~ 0.100
_BufferPoolConcurrent/32KB-4 16.82µ 16.33µ ~ 0.400
_BufferPoolConcurrent/64KB-4 25.64µ 25.80µ ~ 0.700
_BufferPoolConcurrent/512KB-4 137.5µ 133.5µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/16KB-4 587.9µ 590.1µ ~ 0.400
_SubtreeDeserializationWithBufferSizes/32KB-4 586.7µ 588.6µ ~ 0.200
_SubtreeDeserializationWithBufferSizes/64KB-4 587.6µ 587.1µ ~ 0.400
_SubtreeDeserializationWithBufferSizes/128KB-4 589.7µ 588.7µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/512KB-4 605.9µ 602.2µ ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/16KB-4 34.97m 35.75m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/32KB-4 35.04m 35.29m ~ 0.200
_SubtreeDataDeserializationWithBufferSizes/64KB-4 35.24m 34.95m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/128KB-4 35.02m 35.01m ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/512KB-4 34.95m 34.78m ~ 0.200
_PooledVsNonPooled/Pooled-4 735.0n 733.9n ~ 0.100
_PooledVsNonPooled/NonPooled-4 6.240µ 6.237µ ~ 0.400
_MemoryFootprint/Current_512KB_32concurrent-4 6.257µ 6.324µ ~ 0.300
_MemoryFootprint/Proposed_32KB_32concurrent-4 8.791µ 8.934µ ~ 0.100
_MemoryFootprint/Alternative_64KB_32concurrent-4 8.484µ 8.495µ ~ 0.700
SubtreeSizes/10k_tx_4_per_subtree-4 1.315m 1.325m ~ 0.700
SubtreeSizes/10k_tx_16_per_subtree-4 312.6µ 316.5µ ~ 0.400
SubtreeSizes/10k_tx_64_per_subtree-4 74.12µ 74.83µ ~ 0.700
SubtreeSizes/10k_tx_256_per_subtree-4 18.43µ 18.65µ ~ 0.400
SubtreeSizes/10k_tx_512_per_subtree-4 9.279µ 9.257µ ~ 0.700
SubtreeSizes/10k_tx_1024_per_subtree-4 4.600µ 4.615µ ~ 0.700
SubtreeSizes/10k_tx_2k_per_subtree-4 2.294µ 2.290µ ~ 0.700
BlockSizeScaling/10k_tx_64_per_subtree-4 73.01µ 72.30µ ~ 0.700
BlockSizeScaling/10k_tx_256_per_subtree-4 18.49µ 18.40µ ~ 1.000
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.605µ 4.595µ ~ 1.000
BlockSizeScaling/50k_tx_64_per_subtree-4 383.5µ 382.2µ ~ 1.000
BlockSizeScaling/50k_tx_256_per_subtree-4 92.69µ 92.92µ ~ 0.400
BlockSizeScaling/50k_tx_1024_per_subtree-4 23.07µ 22.92µ ~ 0.100
SubtreeAllocations/small_subtrees_exists_check-4 153.6µ 154.7µ ~ 1.000
SubtreeAllocations/small_subtrees_data_fetch-4 161.8µ 162.3µ ~ 0.700
SubtreeAllocations/small_subtrees_full_validation-4 320.0µ 322.6µ ~ 0.200
SubtreeAllocations/medium_subtrees_exists_check-4 9.022µ 9.056µ ~ 0.400
SubtreeAllocations/medium_subtrees_data_fetch-4 9.730µ 9.565µ ~ 0.200
SubtreeAllocations/medium_subtrees_full_validation-4 18.75µ 18.55µ ~ 0.100
SubtreeAllocations/large_subtrees_exists_check-4 2.181µ 2.184µ ~ 1.000
SubtreeAllocations/large_subtrees_data_fetch-4 2.350µ 2.336µ ~ 0.300
SubtreeAllocations/large_subtrees_full_validation-4 4.661µ 4.706µ ~ 1.000
_prepareTxsPerLevel-4 391.3m 389.6m ~ 1.000
_prepareTxsPerLevelOrdered-4 4.513m 4.201m ~ 0.100
_prepareTxsPerLevel_Comparison/Original-4 394.8m 390.1m ~ 0.400
_prepareTxsPerLevel_Comparison/Optimized-4 4.266m 4.245m ~ 1.000
StoreBlock_Sequential/BelowCSVHeight-4 338.5µ 333.8µ ~ 0.700
StoreBlock_Sequential/AboveCSVHeight-4 334.9µ 326.5µ ~ 0.400
GetUtxoHashes-4 275.9n 272.5n ~ 0.400
GetUtxoHashes_ManyOutputs-4 46.40µ 46.29µ ~ 1.000
_NewMetaDataFromBytes-4 236.3n 230.7n ~ 0.700
_Bytes-4 619.7n 619.3n ~ 1.000
_MetaBytes-4 565.3n 566.8n ~ 0.700

Threshold: >10% with p < 0.05 | Generated: 2026-05-15 12:37 UTC

@oskarszoon oskarszoon left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verified each claim against the code. All hold.

  1. 422 for ErrTxMissingParent — defensible. Raised at Validator.go:851 (script verify) and :1165 (UTXO extension); both "parent not yet seen", retryable. Distinct from 400 (malformed) / 404 (not found) / 409 (conflict).

  2. 200 for ErrTxExists — correct. Validator.go:708 consumes ErrTxExists internally for the main validation path and returns (metadata, nil). The saveAsConflicting branch (Validator.go:636) converts it to ErrTxConflicting → 409. The only way ErrTxExists reaches httpStatusForTxError is the creation-in-progress / already-committed path at aerospike/create.go:1078, both idempotent. The follow-up commit cca9cfcc9 also fixes the response body from "Failed to process transaction" to "OK" — that mismatch was a bug introduced by the first commit and corrected immediately.

  3. errors.Is chain walk — confirmed. Error.Is compares by code then recursively unwraps via Unwrap(). The wrapped-error case at http_handlers_test.go:300–308 exercises a real NewProcessingError-wrapped NewTxMissingParentError, not a bare sentinel.

  4. Default 500 set — spot-checked ErrUtxoHashMismatch (aerospike/spend.go:859, sql/sql.go:2080,2562,3253): data-integrity violation, not client-actionable. ErrTxNotFound stays 500 correctly — it can fire in DAH-evicted-parent paths where it's an unexpected internal state, distinct from ErrTxMissingParent.

  5. No regression — only additive switch cases. No existing mapping changed or reordered.

  6. Local: go test -race -count=1 ./services/propagation/ → 100 passed. go vet ./services/propagation/ → clean.

Minor / non-blocking

The 2xx body fix at Server.go:614–618 checks status >= 200 && status < 300 and returns "OK" for any 2xx. Correct for the current mapping, but couples the response body to the numeric range rather than to the specific error. If a future error is mapped to a different 2xx with different intended body semantics, this silently returns "OK". Worth keying off the error itself in a future change.

@icellan icellan merged commit 4110a49 into bsv-blockchain:main May 15, 2026
25 checks passed
galt-tr added a commit to bsv-blockchain/arcade that referenced this pull request May 20, 2026
* Add dependency-aware dispatch plan

Plan for tracking parent-child relationships between in-flight
transactions so children are not broadcast until their parents have
reached a terminal state. Also covers the pipeline simplifications
that fall out of the change: validation moves into intake, the
tx_validator service is removed, the propagation topic becomes
single-partition with a single-goroutine dispatcher, and the
PENDING_RETRY status / reaper are replaced with an in-memory retry
queue backed by Kafka replay for durability.

Intended for discussion before implementation. Single coordinated
deploy with a new propagation topic and a drain-the-old-queue
prerequisite; no backward-compat code path retained.

* Add dispatcher core: dep index, retry queue, offset tracker

Introduces the single-goroutine dispatcher described in the plan. The
dispatcher reads transactions from an incoming channel, decides whether
to admit each to a pending broadcast batch or hold it as a waiter on
in-flight parents, and emits flushed batches to a downstream worker
channel. Status flips coming back (from broadcast workers and the
merkle-service callback path) drive waiter release, recursive
cascade-reject, retry queueing, and offset terminalization.

Components:

  - dispatcher.go: types (dispatcherMsg, inFlightEntry, statusFlip),
    the Dispatcher struct with its dep-index/retry-queue state, the
    main select loop, and handlers for incoming messages, status
    flips, and timer-driven batch flushes / retry wake-ups.

  - offset_tracker.go: a min-heap with lazy deletion behind the
    offsetTracker interface. Add records an in-flight offset, Done
    marks one terminalized, LowestUnfinished gives the commit point.

  - dispatcher_test.go: unit tests covering the baseline admit path,
    parent-in-flight hold, cascade-reject for REJECTED parents, the
    multi-parent case, retryable failure requeue with backoff, and
    retry exhaustion. Plus an OffsetTracker test for the lazy-delete
    semantics.

Not wired up to anything yet — the dispatcher stands alone. The next
commit will connect it to a Kafka consumer feeding incomingMsgs and a
broadcast-worker pool consuming outgoingBatch. Existing propagator,
tx_validator, and reaper code paths are untouched.

Retry classification uses HTTP status codes (422 for ErrTxMissingParent,
0 for no-response) rather than the existing IsRetryableError text match
— the latter is removed in a follow-up commit alongside the wiring
change. The dependency on Teranode returning 422 is satisfied by
bsv-blockchain/teranode#870.

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues

* Add dispatcher consumer with deferred Kafka offset commit

Builds the dedicated Kafka consumer for the dependency-aware
propagation topic. Distinct from the existing kafka.ConsumerGroup
wrapper (still used by webhook, watchdog, etc.) because it defers
Claim.MarkMessage until the dispatcher reports the message's offset
terminalized via its offsetTracker. Without that, a crash would leave
in-flight txs lost: Kafka would treat their offsets as committed while
the dispatcher's in-memory state was gone.

Architecture is two cooperating goroutines off Run:

  - Read loop: pulls messages from a Claim, decodes the JSON envelope
    (now including input_txids), records the message in a pending map
    keyed by offset, and pushes a dispatcherMsg onto the dispatcher's
    incomingMsgs channel. The send is blocking, so backpressure
    propagates naturally up through the claim buffer into the broker.

  - Commit loop: ticks every 200ms (configurable), queries
    offsetTracker.LowestUnfinished, and calls MarkMessage on every
    pending message with offset below that cursor. Drops committed
    entries from the pending map.

To make this safe, offsetTracker now guards its state with a mutex.
The dispatcher's writes (Add/Done) and the consumer's reads
(LowestUnfinished/Empty) cross goroutine boundaries; without the
mutex, the race detector flagged it correctly. Uncontested mutex
overhead is ~10-20ns per op — negligible against the dispatcher's
~5-10μs per-message JSON decode.

The consumer is not yet wired into the propagator. The next commit
will replace the existing parallel-consumer pool with this consumer
plus the dispatcher.

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues

* Add dispatcher broadcaster: batch HTTP submit + per-tx fallback

Adds the broadcast side of the dep-aware pipeline. Consumes batches
from the dispatcher's outgoingBatch channel, submits them to Teranode,
writes terminal status rows to the store, and emits per-tx statusFlips
back to the dispatcher.

Design:

  - Multi-tx batches go to /txs first. On HTTP 2xx every tx in the
    batch gets ACCEPTED_BY_NETWORK — Teranode handles intra-batch
    dependency ordering internally so an HTTP 200 implies every tx was
    accepted into the mempool. /txs returns aggregate-only status, so
    on HTTP non-2xx the broadcaster falls back to per-tx /tx calls to
    recover per-tx status codes (some txs in a failed batch may
    succeed individually).

  - Single-tx batches go directly to /tx, skipping the batch path.

  - Fan-out across healthy endpoints is parallel; first 2xx wins and
    cancels siblings. No healthy endpoints maps to statusCode=0,
    which the dispatcher classifies as retryable.

  - Store writes use BatchUpdateStatus per tx. Write failures are
    logged but don't block the statusFlip emission — the dispatcher
    advances its in-memory state regardless, and Kafka replay
    reconciles divergence on restart.

  - Sibling cancellation does NOT count against endpoint health — a
    canceled-by-broadcast error is filtered before aggregating
    outcomes.

Tests cover the four meaningful paths: batch accepted (no fallback),
batch rejected with per-tx fallback, single-tx direct, and no healthy
endpoints (retryable signal).

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues

* Move tx validation + dedup + dispatch publish into intake handler

Restructures the submit handler for the dep-aware pipeline. The intake
HTTP handler now performs the work the tx_validator service used to do
asynchronously: parse, run policy validation, dedup via store CAS, and
publish to the dispatch topic with input_txids extracted from the
parsed transaction.

Changes:

  - kafka/topics.go: add TopicDispatch and TopicDispatchDLQ for the
    dep-aware pipeline's fresh single-partition topic.

  - services/api_server/server.go: add a validator.Validator field
    threaded through New(); existing tests using struct-literal
    construction get a nil-safe degraded mode (skips validation).

  - services/api_server/handlers.go: handleSubmitTransaction now runs
    ValidateTransaction (policy-only, matching tx_validator's current
    behavior), persists REJECTED rows synchronously on validation
    failure, performs the dedup CAS via GetOrInsertStatus, and
    publishes to TopicDispatch with an envelope including input_txids.

  - app/app.go: pass d.Validator to api_server.New so production
    deployments get full validation behavior.

  - handlers_test.go: extend mockStore with an overrideable
    getOrInsertStatusFn so tests can exercise the dedup path. Default
    behavior returns inserted=true so existing tests continue to see
    fresh-submission semantics.

  - intake_dispatch_test.go: new test file covering the four cases:
    happy-path publish to TopicDispatch with correct envelope shape,
    dedup hit returns idempotent 202 with existing state, transient
    store error continues to publish, and the pathological
    not-inserted-with-nil-existing case (defensive branch).

The legacy TopicTransaction publish is gone — nothing in production
will be producing to that topic anymore. The tx_validator service
remains as code but its consumer will see no traffic; removed in a
follow-up commit alongside the propagator entry-point swap.

Verified:
  - go test -count=1 -race ./services/api_server/... ./services/propagation/... -- pass
  - go vet ./... -- clean
  - golangci-lint run on touched packages -- 0 issues

* Add Pipeline service wiring dispatcher trio end-to-end

Builds the propagation.Pipeline that wires Dispatcher,
dispatcherConsumer, and dispatcherBroadcaster together with their
shared channels and lifecycle. Implements services.Service so
app.BuildServices can drop it in alongside (or eventually replacing)
the legacy Propagator.

Architecture:

  - dispatcherConsumer reads from kafka.TopicDispatch, defers Kafka
    offset commit until the dispatcher's offsetTracker reports each
    tx terminalized.

  - Dispatcher runs in its own goroutine, owning the in-flight map,
    waiter index, retry queue, and offset heap. No locks because
    only this goroutine mutates them.

  - dispatcherBroadcaster reads emitted batches, submits to Teranode
    via the existing teranode.Client (batch endpoint with per-tx
    fallback on all-rejected), writes terminal status rows to the
    store, and emits per-tx statusFlips back to the dispatcher's
    return channel.

The dispatcher's rejectedSink is wired so cascade rejections from
REJECTED parents also produce a terminal store row — implemented as
a synthetic statusFlip onto the same flips channel the broadcaster
uses, keeping all terminal-write logic in one place
(dispatcherBroadcaster.emitOutcome). The synthetic flip falls back to
a non-blocking send so a full flips channel doesn't stall the
dispatcher loop; correctness is preserved by Kafka replay since the
parent's offset is not committed until the child terminalizes.

Lifecycle:

  - NewPipeline constructs all components and channels but does NOT
    start any goroutine.
  - Start spawns the broadcaster and dispatcher goroutines, then
    runs the consumer in the caller's goroutine (blocking until
    ctx is canceled or the subscription ends).
  - Stop cancels the internal context, which propagates to the two
    background goroutines, and waits for them to exit.

End-to-end tests cover:
  - Happy-path batch: two unrelated txs flow through, both end up
    ACCEPTED_BY_NETWORK in the store, one /txs hit on Teranode.
  - Dep-aware ordering: parent and child published in order, child
    held until parent's broadcast completes (verified by an
    artificially-slow parent and the order of /tx submissions
    observed by the test Teranode emulator).

Not yet wired into app.BuildServices — that swap is the next commit,
landing alongside removal of the legacy propagator entry point.
Merkle-service registration (the F-024 invariant for prior code) is
not yet plumbed in; needed before the cutover but separated from this
commit to keep the diff focused on the dispatcher wiring.

Verified:
  - go test -count=1 -race ./services/propagation/... ./services/api_server/... -- pass
  - go vet ./... -- clean
  - golangci-lint run on touched packages -- 0 issues

* Wire merkle-service registration into dispatcher broadcaster

Before broadcasting a batch to Teranode, the broadcaster now
registers every tx with merkle-service for /watch tracking. Per-tx
registration failures emit a retryable statusFlip (statusCode=0)
so the dispatcher's retry queue re-dispatches them on backoff. Txs
that register successfully proceed to the existing /txs (with /tx
fallback) path.

This preserves the F-024 invariant from the legacy propagator: no
transaction reaches Teranode without a matching /watch entry, so
MINED / SEEN_ON_NETWORK / STUMP callbacks always have a row to
update.

Behavior matches the legacy registerBatch in two important ways:
  - When merkleClient is nil OR callbackURL is empty, registration
    is skipped entirely and every tx in the batch proceeds to
    broadcast. Same fallback the legacy path used.
  - Per-tx independence: a failing tx in a batch does NOT block the
    rest of the batch from broadcasting. The successful subset
    proceeds; failures take the retry path.

The dispatcher's retry queue handles the failed registrations the
same way it handles transient broadcast failures — same
isRetryableStatusCode logic, same exponential backoff via
computeRetryDeadline.

Pipeline wires the merkle deps through cfg.CallbackURL,
cfg.CallbackToken, and cfg.Propagation.MerkleConcurrency so
production behavior matches the legacy propagator.

Test coverage: a batch containing one tx whose merkle registration
fails (HTTP 500 from merkle-service) and one that succeeds. The
failing tx produces a retryable statusFlip and never reaches
Teranode; the successful tx broadcasts normally.

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Wire Pipeline into BuildServices, replacing legacy Propagator

app.BuildServices now constructs propagation.NewPipeline for the
"propagation" service mode. The legacy propagation.New(...) call is
removed from the live wiring; the type and its surrounding code stay
in the source tree for a follow-up cleanup commit.

The Pipeline takes a tighter dependency set than the old Propagator:
no leaser (no reaper lease needed — single consumer per partition
makes the cluster-wide lock obsolete) and no publisher (status fan-out
flows through the existing Kafka publisher inside the store/event
layer, not through the propagator).

After this commit the dispatcher pipeline is the production
propagation path. Submissions flow:

  POST /tx → api-server intake (parse + validate + dedup CAS)
           → kafka.TopicDispatch (new single-partition topic)
           → propagation.Pipeline
              ├── dispatcherConsumer (deferred commit)
              ├── Dispatcher (dep index, retry queue)
              └── dispatcherBroadcaster (merkle register + Teranode submit)

The legacy tx_validator service still runs (consuming TopicTransaction)
because nothing has been removed yet — but no producer feeds it, so
it's idle. That removal lands next.

Verified:
  - go test -count=1 -race ./services/propagation/... ./services/api_server/... ./app/... -- pass
  - go vet ./... -- clean
  - golangci-lint run on touched packages -- 0 issues

* Remove tx_validator service

The tx_validator service is dead code after the intake handler took
over its responsibilities (parse + policy validate + dedup CAS) and
intake started publishing directly to TopicDispatch instead of
TopicTransaction. Nothing produces to TopicTransaction anymore, so
the service would just sit idle.

Removed:
  - services/tx_validator/ directory (validator.go + validator_test.go)
  - app/app.go import and "tx-validator" service registration
  - config/config.go "tx-validator" mode entry

The TxValidator config block (cfg.TxValidator) is still defined in
config.go for backward read compatibility — operators with a
tx_validator: section in their config.yaml see it silently ignored
rather than getting a validation error. Removing the struct entirely
can happen in a separate cleanup once the deploy has rolled out.

Metrics with arcade_tx_validator_* names also stay defined for now;
they'll never be incremented in the new pipeline but keeping the
definitions prevents Prometheus scrape errors during the cutover.

Verified:
  - go test -count=1 -race ./services/... ./app/... ./config/... -- pass
  - go vet ./... -- clean

* Consolidate types: reuse propagationMsg, drop statusFlip

Two type renames that should have been extensions in the first place:

  - The dispatcher's incoming-message type is the existing
    propagationMsg, extended with InputTXIDs and KafkaOffset fields.
    Previously this was a parallel "dispatcherMsg" type — same JSON
    shape, same fields — which created unnecessary divergence.

  - Status-flip events between broadcaster/dispatcher use
    *models.TransactionStatus directly. The existing type already
    carries TxID, Status, StatusCode, and ExtraInfo, which is exactly
    what the per-tx outcome carries. The internal "statusFlip" struct
    was just a four-field subset of the same shape.

The broadcaster now constructs one *models.TransactionStatus per
outcome and uses it both as the store write payload AND as the
in-flight status flip sent to the dispatcher. Removes a redundant
local struct.

Reverts the gratuitous "msg" → "envelope" variable rename in
handleSubmitTransaction; the existing msg variable just gets one new
key in its map literal.

Net diff is smaller and matches the existing type vocabulary across
the codebase.

Verified:
  - go test -count=1 -race ./services/... ./app/... -- pass
  - golangci-lint run on touched packages -- 0 issues

* Remove legacy Propagator and its supporting code

The dispatcher pipeline is now the production path. Everything in the
legacy propagation service is dead code:

  - propagator.go: the parallel-consumer Propagator type and its
    flushBatch / processBatch / registerBatch / broadcast helpers.
    Replaced by Dispatcher + dispatcherConsumer +
    dispatcherBroadcaster.

  - reaper.go: the lease-coordinated PENDING_RETRY reaper.
    Replaced by the dispatcher's in-memory retry queue (Kafka replay
    provides crash durability).

  - retryable.go: IsRetryableError text-match classification.
    Replaced by isRetryableStatusCode using the HTTP status codes
    Teranode now returns (per the audit PR landed upstream).

  - backoff.go: ComputeBackoff helper for the reaper. The dispatcher
    has its own inlined retry-deadline math (computeRetryDeadline).

  - replay.go: the one-shot merkle-service replay on startup. Useful
    as a recovery mechanism for merkle-service state loss but not
    needed for the initial cutover; a follow-up can add it back if
    operationally warranted.

  - propagator_test.go: 2k lines of legacy-path tests, including the
    comprehensive mockStore with bookkeeping for PendingRetry,
    retryCount, merkleMarks, etc.

  - health_test.go: tests of the legacy propagator's broadcast
    behavior (first-success-wins across endpoints). Equivalent
    coverage now lives in dispatcher_broadcaster_test.go for the new
    broadcaster.

The propagationMsg type moved from propagator.go to dispatcher.go
where it's the natural neighbor of the dispatcher that consumes it.

A minimal mockStore lives in mocks_test.go — just enough to satisfy
the store.Store interface via embedding. Tests that need specific
store behavior compose over it (broadcastTestStore, pipelineTestStore).

Net: 4276 lines removed, ~25 lines added.

Verified:
  - go test -count=1 -race ./... -- all packages pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Revert Pipeline rename: keep propagation.Propagator and propagation.New

The plan called for renaming the propagation topic (TopicDispatch) and
changing how it gets consumed (single-partition dispatcher pipeline)
— it did NOT call for renaming the service's external API. Reverts
the gratuitous Pipeline / NewPipeline names back to the original
Propagator / New.

Concretely:
  - type Pipeline → type Propagator
  - func NewPipeline → func New
  - pipeline.go → propagator.go (the file lives next to its peers
    again under the original name)
  - pipeline_test.go → propagator_test.go
  - Test names TestPipeline_* → TestPropagator_*
  - app.BuildServices reference propagation.New (no signature change
    visible to callers)

Internal types stay as-is: Dispatcher, dispatcherConsumer,
dispatcherBroadcaster — those are genuinely new components introduced
by the dep-aware pipeline; renaming them would just rename internal
implementation details without changing what they are.

Verified:
  - go test -count=1 -race ./services/propagation/... ./app/... -- pass
  - golangci-lint run ./services/propagation/ ./app/ -- 0 issues

* Revert "Revert Pipeline rename: keep propagation.Propagator and propagation.New"

This reverts commit 1db4a07bc536f9b1892210bf31e2dddb11e11f35.

* Revert "Remove legacy Propagator and its supporting code"

This reverts commit b10b45aa815e93c695d4c81548f2e4ac454f40df.

* Revert "Consolidate types: reuse propagationMsg, drop statusFlip"

This reverts commit 319b4dcfcadc8d15f9e99df6323e4ceee59a8a2f.

* Revert "Remove tx_validator service"

This reverts commit 26604caadf9d6b6e1326dd4320d082e1e3ceaf93.

* Revert "Wire Pipeline into BuildServices, replacing legacy Propagator"

This reverts commit 95a5dd6bfcd5cd14b3528d180945aebe03526867.

* Revert "Wire merkle-service registration into dispatcher broadcaster"

This reverts commit ab4e72a162eccf8b5630d4c6d664a764bc9b2182.

* Revert "Add Pipeline service wiring dispatcher trio end-to-end"

This reverts commit e0b181a6b766a397824e4af30c7967e8e9bbfa8b.

* Revert "Move tx validation + dedup + dispatch publish into intake handler"

This reverts commit 92c429aef2ac25b323d124173377d8a56126fb28.

* Revert "Add dispatcher broadcaster: batch HTTP submit + per-tx fallback"

This reverts commit 0f8d9e7d6c8bf0545d52edae83f1a087d3ccd140.

* Revert "Add dispatcher consumer with deferred Kafka offset commit"

This reverts commit a5958cee4b795d26aff62be8274c8507302ed356.

* Revert "Add dispatcher core: dep index, retry queue, offset tracker"

This reverts commit c34de7f630d0ccf487c69bd9f42bf26c6b4c9797.

* Dep-aware propagation: intake validation + propagator waiter index

Implements the dep-aware dispatch described in docs/plans/. Targeted
modifications to the existing Propagator and intake handler; no
architectural rework, no type churn.

Changes:

  - propagationMsg gains an InputTXIDs field. Optional on the JSON
    side so older producers continue to interoperate; the propagator
    treats absent/empty as "no in-flight parents".

  - Intake handler (api_server) now performs the work the
    tx_validator service used to do asynchronously: parse, run policy
    validation via validator.ValidateTransaction (skipFees=true,
    skipScripts=true — matches the legacy tx_validator behavior),
    dedup CAS via store.GetOrInsertStatus, extract InputTXIDs, and
    publish to kafka.TopicPropagation directly. Validation failure
    persists a terminal REJECTED row and returns 400 synchronously.

  - api_server.New now takes a *validator.Validator; app.BuildServices
    threads d.Validator through. Existing tests using struct-literal
    construction (validator nil) skip validation, preserving their
    behavior.

  - Propagator gains an in-flight dep index: inFlight set, waiters
    map (parent → set of children), pendingParents map (child → set
    of unmet parents), heldMsgs map (held child txid → message).
    handleMessage now checks input txids against inFlight and, if any
    parent is in flight, registers the message as a waiter instead
    of admitting it to pendingMsgs.

  - applyTerminalStatuses hooks into handleTerminalForDeps after the
    bulk publish: ACCEPTED parents release waiters whose pendingParents
    set becomes empty (re-entered into pendingMsgs so the next
    flushBatch picks them up); REJECTED parents cascade-reject every
    descendant recursively, writing terminal REJECTED rows and
    emitting a bulk publish for SSE/webhook subscribers. Terminalized
    txids are dropped from inFlight.

  - tx_validator gains an exported CollectInputTXIDs helper that
    intake reuses, keeping the wire format extraction identical
    regardless of producer. tx_validator itself is no longer
    registered in app.BuildServices — intake covers its job — but
    the package stays in the source tree as preserved queue-processing
    scaffolding (its message format includes input_txids too in case
    anyone re-activates it).

  - No new topic constant, no new types, no architectural split. The
    existing TopicPropagation carries the extended message format.
    Single-partition deployment is an operator choice, not a code
    constraint.

Verified:
  - go test -count=1 -race ./services/api_server/... ./services/propagation/...
    ./services/tx_validator/... ./app/... -- all pass
  - go vet ./... -- clean
  - golangci-lint run -- 0 issues

* Tests for dep-aware admission + waiter release/cascade

depaware_test.go covers the five core behaviors:

  - HoldsChildWhenParentInFlight: a tx whose InputTXIDs include a tx
    currently in flight lands in heldMsgs + pendingParents + waiters,
    not pendingMsgs.
  - ReleasesWaitersOnAccepted: parent's terminal ACCEPTED clears its
    waiters' pendingParents entry; a child whose set is now empty is
    re-entered into pendingMsgs.
  - CascadesRejectedChildren: parent's terminal REJECTED recursively
    rejects every descendant (child, grandchild). Each cascaded row
    is written to the store via BatchUpdateStatusReturning.
  - NoParents_AdmitsNormally: tx with no InputTXIDs follows the
    legacy admission path, no waiter state.
  - ParentNotInFlight_AdmitsChildDirectly: only IN-FLIGHT parents
    create waits — children of long-mined or never-seen parents go
    straight to pendingMsgs.

Behavior fix uncovered by the cascade test: a held waiter must still
appear in inFlight so DEEPER descendants (grandchild waiting on a
held child) register their waiter edge correctly. Handle this by
adding propMsg.TXID to inFlight BEFORE the hold/admit decision in
handleMessage. Removed the post-release inFlight insert in
handleTerminalForDeps since the child was already there.

persistCascadeRejections now uses BatchUpdateStatusReturning to match
applyTerminalStatuses's existing store contract (mockStore implements
that method, not BatchUpdateStatus).

Verified:
  - go test -count=1 -race ./services/propagation/... -- all pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Move dep-index to a dispatcher goroutine, drop the mutex

The previous version put inFlight / waiters / pendingParents /
heldMsgs on the Propagator struct behind the existing p.mu mutex.
That was the wrong shape: with a single Kafka consumer per partition
and the dispatch decisions all logically owned by one ordering
domain, the dep-index should live in a single goroutine with no
locks. The mutex was reading concurrent traffic from handleMessage
(consumer goroutine) and applyTerminalStatuses (processBatch
goroutines, up to MaxConcurrentBatches in parallel) — workable, but
extra synchronization complexity for state that has a natural single
owner.

This commit:

  - Creates services/propagation/dispatcher.go containing the
    dispatcher goroutine (runDispatcher) plus the admitRequest /
    terminalEvent / terminalResult protocol types. All dep-index
    state (inFlight, waiters, pendingParents, heldMsgs) is declared
    inside runDispatcher's function body — nothing else can reach
    them. The dispatcher serializes admit and terminal events via a
    select loop; no locks anywhere on dep-index state.

  - Removes inFlight / waiters / pendingParents / heldMsgs from the
    Propagator struct. Removes the holdAsWaiterLocked, releaseWaitersLocked,
    cascadeRejectLocked, handleTerminalForDeps helpers — their logic
    moved into the dispatcher goroutine.

  - handleMessage now calls p.admitToDispatcher(propMsg), which sends
    a request and waits for a synchronous admit/hold reply. Single
    goroutine round-trip per message; channels are buffered so a
    momentarily slow dispatcher doesn't immediately stall the consumer.

  - applyTerminalStatuses now calls p.notifyTerminalToDispatcher per
    terminalized txid, gets back the list of released waiters
    (re-entered into pendingMsgs by the caller) and cascaded
    descendants (caller writes REJECTED rows + bulk publish).

  - The dispatcher goroutine is started in New (not Start) so the
    existing tests that construct via New and call handleMessage
    directly continue to work. Stop cancels the dispatcher.

  - depaware_test.go updated to test via observable behavior
    (pendingMsgs contents, ms.updates) instead of poking at private
    dep-index maps, since those are no longer accessible from outside
    the dispatcher goroutine.

The legacy p.mu still protects pendingMsgs (a slice the consumer
goroutine and the dispatcher both need to append to — released
waiters re-enter via the consumer-side helper). That mutex was
already there before this PR and isn't part of the dep-aware change.

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
  - golangci-lint run on touched packages -- 0 issues

* Dispatcher owns pendingMsgs end-to-end: no shared state, no mutex

The dispatcher now owns ALL dep-aware state — inFlight, waiters,
pendingParents, heldMsgs, AND pendingMsgs — exclusively within its
goroutine. No shared maps, no shared slice, no mutex anywhere in the
propagation pipeline's dep-aware path.

Specific fixes addressed (per the in-tree audit):

  - Item 16: pendingMsgs moved from Propagator (mutex-guarded) into
    the dispatcher's local state. handleMessage admits via the
    dispatcher's admitCh; the dispatcher decides hold-or-admit AND
    appends to its own pendingMsgs. flushBatch drains the dispatcher
    via a new drainCh request/reply. Released waiters from terminal
    events get appended to pendingMsgs by the dispatcher directly —
    the caller never touches the slice. p.mu and p.pendingMsgs are
    deleted from the Propagator struct entirely.

  - Item 17 (real bug): the previous order was admitToDispatcher
    first (adds to inFlight), THEN maxPending check (returns error
    if full). A rejected tx would leak into inFlight with no terminal
    event to clean it up. The check is now atomic inside the
    dispatcher: hold-as-waiter doesn't count toward maxPending,
    admit-to-broadcast checks the cap BEFORE adding to inFlight, and
    a full-cap admit returns rejected without touching state.

  - Item 12: cascade rejection reason is now threaded through.
    applyTerminalStatuses passes each rejected tx's ExtraInfo as the
    reason on terminalEvent. The dispatcher carries the reason
    through cascadeReject and returns it on each cascadedRejection.
    persistCascadeRejections uses the carried reason instead of the
    hardcoded "parent rejected" (with that string as the fallback
    when no reason was provided).

  - Item 14: removed the dead `var _ = zap.NewNop` stub.

Default maxPending bumped from 50,000 to 1,000,000 per discussion —
at ~500 bytes per propagationMsg that's ~500 MB for the slice
itself, comfortably within the 8-16 GB memory envelope we target.
config.go comment updated.

Test coverage:
  - TestHandleMessage_MaxPendingFull_ReturnsErrorWithoutLeakingInFlight
    is new — pins the item-17 bug fix (a rejected admit can be
    retried after the queue drains, which wouldn't work if the tx
    leaked into in-flight state).
  - TestApplyTerminalStatuses_CascadesRejectedChildren now also
    asserts the rejection reason is threaded from parent to child to
    grandchild ("bad parent" not "parent rejected").

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/...
    — all pass
  - go vet ./... — clean
  - golangci-lint run — 0 issues

* Cascade reason is always "parent rejected", drop reason threading

A cascaded child's rejection is structural — the child itself didn't
fail for any reason of its own, only because an ancestor did. The
parent's actual cause (invalid script, missing parent, whatever)
lives on the parent's row; downstream consumers can correlate via
the dep graph if they care.

Reverts the reason-threading from the previous commit:

  - terminalEvent loses its `reason` field.
  - cascadedRejection struct removed; cascade returns []string again.
  - terminalResult.cascaded is []string.
  - persistCascadeRejections always writes "parent rejected" as
    ExtraInfo regardless of the parent's cause.
  - rejectedReasons map removed from applyTerminalStatuses.
  - notifyTerminalToDispatcher's reason parameter removed.

Test for cascade rejection updated: assert ExtraInfo is "parent
rejected" for both child and grandchild, regardless of the parent's
"bad parent" reason.

Verified:
  - go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Backpressure via dispatcher select, not DLQ

When pendingMsgs is at maxPending, the dispatcher excludes admitCh
from its select loop via the nil-channel trick. handleMessage's send
to admitCh blocks. The Kafka consumer goroutine sits inside
handleMessage. Kafka's offset doesn't advance. The broker holds the
messages on its side (disk-backed for real Kafka; in-memory buffer
for the memory broker, which has its own producer-side backpressure
to the client).

Replaces the previous wrong behavior where a full pending queue
returned an error from handleMessage, which the Kafka wrapper
interpreted as a malformed message and routed to DLQ after exhausting
retries. That path lost messages — a full in-memory queue is just
"temporarily out of room," not a malformed payload.

Changes:

  - admitResult drops the `full` field. The only outcomes are
    admitted (added to pendingMsgs) and held (registered as waiter).
  - Dispatcher's select uses `admitChIfRoom` — set to p.admitCh
    when len(pendingMsgs) < maxPending, nil otherwise. nil channel
    is never ready in select.
  - handleAdmit drops the maxPending check (the select gates it).
  - handleMessage no longer returns an error on full; it just blocks
    on the admit reply, which blocks until the dispatcher has room.

Test renamed to TestHandleMessage_MaxPendingFull_BlocksUntilDrained.
Verifies handleMessage blocks while pending is at cap, then unblocks
and completes after a drain frees capacity. Previous error-return
assertion is gone.

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Same-batch admission + cascade-release; drop pendingParents map

Implements the dep-aware optimization the plan called for and we
overlooked: a parent and its child can broadcast in the same /txs
batch since Teranode handles intra-batch ordering. The dispatcher
only holds children whose parents are in a DIFFERENT in-flight
batch.

State model changes:

  - Drop the pendingParents map entirely. Whether a held child is
    ready to release is computed at release time by walking the
    child's heldMsgs[child].InputTXIDs against inFlight and inPending
    — recomputes the same answer a counter would have given, but one
    less map to maintain.
  - Add inPending — the subset of inFlight currently in pendingMsgs
    (queued for the next batch). Used during admit ("parent in same
    batch → fine") and release ("parent just entered inPending →
    cascade-release my held children").

Admit logic (handleAdmit):
  - For each input txid, classify:
    - in inPending → fine, parent will be in same batch
    - in inFlight but not inPending → blocker (parent broadcasting
      separately, or itself held)
    - not in inFlight → fine, out of scope
  - If any blocker → hold the child on each blocking parent.
  - Otherwise → admit to inFlight + inPending + pendingMsgs.

Drain (drainCh): hand pendingMsgs to the caller, clear inPending
(the txids stay in inFlight — they're broadcasting now, just no
longer eligible for the next batch).

Release (releaseWaiters): worklist walk. For each newly-released
or moved-to-inPending parent, scan its waiters; for each child,
canRelease() rechecks the child's other parents against inFlight/
inPending. If clear, release to pendingMsgs/inPending and add the
child to the worklist — its own waiters may now release too.
Propagates a held chain in one terminal event.

Cascade-reject (cascadeReject): unchanged in intent, just cleans
up multi-parent waiters via the new cleanupWaiterEntries helper
shared with releaseWaiters.

Practical effect: a 1000-deep held chain whose root just
terminalized ACCEPTED now releases the entire chain to the next
batch in one event instead of one round-trip per link.

Test rewrite (depaware_test.go):
  - TestHandleMessage_SameBatchAdmission — new; verifies parent
    and child both end up in the drained batch when admitted
    between flushes.
  - TestHandleMessage_HoldsChildWhenParentInDifferentBatch —
    renamed from HoldsChildWhenParentInFlight, exercises the
    drain-then-admit-child path.
  - TestCascadeRelease_DeepChain — new; verifies grandparent
    ACCEPTED releases both parent and child held below it in a
    single event.
  - Existing release / cascade-reject / no-parent / maxPending
    tests updated to fit the new state semantics.

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/...
    — all pass
  - golangci-lint run ./services/propagation/ — 0 issues

* Revert maxPending default back to 50,000

The 1,000,000 default was based on a confused mental model — I was
sizing for "total txs the system can hold" when maxPending is just
the broadcast-bound buffer (pendingMsgs), drained every flush
(50ms ticker or Kafka idle). Kafka's partition holds the bulk; held
waiters are in a separate map. 50,000 is the legacy default the
original developer picked, and the existing rationale
("multi-minute stall absorption at 50 TPS") is correct.

config.go comment rewritten to accurately describe what the cap
gates (handleMessage's admitCh send blocks; consumer pauses pulls;
backpressure to broker — no DLQ). Held waiters explicitly noted as
NOT counted.

Verified:
  - go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
  - golangci-lint run ./services/propagation/ ./config/ -- 0 issues

* Wire in changes to batch submission

* Add flush-ticker goroutine to break consumer-wedge deadlock

The Kafka consumer wrapper drives both Kafka pulls AND flushBatch on
one goroutine via a single select loop. When handleMessage blocks on
the dispatcher's admitCh (which happens once pendingMsgs hits
maxPending), the consumer goroutine wedges inside processOne; the
wrapper's flush hook can't fire from the same goroutine, so the
dispatcher never drains and the wedge becomes permanent.

This goroutine runs flushBatch on its own clock (50ms ticker) so
drains keep happening regardless of the consumer's state. When the
consumer is wedged on a full pending queue, the ticker drains it,
opens a slot, the dispatcher's admitCh becomes selectable again, and
handleMessage unblocks. Without this, "pause Kafka consumption when
pending is full" would deadlock the propagator instead of
backpressuring the broker.

Verified:
  - go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Disable the reaper goroutine

In the dep-aware design we don't write PENDING_RETRY rows — failed
broadcasts terminate as REJECTED in the next commit. With no
PENDING_RETRY traffic the reaper has nothing to process. Removing
the goroutine launch from Start so the reaper code stays in the
source tree (preserved tooling) but doesn't run.

The unused-linter reference (`_ = p.runReaper`) keeps the function
in scope without flagging an unused warning; if a future need for
the reaper resurfaces, it's a one-line revert to bring it back.

Verified:
  - go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Replace handleRetryableFailure call sites with terminal REJECTED

In the dep-aware design we explicitly do not queue PENDING_RETRY rows
— failed broadcasts terminate as REJECTED. This commit changes the
three live call sites in processBatch + registerBatch from "queue
for retry" to "write terminal REJECTED."

Sites:
  - registerBatch: per-tx merkle-service registration failure →
    writeTerminalRejected(reason="merkle-service registration
    failed: <err>").
  - processBatch's no-verdict branch (no peer acknowledged the
    broadcast): writeTerminalRejected(reason="broadcast not
    acknowledged by any endpoint").
  - processBatch's rejected-with-IsRetryableError-match branch:
    falls through to the normal terminal REJECTED counter and
    applyTerminalStatuses store write. The dep-aware dispatcher
    handles the cascade.

Added writeTerminalRejected helper: writes a one-shot terminal
REJECTED row via BatchUpdateStatusReturning, notifies the dispatcher
so its in-flight state cleans up (and any held descendants cascade-
reject), and emits a bulk publish event.

handleRetryableFailure stays in the source tree, still referenced by
the (preserved-but-not-started) reaper. Kafka replay reconciles any
in-flight state if a propagator crashes mid-write.

Test changes:
  - TestHandleMessage_MerkleFailure_PendingRetryRow renamed to
    TestHandleMessage_MerkleFailure_WritesTerminalRejected, asserts
    REJECTED row + merkle-service error in ExtraInfo.
  - TestHandleMessage_MerkleTimeout_NoBroadcast updated similarly.
  - TestProcessBatch_MerkleFailure_AbortsBatch / TestHandleMessage_
    PartialMerkleFailure_OnlyFailedMessageIsAborted updated to
    expect REJECTED writes instead of PENDING_RETRY rows.
  - TestNoVerdict_NoHealthyEndpoints_RoutesToRetry renamed to
    TestNoVerdict_NoHealthyEndpoints_TerminalRejected.
  - TestRetry_MissingInputs_ThenReaperSuccess and TestRetry_
    Exhausted_ClearsToRejected removed — both exercised the reaper
    directly, which is now disabled. Comment in their place points
    at git history if the flow ever needs restoring.
  - Removed unused test helpers (forceReady, newTeranodeServerToggle)
    left over from the deleted retry tests.

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
  - golangci-lint run ./services/propagation/ -- 0 issues

* Drop same-batch admission and cascade-release

Teranode is updating to process bulk /txs submissions in parallel,
which means parent and child can no longer be in the same /txs POST
— Teranode can't trust intra-batch ordering across parallel workers.
Every child must wait for its parent to terminalize ACCEPTED before
the child enters its own batch.

Dispatcher state model simplifies:

  - Drop inPending. The only set we track is inFlight (anywhere in
    the dispatcher's awareness: pendingMsgs, broadcasting, or held).
  - handleAdmit: any in-flight parent blocks. No more "parent in
    same batch is fine" exception.
  - canRelease: any in-flight parent (other than the just-terminalized
    one) blocks. Recomputes from heldMsgs[child].InputTXIDs each call.
  - releaseWaiters: walks waiters[parent] ONE LEVEL DEEP. Released
    children go into pendingMsgs; their own waiters stay held until
    THEY terminalize. No recursive cascade through the dep chain.

Effect: a deep chain of N held txs releases sequentially, one batch
per link. Slow for deep chains but correct under Teranode's new
parallel processing. Independent fan-out (no parent-child overlap)
is unchanged — all admit to pendingMsgs and batch normally.

cascadeReject is unchanged in shape (rejections still walk the full
subtree).

Test changes:
  - TestHandleMessage_SameBatchAdmission renamed/inverted to
    TestHandleMessage_HoldsChildWhenParentInFlight: child of an
    in-flight parent now stays held even if the parent is in the
    current pending batch.
  - TestCascadeRelease_DeepChain renamed to
    TestSequentialReleaseDeepChain: grandparent ACCEPTED releases
    parent ONLY; child stays held until parent ACCEPTED separately.

api_server bulk-handler comment updated to reflect the new
"hold until parent terminalizes" semantics rather than the old
"co-batch parents and children" claim.

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
  - golangci-lint run -- 0 issues

* Update dep-aware-dispatch plan to reflect agreed retry model

Captures the decisions reached in design discussion that didn't make
it into the first implementation pass:

- Dispatcher state names match the actual code (heldMsgs, pendingMsgs,
  offsetTracker) instead of the original sketch (pendingParents,
  retryQueue, offsetHeap).
- Offset commit: explicit deferred MarkMessage in kafka/consumer.go
  driven by a 200ms propagator-side ticker that reads the dispatcher's
  LowestUnfinished.
- Retry handling distinguishes the two failure shapes:
    - Merkle /watch: inline retry the whole batch (binary failure).
    - Teranode per-slot infra: requeue individual txs back to
      pendingMsgs after a short flat wait.
- No attempt counter / no exhaustion — infra failures retry forever.
- Backoff for merkle inline retry: 100ms -> 500ms -> 2s -> 5s -> 10s,
  then 10s steady.
- Per-tx /tx fallback removed; /txs response per-slot lines from
  #879 + #881 carry the per-tx classification.
- TX_MISSING_PARENT becomes terminal (dep-aware dispatch already
  gates children, so missing-parent from Teranode means an unknown
  parent — wallets resolve it).
- Added Dependencies section noting #879 and #881.

* Dep-aware dispatch: offset tracker, deferred kafka commit, infra-retry loops

Implements the parts of the dep-aware-dispatch plan that the previous
pass missed:

- offset_tracker.go: min-heap with lazy deletion. Add on admit, Done on
  terminal/cascade, LowestUnfinished read by the watermark ticker.
- dispatcher.go: inFlight now stores offsets (txid → offset). handleAdmit
  registers the offset; handleTerminal and cascadeReject mark Done. New
  handleRequeue re-admits a previously-admitted tx after an infra
  failure without re-tracking its offset. New watermarkCh + requeueCh
  channels keep all state mutations on the dispatcher goroutine.
- kafka/consumer.go: opt-in DeferredCommit mode. processOne stops
  MarkMessage-ing immediately; pending messages accumulate in a per-claim
  map keyed by offset; new SetCommitWatermark(offset) lets the service
  publish a watermark that the next flush boundary marks up to.
- propagator.go runWatermarkTicker: 200ms goroutine that reads the
  dispatcher's LowestUnfinished and pushes (watermark - 1) into the
  consumer. Started in Start.
- registerBatch: capped-exponential inline retry (100ms → 500ms → 2s →
  5s → 10s, then 10s steady, forever) for merkle-service /watch
  failures. No terminal REJECTED for infra failures.
- teranode/client.go SubmitTransactions: parses the post-#881 /txs
  per-slot response body. New per-slot []string return enables the
  caller to classify per-tx instead of treating the batch as binary.
- broadcastBatchToEndpoints: emits []txResult with txResultClass set
  per-tx (Accepted, Rejected with Teranode code, InFlight, or Requeue).
- broadcastChunk: per-tx /tx fallback removed.
- processBatch: classifies per-tx via txResultClass. Requeue slots
  gather, sleep requeueWait (2s), then route back through
  requeueToDispatcher. Accepted/Rejected slots terminalize as before.
- Removed reaper.go, retryable.go, backoff.go, writeTerminalRejected,
  handleRetryableFailure, and the unused retryMaxAttempts /
  retryBackoffMs propagator fields.
- Obsolete tests asserting old "merkle failure → terminal REJECTED"
  and "all endpoints 500 → terminal REJECTED" behavior are removed
  with explanatory comments. New per-slot classification and requeue
  paths are covered by the existing dep-aware tests + the new
  dispatcher offset wiring.

Tests: go test -race ./... — all packages pass.
Vet/lint: clean across changed packages.

* Collapse dispatcher + kafka consumer onto a single goroutine

The previous deferred-commit design split state across the dispatcher
goroutine, the kafka consumer goroutine, and a third watermark-ticker
goroutine that shuffled a shared atomic.Int64 between them. That was
exactly the cross-goroutine state-sharing pattern the dep-aware-dispatch
plan was built to avoid.

This commit consolidates: the dispatcher loop is now the Sarama
ClaimHandler, so the Sarama-managed goroutine IS the dispatcher
goroutine. All dep-aware state lives in local variables on that one
goroutine, including the per-offset *kafka.Message references used to
call claim.MarkMessage when an offset terminalizes.

Changes:

- kafka/consumer.go: remove DeferredCommit, atomic watermark, and
  SetCommitWatermark. Add an opt-in ClaimHandler field that takes the
  raw Claim; when set, the wrapper hands the claim straight to the
  handler and skips the internal drain/flush/retry/DLQ plumbing.
  Other consumers (webhook, watchdog, etc.) keep the existing
  callback-handler path unchanged.
- services/propagation/dispatcher.go: runDispatcher now takes a
  kafka.Claim parameter. The select reads from claim.Messages() when
  non-nil and from admitCh when nil; both feed the same handleAdmit
  logic. On terminal events the loop calls advanceMarks which walks
  pendingMarks (offset → *kafka.Message) and calls claim.MarkMessage
  for every offset strictly below the tracker's LowestUnfinished.
  Backpressure now applies to both message sources via the same
  nil-channel-in-select trick. flushTicker only ticks in production
  (claim != nil) so tests can drive flushes explicitly.
- services/propagation/propagator.go: Start cancels the test-mode
  dispatcher goroutine started in New, then registers
  ClaimHandler: p.handleClaim, which runs the same runDispatcher loop
  on Sarama's per-claim goroutine. runFlushTicker and
  runWatermarkTicker are deleted (the consolidated loop handles
  flush, terminal events, and offset marking inline).

One goroutine for all state. No atomics. No mutexes. No ticker
goroutine pushing values across goroutine boundaries.

Tests: go test -race ./... — all green.

* Wire api_server submission paths into the in-process TxTracker

The bump-builder's filterTrackedTxids relies on TxTracker to know
which level-0 hashes from a compound BUMP correspond to txs arcade
is responsible for. Without a TxTracker entry the filter returns
empty for that txid and markMinedAndPublish never runs — txs that
broadcast successfully and reach SEEN_ON_NETWORK never advance to
MINED. That's the e2e-smoke regression on TestSmoke_RealBlockMined_*.

The original wiring lived in the tx_validator service (Add was
called inside Validator.Validate). The dep-aware-dispatch plan
removes that service and moves validation into the api_server.
The TxTracker.Add call needs to move with it. Add it in both:

- handleSingleTx: after the dedup CAS, before the kafka publish.
- handleSubmitTransactions (bulk): after the dedup CAS, before the
  kafka SendBatch.

Both paths only Add txids that are toPublish — duplicates and
dedup-CAS losers are skipped so we don't churn the tracker for
no-op submissions.

Tests: go test -race ./services/api_server/... ./services/propagation/...
— all pass.

* Replace inline retry/requeue with reaper-driven rebroadcast

Strips out the exponential-backoff machinery the dep-aware code had
been carrying around and replaces it with a single durable retry
surface: a per-tick reaper that walks the status store, picks up rows
stuck non-terminal past per-status thresholds, and rebroadcasts them
through the same registerBatch + broadcastInChunks pipeline as the
hot path.

What's gone:
- merkleRetryBackoffs, merkleRetryBackoff, the registerBatch retry
  loop. /watch failures now just exclude the failed txs from this
  batch — they stay at RECEIVED in the store and the reaper picks
  them up later.
- inlineRetryAttempts / inlineRetryDelay and the broadcastSingleToEndpoints
  retry loop. Single broadcast attempt; reaper handles retries.
- requeueWait, txResultClassRequeue (renamed to txResultClassSkip),
  the dispatcher requeue channel + handleRequeue + requeueToDispatcher,
  and the processBatch sleep-and-resend loop.

What's new:
- services/propagation/reaper.go: a lease-guarded scan that walks
  IterateStatusesSince(now - 24h), picks up RECEIVED rows older than
  30s and SEEN_ON_NETWORK rows older than 1h, and rebroadcasts in
  batches of up to 200. Results flow through applyTerminalStatuses
  so the dispatcher gets terminal-notify callbacks the normal way.
- api_server submission paths now populate RawTx on the inserted
  status row so the reaper has the body to rebroadcast.
- processBatch handling for txResultClassInFlight: notify the
  dispatcher with StatusAcceptedByNetwork so the Kafka offset
  advances and any dep-waiters release, without writing a status
  row — the SEEN_ON_NETWORK / MINED callback owns the row from here.

Tests + lint clean.

* Wait for reaper/merkle-replay goroutines in Stop

Both runReaper and runMerkleReplay were spawned naked from Start, so
when the surrounding app cleanup cancels their ctx and closes the
store immediately after, an in-flight IterateStatusesSince or
leaser.Release can race against a closed store. The pebble backing
panics on calls after Close, the test framework catches the panic
from the orphaned goroutine, and the failure is attributed to the
test's t.Cleanup — which is exactly the TestSmoke_RealBlockMined_
ViaReprocess failure on the latest CI run.

Track both goroutines in a per-propagator WaitGroup and have Stop
wait on it after the dispatcher's context is cancelled. The wait is
bounded by the existing ctx cancellation that triggers each
goroutine's exit path, so it adds no latency in the steady-state
shutdown — it just keeps the cleanup ordering clean.

* Fix 'cancelled' misspelling (lint)

* Remove accidentally-committed merkle-service proposal docs

These were drafted alongside the dep-aware-dispatch work but aren't
part of this PR's scope. Got pulled in by a stray git add -A in an
earlier commit. Removing keeps the PR diff focused on the
propagation + reaper changes.

* Parse /txs per-slot results on HTTP 207

Teranode #879 + #881 moves the per-slot response body from HTTP 500 to
HTTP 207 Multi-Status. SubmitTransactions now parses the per-slot body
on 207 only; 4xx/5xx are treated as pure infra failures with no body
parsing.

Adds unit tests for the 207 per-slot path and for 500 (no per-slot).
Updates docs to drop the speculative pre-#881 500 path that was never
in production Teranode.

* Collapse single-tx broadcast path; route everything through /txs

/txs handles a chunk of one tx identically to a chunk of N — same per-tx
work on the Teranode side, no extra HTTP overhead. The single-tx path
existed only to interpret /tx's HTTP-status-coded responses, which the
per-slot /txs classifier subsumes. Net -397 lines.

Removed:
- broadcastSingleToEndpoints / broadcastSingleOnce / singleResultToTxResult
- broadcastResult struct
- Client.SubmitTransaction
- txResultClassInFlight (only emitted by the /tx 202 path)
- statusPriority + pendingRetryCount helpers (now unused)
- Tests that asserted /tx-vs-/txs routing or /tx 202 in-flight behavior

broadcastChunk now always calls broadcastBatchToEndpoints regardless of
chunk size. The classification pipeline (Accepted / Rejected / Skip) is
the single source of truth for per-tx outcomes.

* Remove tx_validator service

The validator was already disabled (the previous commit left
`_ = tx_validator.New` to keep the import alive). This finishes the
deletion:

- Remove services/tx_validator/{validator,validator_test}.go and
  deploy/tx-validator.yaml.
- Drop TxValidatorConfig from config + the tx_validator block from
  config.example.yaml. Drop "tx-validator" from valid mode strings
  and CLI help. Existing configs with mode: tx-validator now fail-fast
  with an invalid-mode error instead of starting a no-op service.
- Drop the five TxValidator* Prometheus metrics and the test that
  touched them.
- Inline tx_validator.CollectInputTXIDs into api_server as a local
  helper so the intake handler no longer imports the removed package.
- Comment cleanup across propagator/reaper/teranode/store/e2e for
  stale tx_validator references.

Intake validation, dedup, and the input_txids envelope are all
already done synchronously in services/api_server/handlers.go, so
there's no behavior change.

* Inline-requeue for transient broadcast failures; rename Skip class

Rework how the propagation pipeline handles broadcast outcomes that
don't carry a per-tx verdict (no healthy peers, /txs returned 4xx/5xx
with no per-slot body, per-slot infra-bucket code).

- Renamed txResultClassSkip → txResultClassRequeue. The new class name
  describes the action: feed the tx back through dispatcher admission
  via the new requeueCh, after a flat 2s wait. The dispatcher's
  inFlight entry and pinned Kafka offset both persist across the
  requeue, so dep-ordering and at-least-once delivery are preserved.
- registerBatch now returns (registered, failed); failed-watch txs
  are also routed through requeueAfterDelay instead of being silently
  dropped on the floor.
- Reaper drops its RECEIVED-row scan. A RECEIVED row that is not also
  in a Kafka queue can only exist because intake's Kafka publish
  failed and the client got a 5xx — at that point the client owns
  retry, and arcade rebroadcasting on their behalf would override
  their decision. The reaper now only covers SEEN_ON_NETWORK
  stuckness (Teranode mempool eviction, dropped MINED callback).
- submitBroadcastJobs now blocks on a full job channel instead of
  spawning a one-off goroutine. The 256-worker pool is now an actual
  cap on concurrent broadcasts; backpressure flows back to the
  dispatcher under saturation (peak-job math: 4 batches × 4 chunks ×
  N endpoints starts exceeding the 1280-slot cap around 80 endpoints).
- teranode client: stop stripping a "Failed to process transactions:"
  heading line in parseTxsPerSlot. That heading was in the pre-#881
  500-path body, which we no longer parse — under #881 the 207 body
  is just per-slot lines.

* Restore Client.SubmitTransaction on the teranode client

POST /tx is a real Teranode endpoint and SubmitTransaction is the
client surface for it. Removing it from the client (in c1f5c58) along
with arcade's own use of /tx meant callers wanting the single-tx
endpoint would have to reimplement the HTTP boilerplate. Restore the
method and its test so the client stays a faithful Teranode interface
regardless of whether arcade's current pipeline uses it.

The propagation pipeline still routes everything through /txs — that
direction doesn't change.

* Parse Teranode failure-list response by txid

Teranode upstream main #879 emits HTTP 500 with body "Failed to process
transactions:\n<NAME> (<num>): [ProcessTransaction][<txid>] <msg>\n..."
for any /txs batch with at least one failure. Each line corresponds to
one failed tx; successes are absent.

- teranode/client.go: SubmitTransactions now extracts a txid → error-line
  map from the failure body (parseTxsFailures). Returns nil for HTTP 200
  (everything accepted) and for any 4xx/5xx whose body doesn't carry the
  "Failed to process transactions:" header (treated as a pure infra
  failure by the caller).
- services/propagation/propagator.go: broadcastJobResult.perSlot →
  broadcastJobResult.failures (map[string]string keyed by txid).
  broadcastBatchToEndpoints classifies each submitted tx by lookup in
  the failure map — present = REJECTED on a terminal Teranode code or
  REQUEUE on an infra-bucket code; absent = ACCEPTED.
- classifyPerSlotLine → classifyFailureLine (Accepted branch dropped —
  accepted txs aren't in the failure map).
- TxsResultOK constant removed (no per-slot "OK" sentinel under the new
  format).

Replaces the speculative 207 Multi-Status path from the bsv-blockchain/
teranode#881 PR after that PR was rejected. The single Teranode-side
behavior that produces orphaned lines (no txid) is a processing-time
panic, which only affects one tx; arcade's reaper picks up the
unmatched stuck row and rebroadcasts on its next tick.

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Address feedback

* parseTxsFailures: fail-closed on orphan lines

Any non-empty failure line without an extractable txid now returns
nil from parseTxsFailures, dropping to the caller's whole-batch
requeue path. The only Teranode path that produces such a line is the
processOne panic recover (Server.go:740-744), which doesn't include
tx.TxID() in the wrapper. Re-broadcasting the whole batch is safe
(Teranode dedups on store-level) and avoids silently marking the
orphan's owner as ACCEPTED.

* Respect datahub discovery flag after startup

* Address feedback

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Address feedback

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Fix CheckExactPartitions missing-topic test to match contract

The test name and assertion described the soft-fail behavior of
CheckPartitions, but CheckExactPartitions hard-fails on missing topics
by design — the call site in app/app.go relies on it to abort startup,
because Kafka auto-creating TopicPropagation on first publish would use
the broker default partition count and silently break the dispatcher's
single-partition invariant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Dylan <64976002+galt-tr@users.noreply.github.com>
Co-authored-by: Dylan <dylan@britevue.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants