fix(propagation): correct HTTP status mapping for missing-parent and duplicate-tx #870
httpStatusForTxError previously returned 500 for both ErrTxMissingParent and ErrTxExists, causing downstream clients to retry validation-time conditions as if they were infrastructure failures.
- ErrTxMissingParent -> 422 Unprocessable Entity (tx valid but parent not yet available; client can resubmit after parent lands)
- ErrTxExists -> 200 OK (UTXO store reports the tx is already accepted; duplicate submission of a known-good tx is idempotent success, not a conflict)
Adds handler tests covering both new mappings plus the wrapped chain case for ErrTxMissingParent.
ErrTxExists maps to 200 OK but the response body was still using the "Failed to process transaction" prefix. Use "OK" body for 2xx returns.
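The mapping described above can be sketched as follows. This is a minimal illustration, not the Teranode handler: the sentinel variables and the wrap helper are hypothetical stand-ins (the real error type compares by code), and only the three outcomes named in this description are covered.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Hypothetical sentinels standing in for Teranode's coded errors.
var (
	ErrTxMissingParent  = errors.New("tx missing parent")
	ErrTxExists         = errors.New("tx exists")
	ErrUtxoHashMismatch = errors.New("utxo hash mismatch")
)

// httpStatusForTxError follows the mapping in the description:
// missing parent -> 422, already-known tx -> 200, everything else -> 500.
func httpStatusForTxError(err error) int {
	switch {
	case errors.Is(err, ErrTxMissingParent):
		return http.StatusUnprocessableEntity
	case errors.Is(err, ErrTxExists):
		return http.StatusOK
	default:
		return http.StatusInternalServerError
	}
}

// wrap is a hypothetical helper that simulates a processing-error wrapper.
func wrap(err error) error {
	return fmt.Errorf("processing failed: %w", err)
}

func main() {
	// errors.Is walks the wrap chain, so a wrapped sentinel still maps to 422.
	fmt.Println(httpStatusForTxError(wrap(ErrTxMissingParent))) // 422
	fmt.Println(httpStatusForTxError(ErrTxExists))              // 200
	fmt.Println(httpStatusForTxError(ErrUtxoHashMismatch))      // 500
}
```

Using errors.Is rather than direct comparison is what makes the wrapped-chain test case pass.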
🤖 Claude Code Review
Status: Complete
Review Summary:
Code Quality: No issues found. The implementation correctly uses …
Verification: …
No documentation updates needed: The HTTP endpoints are internal/alternative submission paths. User-facing docs focus on the gRPC API, which is appropriate.
Benchmark Comparison Report
Threshold: >10% with p < 0.05 | Generated: 2026-05-15 12:37 UTC
oskarszoon left a comment
Verified each claim against the code. All hold.
- 422 for ErrTxMissingParent: defensible. Raised at Validator.go:851 (script verify) and :1165 (UTXO extension); both "parent not yet seen", retryable. Distinct from 400 (malformed) / 404 (not found) / 409 (conflict).
- 200 for ErrTxExists: correct. Validator.go:708 consumes ErrTxExists internally for the main validation path and returns (metadata, nil). The saveAsConflicting branch (Validator.go:636) converts it to ErrTxConflicting → 409. The only way ErrTxExists reaches httpStatusForTxError is the creation-in-progress / already-committed path at aerospike/create.go:1078, both idempotent. The follow-up commit cca9cfcc9 also fixes the response body from "Failed to process transaction" to "OK"; that mismatch was a bug introduced by the first commit and corrected immediately.
- errors.Is chain walk: confirmed. Error.Is compares by code then recursively unwraps via Unwrap(). The wrapped-error case at http_handlers_test.go:300–308 exercises a real NewProcessingError-wrapped NewTxMissingParentError, not a bare sentinel.
- Default 500 set: spot-checked ErrUtxoHashMismatch (aerospike/spend.go:859, sql/sql.go:2080, 2562, 3253): data-integrity violation, not client-actionable. ErrTxNotFound stays 500 correctly; it can fire in DAH-evicted-parent paths where it's an unexpected internal state, distinct from ErrTxMissingParent.
- No regression: only additive switch cases. No existing mapping changed or reordered.
- Local: go test -race -count=1 ./services/propagation/ → 100 passed. go vet ./services/propagation/ → clean.
Minor / non-blocking
The 2xx body fix at Server.go:614–618 checks status >= 200 && status < 300 and returns "OK" for any 2xx. Correct for the current mapping, but couples the response body to the numeric range rather than to the specific error. If a future error is mapped to a different 2xx with different intended body semantics, this silently returns "OK". Worth keying off the error itself in a future change.
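One way to key the body off the error, as suggested above, is to decide status and body in a single place. The sketch below is an assumption about shape only: txResponse, responseForTxError, and the sentinel are hypothetical names, and the failure body reuses the prefix quoted in this review.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// ErrTxExists is a hypothetical sentinel standing in for the real coded error.
var ErrTxExists = errors.New("tx exists")

// txResponse pairs a status with the body that belongs to that specific outcome.
type txResponse struct {
	Status int
	Body   string
}

// responseForTxError picks status and body together, keyed on the error,
// so a future 2xx mapping cannot silently inherit the "OK" body.
func responseForTxError(err error) txResponse {
	switch {
	case err == nil, errors.Is(err, ErrTxExists):
		return txResponse{Status: http.StatusOK, Body: "OK"}
	default:
		return txResponse{
			Status: http.StatusInternalServerError,
			Body:   "Failed to process transaction: " + err.Error(),
		}
	}
}

func main() {
	fmt.Println(responseForTxError(ErrTxExists).Body) // OK
}
```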
* Add dependency-aware dispatch plan
Plan for tracking parent-child relationships between in-flight
transactions so children are not broadcast until their parents have
reached a terminal state. Also covers the pipeline simplifications
that fall out of the change: validation moves into intake, the
tx_validator service is removed, the propagation topic becomes
single-partition with a single-goroutine dispatcher, and the
PENDING_RETRY status / reaper are replaced with an in-memory retry
queue backed by Kafka replay for durability.
Intended for discussion before implementation. Single coordinated
deploy with a new propagation topic and a drain-the-old-queue
prerequisite; no backward-compat code path retained.
* Add dispatcher core: dep index, retry queue, offset tracker
Introduces the single-goroutine dispatcher described in the plan. The
dispatcher reads transactions from an incoming channel, decides whether
to admit each to a pending broadcast batch or hold it as a waiter on
in-flight parents, and emits flushed batches to a downstream worker
channel. Status flips coming back (from broadcast workers and the
merkle-service callback path) drive waiter release, recursive
cascade-reject, retry queueing, and offset terminalization.
Components:
- dispatcher.go: types (dispatcherMsg, inFlightEntry, statusFlip),
the Dispatcher struct with its dep-index/retry-queue state, the
main select loop, and handlers for incoming messages, status
flips, and timer-driven batch flushes / retry wake-ups.
- offset_tracker.go: a min-heap with lazy deletion behind the
offsetTracker interface. Add records an in-flight offset, Done
marks one terminalized, LowestUnfinished gives the commit point.
- dispatcher_test.go: unit tests covering the baseline admit path,
parent-in-flight hold, cascade-reject for REJECTED parents, the
multi-parent case, retryable failure requeue with backoff, and
retry exhaustion. Plus an OffsetTracker test for the lazy-delete
semantics.
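The min-heap with lazy deletion can be sketched as below. This is an illustration under assumptions, not the contents of offset_tracker.go: it assumes each offset is added once, omits the Empty method and any locking, and the return signature of LowestUnfinished is a guess.

```go
package main

import (
	"container/heap"
	"fmt"
)

// int64Heap is a plain min-heap of offsets for container/heap.
type int64Heap []int64

func (h int64Heap) Len() int           { return len(h) }
func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
func (h int64Heap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *int64Heap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *int64Heap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type offsetTracker struct {
	heap int64Heap
	done map[int64]struct{}
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{done: map[int64]struct{}{}}
}

// Add records an in-flight offset.
func (t *offsetTracker) Add(off int64) { heap.Push(&t.heap, off) }

// Done only marks the offset; its heap entry is removed lazily.
func (t *offsetTracker) Done(off int64) { t.done[off] = struct{}{} }

// LowestUnfinished discards finished offsets sitting at the top of the heap
// and returns the smallest offset still in flight. ok is false when none is.
func (t *offsetTracker) LowestUnfinished() (off int64, ok bool) {
	for t.heap.Len() > 0 {
		top := t.heap[0]
		if _, finished := t.done[top]; !finished {
			return top, true
		}
		heap.Pop(&t.heap)
		delete(t.done, top)
	}
	return 0, false
}

func main() {
	tr := newOffsetTracker()
	tr.Add(5)
	tr.Add(6)
	tr.Add(7)
	tr.Done(6) // out-of-order completion: commit point stays at 5
	fmt.Println(tr.LowestUnfinished()) // 5 true
	tr.Done(5)
	fmt.Println(tr.LowestUnfinished()) // 7 true
}
```

Lazy deletion keeps Done at O(1); the cost of removing finished entries is paid only when they reach the top of the heap.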
Not wired up to anything yet — the dispatcher stands alone. The next
commit will connect it to a Kafka consumer feeding incomingMsgs and a
broadcast-worker pool consuming outgoingBatch. Existing propagator,
tx_validator, and reaper code paths are untouched.
Retry classification uses HTTP status codes (422 for ErrTxMissingParent,
0 for no-response) rather than the existing IsRetryableError text match
— the latter is removed in a follow-up commit alongside the wiring
change. The dependency on Teranode returning 422 is satisfied by
bsv-blockchain/teranode#870.
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- go vet ./services/propagation/... -- clean
- golangci-lint run ./services/propagation/ -- 0 issues
* Add dispatcher consumer with deferred Kafka offset commit
Builds the dedicated Kafka consumer for the dependency-aware
propagation topic. Distinct from the existing kafka.ConsumerGroup
wrapper (still used by webhook, watchdog, etc.) because it defers
Claim.MarkMessage until the dispatcher reports the message's offset
terminalized via its offsetTracker. Without that, a crash would leave
in-flight txs lost: Kafka would treat their offsets as committed while
the dispatcher's in-memory state was gone.
Architecture is two cooperating goroutines off Run:
- Read loop: pulls messages from a Claim, decodes the JSON envelope
(now including input_txids), records the message in a pending map
keyed by offset, and pushes a dispatcherMsg onto the dispatcher's
incomingMsgs channel. The send is blocking, so backpressure
propagates naturally up through the claim buffer into the broker.
- Commit loop: ticks every 200ms (configurable), queries
offsetTracker.LowestUnfinished, and calls MarkMessage on every
pending message with offset below that cursor. Drops committed
entries from the pending map.
To make this safe, offsetTracker now guards its state with a mutex.
The dispatcher's writes (Add/Done) and the consumer's reads
(LowestUnfinished/Empty) cross goroutine boundaries; without the
mutex, the race detector correctly flagged a data race. Uncontested mutex
overhead is ~10-20ns per op — negligible against the dispatcher's
~5-10μs per-message JSON decode.
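A single pass of the commit loop described above might look like the following. It is a simplified sketch: commitBelow, the pending map's value type, and the mark callback are hypothetical, with mark standing in for the call to MarkMessage.

```go
package main

import (
	"fmt"
	"sort"
)

// commitBelow marks every pending offset strictly below cursor, in ascending
// order, and drops the marked entries from pending. mark is a stand-in for
// the real MarkMessage call.
func commitBelow(pending map[int64]string, cursor int64, mark func(offset int64)) {
	var ready []int64
	for off := range pending {
		if off < cursor {
			ready = append(ready, off)
		}
	}
	sort.Slice(ready, func(i, j int) bool { return ready[i] < ready[j] })
	for _, off := range ready {
		mark(off)
		delete(pending, off)
	}
}

func main() {
	pending := map[int64]string{3: "tx-a", 4: "tx-b", 5: "tx-c", 6: "tx-d"}
	// Cursor 5 means offset 5 is the lowest unfinished one: only 3 and 4 commit.
	commitBelow(pending, 5, func(off int64) { fmt.Println("mark", off) })
	fmt.Println(len(pending)) // 2
}
```

Offset 6 stays pending even if it has already terminalized, because committing past the unfinished offset 5 would lose it on a crash.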
The consumer is not yet wired into the propagator. The next commit
will replace the existing parallel-consumer pool with this consumer
plus the dispatcher.
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- go vet ./services/propagation/... -- clean
- golangci-lint run ./services/propagation/ -- 0 issues
* Add dispatcher broadcaster: batch HTTP submit + per-tx fallback
Adds the broadcast side of the dep-aware pipeline. Consumes batches
from the dispatcher's outgoingBatch channel, submits them to Teranode,
writes terminal status rows to the store, and emits per-tx statusFlips
back to the dispatcher.
Design:
- Multi-tx batches go to /txs first. On HTTP 2xx every tx in the
batch gets ACCEPTED_BY_NETWORK — Teranode handles intra-batch
dependency ordering internally so an HTTP 200 implies every tx was
accepted into the mempool. /txs returns aggregate-only status, so
on HTTP non-2xx the broadcaster falls back to per-tx /tx calls to
recover per-tx status codes (some txs in a failed batch may
succeed individually).
- Single-tx batches go directly to /tx, skipping the batch path.
- Fan-out across healthy endpoints is parallel; first 2xx wins and
cancels siblings. No healthy endpoints maps to statusCode=0,
which the dispatcher classifies as retryable.
- Store writes use BatchUpdateStatus per tx. Write failures are
logged but don't block the statusFlip emission — the dispatcher
advances its in-memory state regardless, and Kafka replay
reconciles divergence on restart.
- Sibling cancellation does NOT count against endpoint health — a
canceled-by-broadcast error is filtered before aggregating
outcomes.
Tests cover the four meaningful paths: batch accepted (no fallback),
batch rejected with per-tx fallback, single-tx direct, and no healthy
endpoints (retryable signal).
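The first-2xx-wins fan-out can be sketched as below. The names submitFn, broadcastFirstSuccess, fixed, untilCanceled, and run are hypothetical; the real broadcaster talks to a Teranode client and tracks endpoint health, both of which are left out here.

```go
package main

import (
	"context"
	"fmt"
)

// submitFn is a hypothetical stand-in for one endpoint's HTTP submit.
type submitFn func(ctx context.Context) (statusCode int, err error)

// broadcastFirstSuccess submits to all endpoints in parallel and returns the
// first 2xx status, canceling the siblings. With no endpoints, or when nothing
// responded, it returns 0, which the dispatcher would treat as retryable.
func broadcastFirstSuccess(ctx context.Context, endpoints []submitFn) int {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	results := make(chan int, len(endpoints)) // buffered so losers never block
	for _, ep := range endpoints {
		go func(ep submitFn) {
			code, err := ep(ctx)
			if err != nil {
				code = 0 // transport error or cancellation: no usable status
			}
			results <- code
		}(ep)
	}

	last := 0
	for range endpoints {
		code := <-results
		if code >= 200 && code < 300 {
			return code // the deferred cancel stops the remaining siblings
		}
		if code != 0 {
			last = code
		}
	}
	return last
}

// fixed returns an endpoint that always answers with the given status.
func fixed(code int) submitFn {
	return func(context.Context) (int, error) { return code, nil }
}

// untilCanceled simulates a slow endpoint that only returns once canceled.
func untilCanceled(ctx context.Context) (int, error) {
	<-ctx.Done()
	return 0, ctx.Err()
}

// run is a small demo helper that supplies a background context.
func run(endpoints ...submitFn) int {
	return broadcastFirstSuccess(context.Background(), endpoints)
}

func main() {
	fmt.Println(run(untilCanceled, fixed(200))) // 200
	fmt.Println(run())                          // 0
}
```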
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- go vet ./services/propagation/... -- clean
- golangci-lint run ./services/propagation/ -- 0 issues
* Move tx validation + dedup + dispatch publish into intake handler
Restructures the submit handler for the dep-aware pipeline. The intake
HTTP handler now performs the work the tx_validator service used to do
asynchronously: parse, run policy validation, dedup via store CAS, and
publish to the dispatch topic with input_txids extracted from the
parsed transaction.
Changes:
- kafka/topics.go: add TopicDispatch and TopicDispatchDLQ for the
dep-aware pipeline's fresh single-partition topic.
- services/api_server/server.go: add a validator.Validator field
threaded through New(); existing tests using struct-literal
construction get a nil-safe degraded mode (skips validation).
- services/api_server/handlers.go: handleSubmitTransaction now runs
ValidateTransaction (policy-only, matching tx_validator's current
behavior), persists REJECTED rows synchronously on validation
failure, performs the dedup CAS via GetOrInsertStatus, and
publishes to TopicDispatch with an envelope including input_txids.
- app/app.go: pass d.Validator to api_server.New so production
deployments get full validation behavior.
- handlers_test.go: extend mockStore with an overrideable
getOrInsertStatusFn so tests can exercise the dedup path. Default
behavior returns inserted=true so existing tests continue to see
fresh-submission semantics.
- intake_dispatch_test.go: new test file covering the four cases:
happy-path publish to TopicDispatch with correct envelope shape,
dedup hit returns idempotent 202 with existing state, transient
store error continues to publish, and the pathological
not-inserted-with-nil-existing case (defensive branch).
The legacy TopicTransaction publish is gone — nothing in production
will be producing to that topic anymore. The tx_validator service
remains as code but its consumer will see no traffic; removed in a
follow-up commit alongside the propagator entry-point swap.
Verified:
- go test -count=1 -race ./services/api_server/... ./services/propagation/... -- pass
- go vet ./... -- clean
- golangci-lint run on touched packages -- 0 issues
* Add Pipeline service wiring dispatcher trio end-to-end
Builds the propagation.Pipeline that wires Dispatcher,
dispatcherConsumer, and dispatcherBroadcaster together with their
shared channels and lifecycle. Implements services.Service so
app.BuildServices can drop it in alongside (or eventually replacing)
the legacy Propagator.
Architecture:
- dispatcherConsumer reads from kafka.TopicDispatch, defers Kafka
offset commit until the dispatcher's offsetTracker reports each
tx terminalized.
- Dispatcher runs in its own goroutine, owning the in-flight map,
waiter index, retry queue, and offset heap. No locks because
only this goroutine mutates them.
- dispatcherBroadcaster reads emitted batches, submits to Teranode
via the existing teranode.Client (batch endpoint with per-tx
fallback on all-rejected), writes terminal status rows to the
store, and emits per-tx statusFlips back to the dispatcher's
return channel.
The dispatcher's rejectedSink is wired so cascade rejections from
REJECTED parents also produce a terminal store row — implemented as
a synthetic statusFlip onto the same flips channel the broadcaster
uses, keeping all terminal-write logic in one place
(dispatcherBroadcaster.emitOutcome). The synthetic flip falls back to
a non-blocking send so a full flips channel doesn't stall the
dispatcher loop; correctness is preserved by Kafka replay since the
parent's offset is not committed until the child terminalizes.
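The non-blocking send mentioned above is the standard select-with-default idiom. A minimal sketch, with trySend as a hypothetical helper name and a string txid as the payload:

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was
// enqueued. A full channel returns false instead of stalling the caller.
func trySend(ch chan<- string, txid string) bool {
	select {
	case ch <- txid:
		return true
	default:
		return false
	}
}

func main() {
	flips := make(chan string, 1)
	fmt.Println(trySend(flips, "child-1")) // true
	fmt.Println(trySend(flips, "child-2")) // false: buffer is full
}
```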
Lifecycle:
- NewPipeline constructs all components and channels but does NOT
start any goroutine.
- Start spawns the broadcaster and dispatcher goroutines, then
runs the consumer in the caller's goroutine (blocking until
ctx is canceled or the subscription ends).
- Stop cancels the internal context, which propagates to the two
background goroutines, and waits for them to exit.
End-to-end tests cover:
- Happy-path batch: two unrelated txs flow through, both end up
ACCEPTED_BY_NETWORK in the store, one /txs hit on Teranode.
- Dep-aware ordering: parent and child published in order, child
held until parent's broadcast completes (verified by an
artificially-slow parent and the order of /tx submissions
observed by the test Teranode emulator).
Not yet wired into app.BuildServices — that swap is the next commit,
landing alongside removal of the legacy propagator entry point.
Merkle-service registration (the F-024 invariant for prior code) is
not yet plumbed in; needed before the cutover but separated from this
commit to keep the diff focused on the dispatcher wiring.
Verified:
- go test -count=1 -race ./services/propagation/... ./services/api_server/... -- pass
- go vet ./... -- clean
- golangci-lint run on touched packages -- 0 issues
* Wire merkle-service registration into dispatcher broadcaster
Before broadcasting a batch to Teranode, the broadcaster now
registers every tx with merkle-service for /watch tracking. Per-tx
registration failures emit a retryable statusFlip (statusCode=0)
so the dispatcher's retry queue re-dispatches them on backoff. Txs
that register successfully proceed to the existing /txs (with /tx
fallback) path.
This preserves the F-024 invariant from the legacy propagator: no
transaction reaches Teranode without a matching /watch entry, so
MINED / SEEN_ON_NETWORK / STUMP callbacks always have a row to
update.
Behavior matches the legacy registerBatch in two important ways:
- When merkleClient is nil OR callbackURL is empty, registration
is skipped entirely and every tx in the batch proceeds to
broadcast. Same fallback the legacy path used.
- Per-tx independence: a failing tx in a batch does NOT block the
rest of the batch from broadcasting. The successful subset
proceeds; failures take the retry path.
The dispatcher's retry queue handles the failed registrations the
same way it handles transient broadcast failures — same
isRetryableStatusCode logic, same exponential backoff via
computeRetryDeadline.
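A rough sketch of status-code classification and exponential backoff follows. The function names come from the commit text, but the bodies are assumptions: only 0 and 422 are treated as retryable because those are the only codes named here, and the doubling schedule, base, and ceiling parameters are illustrative.

```go
package main

import (
	"fmt"
	"time"
)

// isRetryableStatusCode treats 0 (no response) and 422 (missing parent) as
// retryable. Whether other codes qualify is not stated in the source.
func isRetryableStatusCode(code int) bool {
	return code == 0 || code == 422
}

// backoffDelay doubles base once per prior attempt, capped at ceiling.
func backoffDelay(attempt int, base, ceiling time.Duration) time.Duration {
	delay := base
	for i := 0; i < attempt && delay < ceiling; i++ {
		delay *= 2
	}
	if delay > ceiling {
		delay = ceiling
	}
	return delay
}

// computeRetryDeadline returns the earliest time the tx may be retried.
func computeRetryDeadline(now time.Time, attempt int, base, ceiling time.Duration) time.Time {
	return now.Add(backoffDelay(attempt, base, ceiling))
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		fmt.Println(attempt, backoffDelay(attempt, 100*time.Millisecond, time.Second))
	}
}
```

Stopping the doubling loop once the delay reaches the ceiling also keeps large attempt counts from overflowing the duration.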
Pipeline wires the merkle deps through cfg.CallbackURL,
cfg.CallbackToken, and cfg.Propagation.MerkleConcurrency so
production behavior matches the legacy propagator.
Test coverage: a batch containing one tx whose merkle registration
fails (HTTP 500 from merkle-service) and one that succeeds. The
failing tx produces a retryable statusFlip and never reaches
Teranode; the successful tx broadcasts normally.
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Wire Pipeline into BuildServices, replacing legacy Propagator
app.BuildServices now constructs propagation.NewPipeline for the
"propagation" service mode. The legacy propagation.New(...) call is
removed from the live wiring; the type and its surrounding code stay
in the source tree for a follow-up cleanup commit.
The Pipeline takes a tighter dependency set than the old Propagator:
no leaser (no reaper lease needed — single consumer per partition
makes the cluster-wide lock obsolete) and no publisher (status fan-out
flows through the existing Kafka publisher inside the store/event
layer, not through the propagator).
After this commit the dispatcher pipeline is the production
propagation path. Submissions flow:
POST /tx → api-server intake (parse + validate + dedup CAS)
→ kafka.TopicDispatch (new single-partition topic)
→ propagation.Pipeline
├── dispatcherConsumer (deferred commit)
├── Dispatcher (dep index, retry queue)
└── dispatcherBroadcaster (merkle register + Teranode submit)
The legacy tx_validator service still runs (consuming TopicTransaction)
because nothing has been removed yet — but no producer feeds it, so
it's idle. That removal lands next.
Verified:
- go test -count=1 -race ./services/propagation/... ./services/api_server/... ./app/... -- pass
- go vet ./... -- clean
- golangci-lint run on touched packages -- 0 issues
* Remove tx_validator service
The tx_validator service is dead code after the intake handler took
over its responsibilities (parse + policy validate + dedup CAS) and
intake started publishing directly to TopicDispatch instead of
TopicTransaction. Nothing produces to TopicTransaction anymore, so
the service would just sit idle.
Removed:
- services/tx_validator/ directory (validator.go + validator_test.go)
- app/app.go import and "tx-validator" service registration
- config/config.go "tx-validator" mode entry
The TxValidator config block (cfg.TxValidator) is still defined in
config.go for backward read compatibility — operators with a
tx_validator: section in their config.yaml see it silently ignored
rather than getting a validation error. Removing the struct entirely
can happen in a separate cleanup once the deploy has rolled out.
Metrics with arcade_tx_validator_* names also stay defined for now;
they'll never be incremented in the new pipeline but keeping the
definitions prevents Prometheus scrape errors during the cutover.
Verified:
- go test -count=1 -race ./services/... ./app/... ./config/... -- pass
- go vet ./... -- clean
* Consolidate types: reuse propagationMsg, drop statusFlip
Two new types that should have been extensions of existing types in the first place:
- The dispatcher's incoming-message type is the existing
propagationMsg, extended with InputTXIDs and KafkaOffset fields.
Previously this was a parallel "dispatcherMsg" type — same JSON
shape, same fields — which created unnecessary divergence.
- Status-flip events between broadcaster/dispatcher use
*models.TransactionStatus directly. The existing type already
carries TxID, Status, StatusCode, and ExtraInfo, which is exactly
what the per-tx outcome carries. The internal "statusFlip" struct
was just a four-field subset of the same shape.
The broadcaster now constructs one *models.TransactionStatus per
outcome and uses it both as the store write payload AND as the
in-flight status flip sent to the dispatcher. Removes a redundant
local struct.
Reverts the gratuitous "msg" → "envelope" variable rename in
handleSubmitTransaction; the existing msg variable just gets one new
key in its map literal.
Net diff is smaller and matches the existing type vocabulary across
the codebase.
Verified:
- go test -count=1 -race ./services/... ./app/... -- pass
- golangci-lint run on touched packages -- 0 issues
* Remove legacy Propagator and its supporting code
The dispatcher pipeline is now the production path. Everything in the
legacy propagation service is dead code:
- propagator.go: the parallel-consumer Propagator type and its
flushBatch / processBatch / registerBatch / broadcast helpers.
Replaced by Dispatcher + dispatcherConsumer +
dispatcherBroadcaster.
- reaper.go: the lease-coordinated PENDING_RETRY reaper.
Replaced by the dispatcher's in-memory retry queue (Kafka replay
provides crash durability).
- retryable.go: IsRetryableError text-match classification.
Replaced by isRetryableStatusCode using the HTTP status codes
Teranode now returns (per the audit PR landed upstream).
- backoff.go: ComputeBackoff helper for the reaper. The dispatcher
has its own inlined retry-deadline math (computeRetryDeadline).
- replay.go: the one-shot merkle-service replay on startup. Useful
as a recovery mechanism for merkle-service state loss but not
needed for the initial cutover; a follow-up can add it back if
operationally warranted.
- propagator_test.go: 2k lines of legacy-path tests, including the
comprehensive mockStore with bookkeeping for PendingRetry,
retryCount, merkleMarks, etc.
- health_test.go: tests of the legacy propagator's broadcast
behavior (first-success-wins across endpoints). Equivalent
coverage now lives in dispatcher_broadcaster_test.go for the new
broadcaster.
The propagationMsg type moved from propagator.go to dispatcher.go
where it's the natural neighbor of the dispatcher that consumes it.
A minimal mockStore lives in mocks_test.go — just enough to satisfy
the store.Store interface via embedding. Tests that need specific
store behavior compose over it (broadcastTestStore, pipelineTestStore).
Net: 4276 lines removed, ~25 lines added.
Verified:
- go test -count=1 -race ./... -- all packages pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Revert Pipeline rename: keep propagation.Propagator and propagation.New
The plan called for renaming the propagation topic (TopicDispatch) and
changing how it gets consumed (single-partition dispatcher pipeline)
— it did NOT call for renaming the service's external API. Reverts
the gratuitous Pipeline / NewPipeline names back to the original
Propagator / New.
Concretely:
- type Pipeline → type Propagator
- func NewPipeline → func New
- pipeline.go → propagator.go (the file lives next to its peers
again under the original name)
- pipeline_test.go → propagator_test.go
- Test names TestPipeline_* → TestPropagator_*
- app.BuildServices references propagation.New (no signature change
visible to callers)
Internal types stay as-is: Dispatcher, dispatcherConsumer,
dispatcherBroadcaster — those are genuinely new components introduced
by the dep-aware pipeline; renaming them would just rename internal
implementation details without changing what they are.
Verified:
- go test -count=1 -race ./services/propagation/... ./app/... -- pass
- golangci-lint run ./services/propagation/ ./app/ -- 0 issues
* Revert "Revert Pipeline rename: keep propagation.Propagator and propagation.New"
This reverts commit 1db4a07bc536f9b1892210bf31e2dddb11e11f35.
* Revert "Remove legacy Propagator and its supporting code"
This reverts commit b10b45aa815e93c695d4c81548f2e4ac454f40df.
* Revert "Consolidate types: reuse propagationMsg, drop statusFlip"
This reverts commit 319b4dcfcadc8d15f9e99df6323e4ceee59a8a2f.
* Revert "Remove tx_validator service"
This reverts commit 26604caadf9d6b6e1326dd4320d082e1e3ceaf93.
* Revert "Wire Pipeline into BuildServices, replacing legacy Propagator"
This reverts commit 95a5dd6bfcd5cd14b3528d180945aebe03526867.
* Revert "Wire merkle-service registration into dispatcher broadcaster"
This reverts commit ab4e72a162eccf8b5630d4c6d664a764bc9b2182.
* Revert "Add Pipeline service wiring dispatcher trio end-to-end"
This reverts commit e0b181a6b766a397824e4af30c7967e8e9bbfa8b.
* Revert "Move tx validation + dedup + dispatch publish into intake handler"
This reverts commit 92c429aef2ac25b323d124173377d8a56126fb28.
* Revert "Add dispatcher broadcaster: batch HTTP submit + per-tx fallback"
This reverts commit 0f8d9e7d6c8bf0545d52edae83f1a087d3ccd140.
* Revert "Add dispatcher consumer with deferred Kafka offset commit"
This reverts commit a5958cee4b795d26aff62be8274c8507302ed356.
* Revert "Add dispatcher core: dep index, retry queue, offset tracker"
This reverts commit c34de7f630d0ccf487c69bd9f42bf26c6b4c9797.
* Dep-aware propagation: intake validation + propagator waiter index
Implements the dep-aware dispatch described in docs/plans/. Targeted
modifications to the existing Propagator and intake handler; no
architectural rework, no type churn.
Changes:
- propagationMsg gains an InputTXIDs field. Optional on the JSON
side so older producers continue to interoperate; the propagator
treats absent/empty as "no in-flight parents".
- Intake handler (api_server) now performs the work the
tx_validator service used to do asynchronously: parse, run policy
validation via validator.ValidateTransaction (skipFees=true,
skipScripts=true — matches the legacy tx_validator behavior),
dedup CAS via store.GetOrInsertStatus, extract InputTXIDs, and
publish to kafka.TopicPropagation directly. Validation failure
persists a terminal REJECTED row and returns 400 synchronously.
- api_server.New now takes a *validator.Validator; app.BuildServices
threads d.Validator through. Existing tests using struct-literal
construction (validator nil) skip validation, preserving their
behavior.
- Propagator gains an in-flight dep index: inFlight set, waiters
map (parent → set of children), pendingParents map (child → set
of unmet parents), heldMsgs map (held child txid → message).
handleMessage now checks input txids against inFlight and, if any
parent is in flight, registers the message as a waiter instead
of admitting it to pendingMsgs.
- applyTerminalStatuses hooks into handleTerminalForDeps after the
bulk publish: ACCEPTED parents release waiters whose pendingParents
set becomes empty (re-entered into pendingMsgs so the next
flushBatch picks them up); REJECTED parents cascade-reject every
descendant recursively, writing terminal REJECTED rows and
emitting a bulk publish for SSE/webhook subscribers. Terminalized
txids are dropped from inFlight.
- tx_validator gains an exported CollectInputTXIDs helper that
intake reuses, keeping the wire format extraction identical
regardless of producer. tx_validator itself is no longer
registered in app.BuildServices — intake covers its job — but
the package stays in the source tree as preserved queue-processing
scaffolding (its message format includes input_txids too in case
anyone re-activates it).
- No new topic constant, no new types, no architectural split. The
existing TopicPropagation carries the extended message format.
Single-partition deployment is an operator choice, not a code
constraint.
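The optional InputTXIDs field can be illustrated with a small decode example. Only the input_txids key is taken from the text above; the txid key, the reduced field set, and the decode helper are assumptions for the sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// propagationMsg here carries only the two fields needed for the example.
// The "txid" JSON key is an assumption; "input_txids" is named in the source.
type propagationMsg struct {
	TXID       string   `json:"txid"`
	InputTXIDs []string `json:"input_txids,omitempty"`
}

// decode parses one envelope. A message from an older producer simply leaves
// InputTXIDs nil, which the propagator reads as "no in-flight parents".
func decode(raw string) (propagationMsg, error) {
	var m propagationMsg
	err := json.Unmarshal([]byte(raw), &m)
	return m, err
}

func main() {
	old, _ := decode(`{"txid":"aa"}`)
	cur, _ := decode(`{"txid":"bb","input_txids":["aa"]}`)
	fmt.Println(len(old.InputTXIDs), len(cur.InputTXIDs)) // 0 1

	// omitempty keeps the wire format unchanged when there are no inputs.
	out, _ := json.Marshal(propagationMsg{TXID: "aa"})
	fmt.Println(string(out)) // {"txid":"aa"}
}
```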
Verified:
- go test -count=1 -race ./services/api_server/... ./services/propagation/...
./services/tx_validator/... ./app/... -- all pass
- go vet ./... -- clean
- golangci-lint run -- 0 issues
* Tests for dep-aware admission + waiter release/cascade
depaware_test.go covers the five core behaviors:
- HoldsChildWhenParentInFlight: a tx whose InputTXIDs include a tx
currently in flight lands in heldMsgs + pendingParents + waiters,
not pendingMsgs.
- ReleasesWaitersOnAccepted: parent's terminal ACCEPTED clears its
waiters' pendingParents entry; a child whose set is now empty is
re-entered into pendingMsgs.
- CascadesRejectedChildren: parent's terminal REJECTED recursively
rejects every descendant (child, grandchild). Each cascaded row
is written to the store via BatchUpdateStatusReturning.
- NoParents_AdmitsNormally: tx with no InputTXIDs follows the
legacy admission path, no waiter state.
- ParentNotInFlight_AdmitsChildDirectly: only IN-FLIGHT parents
create waits — children of long-mined or never-seen parents go
straight to pendingMsgs.
Behavior fix uncovered by the cascade test: a held waiter must still
appear in inFlight so DEEPER descendants (grandchild waiting on a
held child) register their waiter edge correctly. Handle this by
adding propMsg.TXID to inFlight BEFORE the hold/admit decision in
handleMessage. Removed the post-release inFlight insert in
handleTerminalForDeps since the child was already there.
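The hold, release, and cascade behavior, including the fix that marks a tx in flight before the hold decision, can be sketched with three maps. This is a simplified model under assumptions: depIndex and its method names are hypothetical, messages are reduced to txid strings, and store writes and publishing are omitted.

```go
package main

import "fmt"

type depIndex struct {
	inFlight       map[string]bool
	waiters        map[string]map[string]bool // parent -> held children
	pendingParents map[string]map[string]bool // child -> unmet parents
}

func newDepIndex() *depIndex {
	return &depIndex{
		inFlight:       map[string]bool{},
		waiters:        map[string]map[string]bool{},
		pendingParents: map[string]map[string]bool{},
	}
}

// admit reports whether txid can be broadcast now. The tx is marked in flight
// before the hold decision so descendants of a held tx still register on it.
func (d *depIndex) admit(txid string, inputs []string) bool {
	unmet := map[string]bool{}
	for _, p := range inputs {
		if d.inFlight[p] {
			unmet[p] = true
		}
	}
	d.inFlight[txid] = true
	if len(unmet) == 0 {
		return true
	}
	d.pendingParents[txid] = unmet
	for p := range unmet {
		if d.waiters[p] == nil {
			d.waiters[p] = map[string]bool{}
		}
		d.waiters[p][txid] = true
	}
	return false
}

// accepted returns the children whose last unmet parent was txid.
func (d *depIndex) accepted(txid string) []string {
	var released []string
	for child := range d.waiters[txid] {
		delete(d.pendingParents[child], txid)
		if len(d.pendingParents[child]) == 0 {
			delete(d.pendingParents, child)
			released = append(released, child)
		}
	}
	delete(d.waiters, txid)
	delete(d.inFlight, txid)
	return released
}

// rejected returns every held descendant of txid, each exactly once.
func (d *depIndex) rejected(txid string) []string {
	var cascaded []string
	children := d.waiters[txid]
	delete(d.waiters, txid)
	delete(d.inFlight, txid)
	for child := range children {
		if _, held := d.pendingParents[child]; !held {
			continue // already cascaded through another parent
		}
		for p := range d.pendingParents[child] {
			delete(d.waiters[p], child) // no-op if p's waiter set is gone
		}
		delete(d.pendingParents, child)
		cascaded = append(cascaded, child)
		cascaded = append(cascaded, d.rejected(child)...)
	}
	return cascaded
}

func main() {
	d := newDepIndex()
	fmt.Println(d.admit("parent", nil))                   // true
	fmt.Println(d.admit("child", []string{"parent"}))     // false
	fmt.Println(d.admit("grandchild", []string{"child"})) // false
	fmt.Println(d.rejected("parent"))                     // [child grandchild]
}
```

If admit added the tx to inFlight only on the admit branch, the grandchild above would be broadcast immediately, because its held parent would not look in flight.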
persistCascadeRejections now uses BatchUpdateStatusReturning to match
applyTerminalStatuses's existing store contract (mockStore implements
that method, not BatchUpdateStatus).
Verified:
- go test -count=1 -race ./services/propagation/... -- all pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Move dep-index to a dispatcher goroutine, drop the mutex
The previous version put inFlight / waiters / pendingParents /
heldMsgs on the Propagator struct behind the existing p.mu mutex.
That was the wrong shape: with a single Kafka consumer per partition
and the dispatch decisions all logically owned by one ordering
domain, the dep-index should live in a single goroutine with no
locks. The mutex was serializing concurrent traffic from handleMessage
(consumer goroutine) and applyTerminalStatuses (processBatch
goroutines, up to MaxConcurrentBatches in parallel) — workable, but
extra synchronization complexity for state that has a natural single
owner.
This commit:
- Creates services/propagation/dispatcher.go containing the
dispatcher goroutine (runDispatcher) plus the admitRequest /
terminalEvent / terminalResult protocol types. All dep-index
state (inFlight, waiters, pendingParents, heldMsgs) is declared
inside runDispatcher's function body — nothing else can reach
them. The dispatcher serializes admit and terminal events via a
select loop; no locks anywhere on dep-index state.
- Removes inFlight / waiters / pendingParents / heldMsgs from the
Propagator struct. Removes the holdAsWaiterLocked, releaseWaitersLocked,
cascadeRejectLocked, handleTerminalForDeps helpers — their logic
moved into the dispatcher goroutine.
- handleMessage now calls p.admitToDispatcher(propMsg), which sends
a request and waits for a synchronous admit/hold reply. Single
goroutine round-trip per message; channels are buffered so a
momentarily slow dispatcher doesn't immediately stall the consumer.
- applyTerminalStatuses now calls p.notifyTerminalToDispatcher per
terminalized txid, gets back the list of released waiters
(re-entered into pendingMsgs by the caller) and cascaded
descendants (caller writes REJECTED rows + bulk publish).
- The dispatcher goroutine is started in New (not Start) so the
existing tests that construct via New and call handleMessage
directly continue to work. Stop cancels the dispatcher.
- depaware_test.go updated to test via observable behavior
(pendingMsgs contents, ms.updates) instead of poking at private
dep-index maps, since those are no longer accessible from outside
the dispatcher goroutine.
The legacy p.mu still protects pendingMsgs (a slice the consumer
goroutine and the dispatcher both need to append to — released
waiters re-enter via the consumer-side helper). That mutex was
already there before this PR and isn't part of the dep-aware change.
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
- golangci-lint run on touched packages -- 0 issues
* Dispatcher owns pendingMsgs end-to-end: no shared state, no mutex
The dispatcher now owns ALL dep-aware state — inFlight, waiters,
pendingParents, heldMsgs, AND pendingMsgs — exclusively within its
goroutine. No shared maps, no shared slice, no mutex anywhere in the
propagation pipeline's dep-aware path.
Specific fixes addressed (per the in-tree audit):
- Item 16: pendingMsgs moved from Propagator (mutex-guarded) into
the dispatcher's local state. handleMessage admits via the
dispatcher's admitCh; the dispatcher decides hold-or-admit AND
appends to its own pendingMsgs. flushBatch drains the dispatcher
via a new drainCh request/reply. Released waiters from terminal
events get appended to pendingMsgs by the dispatcher directly —
the caller never touches the slice. p.mu and p.pendingMsgs are
deleted from the Propagator struct entirely.
- Item 17 (real bug): the previous order was admitToDispatcher
first (adds to inFlight), THEN maxPending check (returns error
if full). A rejected tx would leak into inFlight with no terminal
event to clean it up. The check is now atomic inside the
dispatcher: hold-as-waiter doesn't count toward maxPending,
admit-to-broadcast checks the cap BEFORE adding to inFlight, and
a full-cap admit returns rejected without touching state.
- Item 12: cascade rejection reason is now threaded through.
applyTerminalStatuses passes each rejected tx's ExtraInfo as the
reason on terminalEvent. The dispatcher carries the reason
through cascadeReject and returns it on each cascadedRejection.
persistCascadeRejections uses the carried reason instead of the
hardcoded "parent rejected" (with that string as the fallback
when no reason was provided).
- Item 14: removed the dead `var _ = zap.NewNop` stub.
Default maxPending bumped from 50,000 to 1,000,000 per discussion —
at ~500 bytes per propagationMsg that's ~500 MB for the slice
itself, comfortably within the 8-16 GB memory envelope we target.
config.go comment updated.
Test coverage:
- TestHandleMessage_MaxPendingFull_ReturnsErrorWithoutLeakingInFlight
is new — pins the item-17 bug fix (a rejected admit can be
retried after the queue drains, which wouldn't work if the tx
leaked into in-flight state).
- TestApplyTerminalStatuses_CascadesRejectedChildren now also
asserts the rejection reason is threaded from parent to child to
grandchild ("bad parent" not "parent rejected").
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/...
— all pass
- go vet ./... — clean
- golangci-lint run — 0 issues
* Cascade reason is always "parent rejected", drop reason threading
A cascaded child's rejection is structural — the child itself didn't
fail for any reason of its own, only because an ancestor did. The
parent's actual cause (invalid script, missing parent, whatever)
lives on the parent's row; downstream consumers can correlate via
the dep graph if they care.
Reverts the reason-threading from the previous commit:
- terminalEvent loses its `reason` field.
- cascadedRejection struct removed; cascade returns []string again.
- terminalResult.cascaded is []string.
- persistCascadeRejections always writes "parent rejected" as
ExtraInfo regardless of the parent's cause.
- rejectedReasons map removed from applyTerminalStatuses.
- notifyTerminalToDispatcher's reason parameter removed.
Test for cascade rejection updated: assert ExtraInfo is "parent
rejected" for both child and grandchild, regardless of the parent's
"bad parent" reason.
Verified:
- go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Backpressure via dispatcher select, not DLQ
When pendingMsgs is at maxPending, the dispatcher excludes admitCh
from its select loop via the nil-channel trick. handleMessage's send
to admitCh blocks. The Kafka consumer goroutine sits inside
handleMessage. Kafka's offset doesn't advance. The broker holds the
messages on its side (disk-backed for real Kafka; in-memory buffer
for the memory broker, which has its own producer-side backpressure
to the client).
Replaces the previous wrong behavior where a full pending queue
returned an error from handleMessage, which the Kafka wrapper
interpreted as a malformed message and routed to DLQ after exhausting
retries. That path lost messages — a full in-memory queue is just
"temporarily out of room," not a malformed payload.
Changes:
- admitResult drops the `full` field. The only outcomes are
admitted (added to pendingMsgs) and held (registered as waiter).
- Dispatcher's select uses `admitChIfRoom` — set to p.admitCh
when len(pendingMsgs) < maxPending, nil otherwise. nil channel
is never ready in select.
- handleAdmit drops the maxPending check (the select gates it).
- handleMessage no longer returns an error on full; it just blocks
on the admit reply, which blocks until the dispatcher has room.
Test renamed to TestHandleMessage_MaxPendingFull_BlocksUntilDrained.
Verifies handleMessage blocks while pending is at cap, then unblocks
and completes after a drain frees capacity. Previous error-return
assertion is gone.
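A minimal sketch of the nil-channel gating described above, assuming a much-simplified dispatcher. The admitRequest shape, the drainCh reply type, and the runDispatcher signature here are hypothetical stand-ins, not the actual code in dispatcher.go:

```go
package main

// admitRequest is a hypothetical stand-in for the dispatcher's admit message.
type admitRequest struct {
	txid  string
	reply chan struct{} // closed once the tx has been appended to pending
}

// runDispatcher gates admission on capacity: admitCh is only selectable
// while pending has room. A receive from a nil channel is never ready in
// a select, so senders block until a drain frees capacity.
func runDispatcher(admitCh chan admitRequest, drainCh chan chan []string, maxPending int, stop <-chan struct{}) {
	var pending []string
	for {
		var admitChIfRoom chan admitRequest // stays nil while pending is full
		if len(pending) < maxPending {
			admitChIfRoom = admitCh
		}
		select {
		case req := <-admitChIfRoom:
			pending = append(pending, req.txid)
			close(req.reply)
		case reply := <-drainCh:
			reply <- pending // hand the batch to the caller
			pending = nil
		case <-stop:
			return
		}
	}
}
```

In this sketch a sender blocked on admitCh is released only after some other goroutine sends on drainCh, which is why the flush has to be driven from a goroutine other than the blocked consumer.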
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Same-batch admission + cascade-release; drop pendingParents map
Implements the dep-aware optimization the plan called for and we
overlooked: a parent and its child can broadcast in the same /txs
batch since Teranode handles intra-batch ordering. The dispatcher
only holds children whose parents are in a DIFFERENT in-flight
batch.
State model changes:
- Drop the pendingParents map entirely. Whether a held child is
ready to release is computed at release time by walking the
child's heldMsgs[child].InputTXIDs against inFlight and inPending
— recomputes the same answer a counter would have given, but one
less map to maintain.
- Add inPending — the subset of inFlight currently in pendingMsgs
(queued for the next batch). Used during admit ("parent in same
batch → fine") and release ("parent just entered inPending →
cascade-release my held children").
Admit logic (handleAdmit):
- For each input txid, classify:
- in inPending → fine, parent will be in same batch
- in inFlight but not inPending → blocker (parent broadcasting
separately, or itself held)
- not in inFlight → fine, out of scope
- If any blocker → hold the child on each blocking parent.
- Otherwise → admit to inFlight + inPending + pendingMsgs.
Drain (drainCh): hand pendingMsgs to the caller, clear inPending
(the txids stay in inFlight — they're broadcasting now, just no
longer eligible for the next batch).
Release (releaseWaiters): worklist walk. For each newly-released
or moved-to-inPending parent, scan its waiters; for each child,
canRelease() rechecks the child's other parents against inFlight/
inPending. If clear, release to pendingMsgs/inPending and add the
child to the worklist — its own waiters may now release too.
Propagates a held chain in one terminal event.
Cascade-reject (cascadeReject): unchanged in intent, just cleans
up multi-parent waiters via the new cleanupWaiterEntries helper
shared with releaseWaiters.
Practical effect: a 1000-deep held chain whose root just
terminalized ACCEPTED now releases the entire chain to the next
batch in one event instead of one round-trip per link.
Test rewrite (depaware_test.go):
- TestHandleMessage_SameBatchAdmission — new; verifies parent
and child both end up in the drained batch when admitted
between flushes.
- TestHandleMessage_HoldsChildWhenParentInDifferentBatch —
renamed from HoldsChildWhenParentInFlight, exercises the
drain-then-admit-child path.
- TestCascadeRelease_DeepChain — new; verifies grandparent
ACCEPTED releases both parent and child held below it in a
single event.
- Existing release / cascade-reject / no-parent / maxPending
tests updated to fit the new state semantics.
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/...
— all pass
- golangci-lint run ./services/propagation/ — 0 issues
* Revert maxPending default back to 50,000
The 1,000,000 default was based on a confused mental model — I was
sizing for "total txs the system can hold" when maxPending is just
the broadcast-bound buffer (pendingMsgs), drained every flush
(50ms ticker or Kafka idle). Kafka's partition holds the bulk; held
waiters are in a separate map. 50,000 is the legacy default the
original developer picked, and the existing rationale
("multi-minute stall absorption at 50 TPS") is correct.
config.go comment rewritten to accurately describe what the cap
gates (handleMessage's admitCh send blocks; consumer pauses pulls;
backpressure to broker — no DLQ). Held waiters explicitly noted as
NOT counted.
Verified:
- go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
- golangci-lint run ./services/propagation/ ./config/ -- 0 issues
* Wire in changes to batch submission
* Add flush-ticker goroutine to break consumer-wedge deadlock
The Kafka consumer wrapper drives both Kafka pulls AND flushBatch on
one goroutine via a single select loop. When handleMessage blocks on
the dispatcher's admitCh (which happens once pendingMsgs hits
maxPending), the consumer goroutine wedges inside processOne; the
wrapper's flush hook can't fire from the same goroutine, so the
dispatcher never drains and the wedge becomes permanent.
This goroutine runs flushBatch on its own clock (50ms ticker) so
drains keep happening regardless of the consumer's state. When the
consumer is wedged on a full pending queue, the ticker drains it,
opens a slot, the dispatcher's admitCh becomes selectable again, and
handleMessage unblocks. Without this, "pause Kafka consumption when
pending is full" would deadlock the propagator instead of
backpressuring the broker.
Verified:
- go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Disable the reaper goroutine
In the dep-aware design we don't write PENDING_RETRY rows — failed
broadcasts terminate as REJECTED in the next commit. With no
PENDING_RETRY traffic the reaper has nothing to process. This commit
removes the goroutine launch from Start, so the reaper code stays in
the source tree (preserved tooling) but doesn't run.
The unused-linter reference (`_ = p.runReaper`) keeps the function
in scope without flagging an unused warning; if a future need for
the reaper resurfaces, it's a one-line revert to bring it back.
Verified:
- go test -count=1 -race -timeout 60s ./services/propagation/... -- pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Replace handleRetryableFailure call sites with terminal REJECTED
In the dep-aware design we explicitly do not queue PENDING_RETRY rows
— failed broadcasts terminate as REJECTED. This commit changes the
three live call sites in processBatch + registerBatch from "queue
for retry" to "write terminal REJECTED."
Sites:
- registerBatch: per-tx merkle-service registration failure →
writeTerminalRejected(reason="merkle-service registration
failed: <err>").
- processBatch's no-verdict branch (no peer acknowledged the
broadcast): writeTerminalRejected(reason="broadcast not
acknowledged by any endpoint").
- processBatch's rejected-with-IsRetryableError-match branch:
falls through to the normal terminal REJECTED counter and
applyTerminalStatuses store write. The dep-aware dispatcher
handles the cascade.
Added writeTerminalRejected helper: writes a one-shot terminal
REJECTED row via BatchUpdateStatusReturning, notifies the dispatcher
so its in-flight state cleans up (and any held descendants cascade-
reject), and emits a bulk publish event.
handleRetryableFailure stays in the source tree, still referenced by
the (preserved-but-not-started) reaper. Kafka replay reconciles any
in-flight state if a propagator crashes mid-write.
Test changes:
- TestHandleMessage_MerkleFailure_PendingRetryRow renamed to
TestHandleMessage_MerkleFailure_WritesTerminalRejected, asserts
REJECTED row + merkle-service error in ExtraInfo.
- TestHandleMessage_MerkleTimeout_NoBroadcast updated similarly.
- TestProcessBatch_MerkleFailure_AbortsBatch / TestHandleMessage_
PartialMerkleFailure_OnlyFailedMessageIsAborted updated to
expect REJECTED writes instead of PENDING_RETRY rows.
- TestNoVerdict_NoHealthyEndpoints_RoutesToRetry renamed to
TestNoVerdict_NoHealthyEndpoints_TerminalRejected.
- TestRetry_MissingInputs_ThenReaperSuccess and TestRetry_
Exhausted_ClearsToRejected removed — both exercised the reaper
directly, which is now disabled. Comment in their place points
at git history if the flow ever needs restoring.
- Removed unused test helpers (forceReady, newTeranodeServerToggle)
left over from the deleted retry tests.
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
- golangci-lint run ./services/propagation/ -- 0 issues
* Drop same-batch admission and cascade-release
Teranode is updating to process bulk /txs submissions in parallel,
which means parent and child can no longer be in the same /txs POST
— Teranode can't trust intra-batch ordering across parallel workers.
Every child must wait for its parent to terminalize ACCEPTED before
the child enters its own batch.
Dispatcher state model simplifies:
- Drop inPending. The only set we track is inFlight (anywhere in
the dispatcher's awareness: pendingMsgs, broadcasting, or held).
- handleAdmit: any in-flight parent blocks. No more "parent in
same batch is fine" exception.
- canRelease: any in-flight parent (other than the just-terminalized
one) blocks. Recomputes from heldMsgs[child].InputTXIDs each call.
- releaseWaiters: walks waiters[parent] ONE LEVEL DEEP. Released
children go into pendingMsgs; their own waiters stay held until
THEY terminalize. No recursive cascade through the dep chain.
Effect: a deep chain of N held txs releases sequentially, one batch
per link. Slow for deep chains but correct under Teranode's new
parallel processing. Independent fan-out (no parent-child overlap)
is unchanged — all admit to pendingMsgs and batch normally.
cascadeReject is unchanged in shape (rejections still walk the full
subtree).
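The simplified state model can be sketched roughly as follows. This is an illustration under assumptions: the depIndex type and its method names are hypothetical, and only the field names (inFlight, waiters, heldMsgs) follow the text above:

```go
package main

// depIndex is a hypothetical, simplified model of the dispatcher state.
type depIndex struct {
	inFlight map[string]bool     // every txid the dispatcher is aware of
	waiters  map[string][]string // parent txid -> held children
	heldMsgs map[string][]string // held child txid -> its input txids
	pending  []string            // admitted, queued for the next batch
}

func newDepIndex() *depIndex {
	return &depIndex{
		inFlight: map[string]bool{},
		waiters:  map[string][]string{},
		heldMsgs: map[string][]string{},
	}
}

// admit holds the tx on every in-flight input, otherwise queues it.
// It reports whether the tx was held.
func (d *depIndex) admit(txid string, inputs []string) (held bool) {
	for _, p := range inputs {
		if d.inFlight[p] {
			d.waiters[p] = append(d.waiters[p], txid)
			held = true
		}
	}
	d.inFlight[txid] = true
	if held {
		d.heldMsgs[txid] = inputs
		return true
	}
	d.pending = append(d.pending, txid)
	return false
}

// accepted terminalizes parent and releases its direct waiters only.
// Released children stay in inFlight, so their own waiters remain held.
func (d *depIndex) accepted(parent string) {
	delete(d.inFlight, parent)
	children := d.waiters[parent]
	delete(d.waiters, parent)
	for _, child := range children {
		if !d.canRelease(child) {
			continue
		}
		delete(d.heldMsgs, child)
		d.pending = append(d.pending, child)
	}
}

// canRelease recomputes readiness from the child's inputs on every call.
func (d *depIndex) canRelease(child string) bool {
	inputs, held := d.heldMsgs[child]
	if !held {
		return false // already released, or never held
	}
	for _, p := range inputs {
		if d.inFlight[p] {
			return false
		}
	}
	return true
}
```

With a grandparent, parent, child chain admitted in that order, accepted on the grandparent queues only the parent; the child is queued after a separate accepted call for the parent, which matches the one-batch-per-link behavior.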
Test changes:
- TestHandleMessage_SameBatchAdmission renamed/inverted to
TestHandleMessage_HoldsChildWhenParentInFlight: child of an
in-flight parent now stays held even if the parent is in the
current pending batch.
- TestCascadeRelease_DeepChain renamed to
TestSequentialReleaseDeepChain: grandparent ACCEPTED releases
parent ONLY; child stays held until parent ACCEPTED separately.
api_server bulk-handler comment updated to reflect the new
"hold until parent terminalizes" semantics rather than the old
"co-batch parents and children" claim.
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
- golangci-lint run -- 0 issues
* Update dep-aware-dispatch plan to reflect agreed retry model
Captures the decisions reached in design discussion that didn't make
it into the first implementation pass:
- Dispatcher state names match the actual code (heldMsgs, pendingMsgs,
offsetTracker) instead of the original sketch (pendingParents,
retryQueue, offsetHeap).
- Offset commit: explicit deferred MarkMessage in kafka/consumer.go
driven by a 200ms propagator-side ticker that reads the dispatcher's
LowestUnfinished.
- Retry handling distinguishes the two failure shapes:
- Merkle /watch: inline retry the whole batch (binary failure).
- Teranode per-slot infra: requeue individual txs back to
pendingMsgs after a short flat wait.
- No attempt counter / no exhaustion — infra failures retry forever.
- Backoff for merkle inline retry: 100ms -> 500ms -> 2s -> 5s -> 10s,
then 10s steady.
- Per-tx /tx fallback removed; /txs response per-slot lines from
#879 + #881 carry the per-tx classification.
- TX_MISSING_PARENT becomes terminal (dep-aware dispatch already
gates children, so missing-parent from Teranode means an unknown
parent — wallets resolve it).
- Added Dependencies section noting #879 and #881.
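As an illustration of the merkle inline-retry schedule listed above, a capped backoff table might look like this. The names merkleRetryBackoffs and merkleRetryBackoff appear later in this PR, but the signature and the 0-based attempt index are assumptions:

```go
package main

import "time"

// merkleRetryBackoffs mirrors the schedule in the plan:
// 100ms -> 500ms -> 2s -> 5s -> 10s.
var merkleRetryBackoffs = []time.Duration{
	100 * time.Millisecond,
	500 * time.Millisecond,
	2 * time.Second,
	5 * time.Second,
	10 * time.Second,
}

// merkleRetryBackoff returns the wait before the given retry (0-based).
// Past the end of the table it stays at the last entry, so retries
// continue indefinitely at a steady 10s.
func merkleRetryBackoff(attempt int) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	if attempt >= len(merkleRetryBackoffs) {
		return merkleRetryBackoffs[len(merkleRetryBackoffs)-1]
	}
	return merkleRetryBackoffs[attempt]
}
```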
* Dep-aware dispatch: offset tracker, deferred kafka commit, infra-retry loops
Implements the parts of the dep-aware-dispatch plan that the previous
pass missed:
- offset_tracker.go: min-heap with lazy deletion. Add on admit, Done on
terminal/cascade, LowestUnfinished read by the watermark ticker.
- dispatcher.go: inFlight now stores offsets (txid → offset). handleAdmit
registers the offset; handleTerminal and cascadeReject mark Done. New
handleRequeue re-admits a previously-admitted tx after an infra
failure without re-tracking its offset. New watermarkCh + requeueCh
channels keep all state mutations on the dispatcher goroutine.
- kafka/consumer.go: opt-in DeferredCommit mode. processOne stops
MarkMessage-ing immediately; pending messages accumulate in a per-claim
map keyed by offset; new SetCommitWatermark(offset) lets the service
publish a watermark that the next flush boundary marks up to.
- propagator.go runWatermarkTicker: 200ms goroutine that reads the
dispatcher's LowestUnfinished and pushes (watermark - 1) into the
consumer. Started in Start.
- registerBatch: capped-exponential inline retry (100ms → 500ms → 2s →
5s → 10s, then 10s steady, forever) for merkle-service /watch
failures. No terminal REJECTED for infra failures.
- teranode/client.go SubmitTransactions: parses the post-#881 /txs
per-slot response body. New per-slot []string return enables the
caller to classify per-tx instead of treating the batch as binary.
- broadcastBatchToEndpoints: emits []txResult with txResultClass set
per-tx (Accepted, Rejected with Teranode code, InFlight, or Requeue).
- broadcastChunk: per-tx /tx fallback removed.
- processBatch: classifies per-tx via txResultClass. Requeue slots
gather, sleep requeueWait (2s), then route back through
requeueToDispatcher. Accepted/Rejected slots terminalize as before.
- Removed reaper.go, retryable.go, backoff.go, writeTerminalRejected,
handleRetryableFailure, and the unused retryMaxAttempts /
retryBackoffMs propagator fields.
- Obsolete tests asserting old "merkle failure → terminal REJECTED"
and "all endpoints 500 → terminal REJECTED" behavior are removed
with explanatory comments. New per-slot classification and requeue
paths are covered by the existing dep-aware tests + the new
dispatcher offset wiring.
Tests: go test -race ./... — all packages pass.
Vet/lint: clean across changed packages.
* Collapse dispatcher + kafka consumer onto a single goroutine
The previous deferred-commit design split state across the dispatcher
goroutine, the kafka consumer goroutine, and a third watermark-ticker
goroutine that shuffled a shared atomic.Int64 between them. That was
exactly the cross-goroutine state-sharing pattern the dep-aware-dispatch
plan was built to avoid.
This commit consolidates: the dispatcher loop is now the Sarama
ClaimHandler, so the Sarama-managed goroutine IS the dispatcher
goroutine. All dep-aware state lives in local variables on that one
goroutine, including the per-offset *kafka.Message references used to
call claim.MarkMessage when an offset terminalizes.
Changes:
- kafka/consumer.go: remove DeferredCommit, atomic watermark, and
SetCommitWatermark. Add an opt-in ClaimHandler field that takes the
raw Claim; when set, the wrapper hands the claim straight to the
handler and skips the internal drain/flush/retry/DLQ plumbing.
Other consumers (webhook, watchdog, etc.) keep the existing
callback-handler path unchanged.
- services/propagation/dispatcher.go: runDispatcher now takes a
kafka.Claim parameter. The select reads from claim.Messages() when
non-nil and from admitCh when nil; both feed the same handleAdmit
logic. On terminal events the loop calls advanceMarks which walks
pendingMarks (offset → *kafka.Message) and calls claim.MarkMessage
for every offset strictly below the tracker's LowestUnfinished.
Backpressure now applies to both message sources via the same
nil-channel-in-select trick. flushTicker only ticks in production
(claim != nil) so tests can drive flushes explicitly.
- services/propagation/propagator.go: Start cancels the test-mode
dispatcher goroutine started in New, then registers
ClaimHandler: p.handleClaim, which runs the same runDispatcher loop
on Sarama's per-claim goroutine. runFlushTicker and
runWatermarkTicker are deleted (the consolidated loop handles
flush, terminal events, and offset marking inline).
One goroutine for all state. No atomics. No mutexes. No ticker
goroutine pushing values across goroutine boundaries.
Tests: go test -race ./... — all green.
* Wire api_server submission paths into the in-process TxTracker
The bump-builder's filterTrackedTxids relies on TxTracker to know
which level-0 hashes from a compound BUMP correspond to txs arcade
is responsible for. Without a TxTracker entry the filter returns
empty for that txid and markMinedAndPublish never runs — txs that
broadcast successfully and reach SEEN_ON_NETWORK never advance to
MINED. That's the e2e-smoke regression on TestSmoke_RealBlockMined_*.
The original wiring lived in the tx_validator service (Add was
called inside Validator.Validate). The dep-aware-dispatch plan
removes that service and moves validation into the api_server.
The TxTracker.Add call needs to move with it. Add it in both:
- handleSingleTx: after the dedup CAS, before the kafka publish.
- handleSubmitTransactions (bulk): after the dedup CAS, before the
kafka SendBatch.
Both paths only Add txids that are toPublish — duplicates and
dedup-CAS losers are skipped so we don't churn the tracker for
no-op submissions.
Tests: go test -race ./services/api_server/... ./services/propagation/...
— all pass.
* Replace inline retry/requeue with reaper-driven rebroadcast
Strips out the exponential-backoff machinery the dep-aware code had
been carrying around and replaces it with a single durable retry
surface: a per-tick reaper that walks the status store, picks up rows
stuck non-terminal past per-status thresholds, and rebroadcasts them
through the same registerBatch + broadcastInChunks pipeline as the
hot path.
What's gone:
- merkleRetryBackoffs, merkleRetryBackoff, the registerBatch retry
loop. /watch failures now just exclude the failed txs from this
batch — they stay at RECEIVED in the store and the reaper picks
them up later.
- inlineRetryAttempts / inlineRetryDelay and the broadcastSingleToEndpoints
retry loop. Single broadcast attempt; reaper handles retries.
- requeueWait, txResultClassRequeue (renamed to txResultClassSkip),
the dispatcher requeue channel + handleRequeue + requeueToDispatcher,
and the processBatch sleep-and-resend loop.
What's new:
- services/propagation/reaper.go: a lease-guarded scan that walks
IterateStatusesSince(now - 24h), picks up RECEIVED rows older than
30s and SEEN_ON_NETWORK rows older than 1h, and rebroadcasts in
batches of up to 200. Results flow through applyTerminalStatuses
so the dispatcher gets terminal-notify callbacks the normal way.
- api_server submission paths now populate RawTx on the inserted
status row so the reaper has the body to rebroadcast.
- processBatch handling for txResultClassInFlight: notify the
dispatcher with StatusAcceptedByNetwork so the Kafka offset
advances and any dep-waiters release, without writing a status
row — the SEEN_ON_NETWORK / MINED callback owns the row from here.
Tests + lint clean.
* Wait for reaper/merkle-replay goroutines in Stop
Both runReaper and runMerkleReplay were spawned naked from Start, so
when the surrounding app cleanup cancels their ctx and closes the
store immediately after, an in-flight IterateStatusesSince or
leaser.Release can race against a closed store. The pebble backing
panics on calls after Close, the test framework catches the panic
from the orphaned goroutine, and the failure is attributed to the
test's t.Cleanup — which is exactly the TestSmoke_RealBlockMined_
ViaReprocess failure on the latest CI run.
Track both goroutines in a per-propagator WaitGroup and have Stop
wait on it after the dispatcher's context is cancelled. The wait is
bounded by the existing ctx cancellation that triggers each
goroutine's exit path, so it adds no latency in the steady-state
shutdown — it just keeps the cleanup ordering clean.
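The shutdown ordering can be sketched like this, under assumptions: the propagator type, the start/stop method names, and the waitForCancel placeholder loop are all hypothetical and stand in for runReaper and runMerkleReplay:

```go
package main

import (
	"context"
	"sync"
)

// propagator is a hypothetical stand-in that only models goroutine tracking.
type propagator struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// start launches each background loop under the WaitGroup.
func (p *propagator) start(loops ...func(context.Context)) {
	ctx, cancel := context.WithCancel(context.Background())
	p.cancel = cancel
	for _, loop := range loops {
		p.wg.Add(1)
		go func(run func(context.Context)) {
			defer p.wg.Done()
			run(ctx)
		}(loop)
	}
}

// stop cancels the shared context and waits for every loop to return,
// so the caller can safely close the store afterwards.
func (p *propagator) stop() {
	p.cancel()
	p.wg.Wait()
}

// waitForCancel is a placeholder loop body: it blocks until ctx is
// cancelled and then runs cleanup (standing in for a lease release).
func waitForCancel(cleanup func()) func(context.Context) {
	return func(ctx context.Context) {
		<-ctx.Done()
		cleanup()
	}
}
```

Because stop returns only after wg.Wait, any store access inside a loop's exit path has finished before the surrounding cleanup closes the store.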
* Fix 'cancelled' misspelling (lint)
* Remove accidentally-committed merkle-service proposal docs
These were drafted alongside the dep-aware-dispatch work but aren't
part of this PR's scope. Got pulled in by a stray git add -A in an
earlier commit. Removing keeps the PR diff focused on the
propagation + reaper changes.
* Parse /txs per-slot results on HTTP 207
Teranode #879 + #881 move the per-slot response body from HTTP 500 to
HTTP 207 Multi-Status. SubmitTransactions now parses the per-slot body
on 207 only; 4xx/5xx are treated as pure infra failures with no body
parsing.
Adds unit tests for the 207 per-slot path and for 500 (no per-slot).
Updates docs to drop the speculative pre-#881 500 path that was never
in production Teranode.
* Collapse single-tx broadcast path; route everything through /txs
/txs handles a chunk of one tx identically to a chunk of N — same per-tx
work on the Teranode side, no extra HTTP overhead. The single-tx path
existed only to interpret /tx's HTTP-status-coded responses, which the
per-slot /txs classifier subsumes. Net -397 lines.
Removed:
- broadcastSingleToEndpoints / broadcastSingleOnce / singleResultToTxResult
- broadcastResult struct
- Client.SubmitTransaction
- txResultClassInFlight (only emitted by the /tx 202 path)
- statusPriority + pendingRetryCount helpers (now unused)
- Tests that asserted /tx-vs-/txs routing or /tx 202 in-flight behavior
broadcastChunk now always calls broadcastBatchToEndpoints regardless of
chunk size. The classification pipeline (Accepted / Rejected / Skip) is
the single source of truth for per-tx outcomes.
* Remove tx_validator service
The validator was already disabled (the previous commit left
`_ = tx_validator.New` to keep the import alive). This finishes the
deletion:
- Remove services/tx_validator/{validator,validator_test}.go and
deploy/tx-validator.yaml.
- Drop TxValidatorConfig from config + the tx_validator block from
config.example.yaml. Drop "tx-validator" from valid mode strings
and CLI help. Existing configs with mode: tx-validator now fail-fast
with an invalid-mode error instead of starting a no-op service.
- Drop the five TxValidator* Prometheus metrics and the test that
touched them.
- Inline tx_validator.CollectInputTXIDs into api_server as a local
helper so the intake handler no longer imports the removed package.
- Comment cleanup across propagator/reaper/teranode/store/e2e for
stale tx_validator references.
Intake validation, dedup, and the input_txids envelope are all
already done synchronously in services/api_server/handlers.go, so
there's no behavior change.
* Inline-requeue for transient broadcast failures; rename Skip class
Rework how the propagation pipeline handles broadcast outcomes that
don't carry a per-tx verdict (no healthy peers, /txs returned 4xx/5xx
with no per-slot body, per-slot infra-bucket code).
- Renamed txResultClassSkip → txResultClassRequeue. The new class name
describes the action: feed the tx back through dispatcher admission
via the new requeueCh, after a flat 2s wait. The dispatcher's
inFlight entry and pinned Kafka offset both persist across the
requeue, so dep-ordering and at-least-once delivery are preserved.
- registerBatch now returns (registered, failed); failed-watch txs
are also routed through requeueAfterDelay instead of being silently
dropped on the floor.
- Reaper drops its RECEIVED-row scan. A RECEIVED row that is not also
in a Kafka queue can only exist because intake's Kafka publish
failed and the client got a 5xx — at that point the client owns
retry, and arcade rebroadcasting on their behalf would override
their decision. The reaper now only covers SEEN_ON_NETWORK
stuckness (Teranode mempool eviction, dropped MINED callback).
- submitBroadcastJobs now blocks on a full job channel instead of
spawning a one-off goroutine. The 256-worker pool is now an actual
cap on concurrent broadcasts; backpressure flows back to the
dispatcher under saturation (peak-job math: 4 batches × 4 chunks ×
N endpoints starts exceeding the 1280-slot cap around 80 endpoints).
- teranode client: stop stripping a "Failed to process transactions:"
heading line in parseTxsPerSlot. That heading was in the pre-#881
500-path body, which we no longer parse — under #881 the 207 body
is just per-slot lines.
* Restore Client.SubmitTransaction on the teranode client
POST /tx is a real Teranode endpoint and SubmitTransaction is the
client surface for it. Removing it from the client (in c1f5c58) along
with arcade's own use of /tx meant callers wanting the single-tx
endpoint would have to reimplement the HTTP boilerplate. Restore the
method and its test so the client stays a faithful Teranode interface
regardless of whether arcade's current pipeline uses it.
The propagation pipeline still routes everything through /txs — that
direction doesn't change.
* Parse Teranode failure-list response by txid
Teranode upstream main #879 emits HTTP 500 with body "Failed to process
transactions:\n<NAME> (<num>): [ProcessTransaction][<txid>] <msg>\n..."
for any /txs batch with at least one failure. Each line corresponds to
one failed tx; successes are absent.
- teranode/client.go: SubmitTransactions now extracts a txid → error-line
map from the failure body (parseTxsFailures). Returns nil for HTTP 200
(everything accepted) and for any 4xx/5xx whose body doesn't carry the
"Failed to process transactions:" header (treated as a pure infra
failure by the caller).
- services/propagation/propagator.go: broadcastJobResult.perSlot →
broadcastJobResult.failures (map[string]string keyed by txid).
broadcastBatchToEndpoints classifies each submitted tx by lookup in
the failure map — present = REJECTED on a terminal Teranode code or
REQUEUE on an infra-bucket code; absent = ACCEPTED.
- classifyPerSlotLine → classifyFailureLine (Accepted branch dropped —
accepted txs aren't in the failure map).
- TxsResultOK constant removed (no per-slot "OK" sentinel under the new
format).
Replaces the speculative 207 Multi-Status path from the bsv-blockchain/
teranode#881 PR after that PR was rejected. The single Teranode-side
behavior that produces orphaned lines (no txid) is a processing-time
panic, which only affects one tx; arcade's reaper picks up the
unmatched stuck row and rebroadcasts on its next tick.
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
* Address feedback
* parseTxsFailures: fail-closed on orphan lines
Any non-empty failure line without an extractable txid now returns
nil from parseTxsFailures, dropping to the caller's whole-batch
requeue path. The only Teranode path that produces such a line is the
processOne panic recover (Server.go:740-744), which doesn't include
tx.TxID() in the wrapper. Re-broadcasting the whole batch is safe
(Teranode dedups at the store level) and avoids silently marking the
orphan's owner as ACCEPTED.
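A rough sketch of the fail-closed parse, assuming the body format quoted in the earlier commit ("Failed to process transactions:" followed by one line per failed tx containing "[ProcessTransaction][<txid>]"). The regular expression, the 64-hex-character txid assumption, and the choice to return nil for a header with no failure lines are illustrative, not taken from teranode/client.go:

```go
package main

import (
	"regexp"
	"strings"
)

const failureHeader = "Failed to process transactions:"

// txidInLine is an assumed pattern: a 64-hex-character txid in the
// bracket pair that follows [ProcessTransaction].
var txidInLine = regexp.MustCompile(`\[ProcessTransaction\]\[([0-9a-fA-F]{64})\]`)

// parseTxsFailures returns a txid -> failure-line map. It returns nil
// when the header is missing or when any non-empty line has no
// extractable txid, so the caller falls back to a whole-batch requeue.
func parseTxsFailures(body string) map[string]string {
	header, rest, _ := strings.Cut(body, "\n")
	if strings.TrimSpace(header) != failureHeader {
		return nil
	}
	failures := make(map[string]string)
	for _, line := range strings.Split(rest, "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		m := txidInLine.FindStringSubmatch(line)
		if m == nil {
			return nil // orphan line: fail closed
		}
		failures[strings.ToLower(m[1])] = line
	}
	if len(failures) == 0 {
		return nil
	}
	return failures
}
```

Returning nil rather than a partial map is the point of the change: a partial map would let the caller treat the orphan line's tx as absent from the failure list and therefore ACCEPTED.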
* Respect datahub discovery flag after startup
* Address feedback
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
* Address feedback
* Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
* Fix CheckExactPartitions missing-topic test to match contract
The test name and assertion described the soft-fail behavior of
CheckPartitions, but CheckExactPartitions hard-fails on missing topics
by design — the call site in app/app.go relies on it to abort startup,
because Kafka auto-creating TopicPropagation on first publish would use
the broker default partition count and silently break the dispatcher's
single-partition invariant.
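The hard-fail contract can be illustrated with a sketch that takes topic metadata as a plain map instead of a Kafka admin client. The function name, signature, and error wording are hypothetical; only the behavior (error on a missing topic, error on a partition-count mismatch) follows the description above.

```go
package main

import "fmt"

// checkExactPartitionsSketch returns an error unless the topic exists and
// has exactly the wanted partition count. partitionsByTopic stands in for
// whatever metadata lookup the real implementation performs.
func checkExactPartitionsSketch(partitionsByTopic map[string]int, topic string, want int) error {
	got, ok := partitionsByTopic[topic]
	if !ok {
		// Hard fail: letting the broker auto-create the topic would
		// apply the broker's default partition count.
		return fmt.Errorf("topic %q does not exist; create it with %d partition(s) before startup", topic, want)
	}
	if got != want {
		return fmt.Errorf("topic %q has %d partition(s), want exactly %d", topic, got, want)
	}
	return nil
}
```

A startup path would treat any non-nil error from such a check as fatal, which is what keeps the dispatcher's single-partition assumption from being broken silently.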
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Dylan <64976002+galt-tr@users.noreply.github.com>
Co-authored-by: Dylan <dylan@britevue.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>



Summary
Audit of httpStatusForTxError in services/propagation/Server.go. Two transaction-related errors were falling through to the default 500 case, conflating validation conditions with infrastructure failures and causing downstream consumers (e.g., Arcade) to retry txs that should be terminal or accepted.
- ErrTxMissingParent → 422 Unprocessable Entity. The tx is structurally valid but its parent isn't in the UTXO store yet. A client-actionable validation-time condition, distinct from 400 (malformed) and 409 (conflict).
- ErrTxExists → 200 OK. Fires when the UTXO store reports the txid is already committed. Semantically the resource is already in the desired state, so this is idempotent success, not a conflict. The "creation in progress" edge case at aerospike/create.go#L1078 is handled by the validator's CreateInUtxoStore path before it reaches the HTTP boundary.
Other errors deliberately left at 500 because they indicate genuine infra / data-integrity failures: ErrUtxoHashMismatch, ErrUtxoError, ErrStorageError, ErrServiceError, generic ErrProcessing, ErrTxNotFound (translated upstream to ErrTxMissingParent).
Test plan
- go test -count=1 ./services/propagation/... : pass
- go vet ./services/propagation/... : clean
- golangci-lint run ./services/propagation/... : 0 issues
- TestHandleSingleTx for both new mappings, including a NewProcessingError wrapper case to verify error-chain walking