Plan: dependency-aware dispatch #151
Plan for tracking parent-child relationships between in-flight transactions so children are not broadcast until their parents have reached a terminal state. Also covers the pipeline simplifications that fall out of the change: validation moves into intake, the tx_validator service is removed, the propagation topic becomes single-partition with a single-goroutine dispatcher, and the PENDING_RETRY status / reaper are replaced with an in-memory retry queue backed by Kafka replay for durability. Intended for discussion before implementation. Single coordinated deploy with a new propagation topic and a drain-the-old-queue prerequisite; no backward-compat code path retained.
Introduces the single-goroutine dispatcher described in the plan. The
dispatcher reads transactions from an incoming channel, decides whether
to admit each to a pending broadcast batch or hold it as a waiter on
in-flight parents, and emits flushed batches to a downstream worker
channel. Status flips coming back (from broadcast workers and the
merkle-service callback path) drive waiter release, recursive
cascade-reject, retry queueing, and offset terminalization.
Components:
- dispatcher.go: types (dispatcherMsg, inFlightEntry, statusFlip),
the Dispatcher struct with its dep-index/retry-queue state, the
main select loop, and handlers for incoming messages, status
flips, and timer-driven batch flushes / retry wake-ups.
- offset_tracker.go: a min-heap with lazy deletion behind the
offsetTracker interface. Add records an in-flight offset, Done
marks one terminalized, LowestUnfinished gives the commit point.
- dispatcher_test.go: unit tests covering the baseline admit path,
parent-in-flight hold, cascade-reject for REJECTED parents, the
multi-parent case, retryable failure requeue with backoff, and
retry exhaustion. Plus an OffsetTracker test for the lazy-delete
semantics.
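The lazy-deletion idea behind offset_tracker.go can be sketched as follows. This is a minimal illustration, not the code from the commit: the type name lazyOffsetTracker and its internals are assumptions, and only the Add / Done / LowestUnfinished method names come from the description above. Done only marks an offset, and the heap entry is discarded when it later surfaces at the top.

```go
package main

import (
	"container/heap"
	"fmt"
)

// int64Heap is a min-heap of Kafka offsets.
type int64Heap []int64

func (h int64Heap) Len() int           { return len(h) }
func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
func (h int64Heap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *int64Heap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *int64Heap) Pop() any {
	old := *h
	n := len(old)
	v := old[n-1]
	*h = old[:n-1]
	return v
}

// lazyOffsetTracker is a hypothetical stand-in for the tracker described
// above. It is not goroutine-safe; a mutex would be needed for shared use.
type lazyOffsetTracker struct {
	heap int64Heap
	done map[int64]struct{}
}

func newLazyOffsetTracker() *lazyOffsetTracker {
	return &lazyOffsetTracker{done: make(map[int64]struct{})}
}

// Add records an in-flight offset.
func (t *lazyOffsetTracker) Add(offset int64) { heap.Push(&t.heap, offset) }

// Done marks an offset terminalized without touching the heap.
func (t *lazyOffsetTracker) Done(offset int64) { t.done[offset] = struct{}{} }

// LowestUnfinished discards finished offsets from the top of the heap, then
// reports the smallest offset still in flight. ok is false when nothing is
// in flight.
func (t *lazyOffsetTracker) LowestUnfinished() (offset int64, ok bool) {
	for t.heap.Len() > 0 {
		top := t.heap[0]
		if _, finished := t.done[top]; !finished {
			return top, true
		}
		heap.Pop(&t.heap)
		delete(t.done, top)
	}
	return 0, false
}

func main() {
	t := newLazyOffsetTracker()
	t.Add(10)
	t.Add(11)
	t.Add(12)
	t.Done(11) // out-of-order completion: the watermark stays at 10
	fmt.Println(t.LowestUnfinished())
	t.Done(10) // 10 and the lazily deleted 11 are both discarded now
	fmt.Println(t.LowestUnfinished())
}
```

Deferring the removal keeps Done at O(1) and avoids searching the heap for an arbitrary element; the cost is paid once per offset when the commit point advances.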
Not wired up to anything yet — the dispatcher stands alone. The next
commit will connect it to a Kafka consumer feeding incomingMsgs and a
broadcast-worker pool consuming outgoingBatch. Existing propagator,
tx_validator, and reaper code paths are untouched.
Retry classification uses HTTP status codes (422 for ErrTxMissingParent,
0 for no-response) rather than the existing IsRetryableError text match
— the latter is removed in a follow-up commit alongside the wiring
change. The dependency on Teranode returning 422 is satisfied by
bsv-blockchain/teranode#870.
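A status-code classifier of the kind described could look like the sketch below. The function name isRetryable and the exact set of retryable codes are assumptions for illustration; the text above only names 422 (ErrTxMissingParent) and 0 (no response) as the codes the classification relies on, and treating both as retryable is an assumption here.

```go
package main

import (
	"fmt"
	"net/http"
)

// isRetryable is a hypothetical classifier keyed on the HTTP status code
// instead of on error text.
func isRetryable(statusCode int) bool {
	switch statusCode {
	case 0:
		// No response at all (for example, no healthy endpoint).
		return true
	case http.StatusUnprocessableEntity:
		// 422: ErrTxMissingParent per the text above; assumed retryable
		// because the parent may still arrive.
		return true
	default:
		return false
	}
}

func main() {
	for _, code := range []int{0, 200, 400, 422} {
		fmt.Println(code, isRetryable(code))
	}
}
```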
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- go vet ./services/propagation/... -- clean
- golangci-lint run ./services/propagation/ -- 0 issues
Builds the dedicated Kafka consumer for the dependency-aware
propagation topic. Distinct from the existing kafka.ConsumerGroup
wrapper (still used by webhook, watchdog, etc.) because it defers
Claim.MarkMessage until the dispatcher reports the message's offset
terminalized via its offsetTracker. Without that, a crash would leave
in-flight txs lost: Kafka would treat their offsets as committed while
the dispatcher's in-memory state was gone.
Architecture is two cooperating goroutines off Run:
- Read loop: pulls messages from a Claim, decodes the JSON envelope
(now including input_txids), records the message in a pending map
keyed by offset, and pushes a dispatcherMsg onto the dispatcher's
incomingMsgs channel. The send is blocking, so backpressure
propagates naturally up through the claim buffer into the broker.
- Commit loop: ticks every 200ms (configurable), queries
offsetTracker.LowestUnfinished, and calls MarkMessage on every
pending message with offset below that cursor. Drops committed
entries from the pending map.
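The core of one commit tick can be sketched as a pure function. The name drainBelow and the string payload are assumptions; in the real consumer the map would hold Kafka messages and each returned offset would be passed to MarkMessage.

```go
package main

import (
	"fmt"
	"sort"
)

// drainBelow removes every pending offset strictly below cursor and returns
// the removed offsets in ascending order, which is the order a consumer
// would mark them. The string value is a hypothetical stand-in for the
// Kafka message kept in the pending map.
func drainBelow(pending map[int64]string, cursor int64) []int64 {
	var ready []int64
	for offset := range pending {
		if offset < cursor {
			ready = append(ready, offset)
		}
	}
	sort.Slice(ready, func(i, j int) bool { return ready[i] < ready[j] })
	for _, offset := range ready {
		delete(pending, offset)
	}
	return ready
}

func main() {
	pending := map[int64]string{3: "tx-a", 4: "tx-b", 5: "tx-c", 9: "tx-d"}
	// Offset 5 is the lowest unfinished one, so only 3 and 4 are committable.
	fmt.Println(drainBelow(pending, 5), len(pending))
}
```

Offset 9 stays pending even if its transaction has already terminalized, because committing past the unfinished offset 5 would lose that message on a crash.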
To make this safe, offsetTracker now guards its state with a mutex.
The dispatcher's writes (Add/Done) and the consumer's reads
(LowestUnfinished/Empty) cross goroutine boundaries, and the race
detector correctly flagged the unguarded version. Uncontended mutex
overhead is ~10-20ns per op, negligible against the dispatcher's
~5-10μs per-message JSON decode.
The consumer is not yet wired into the propagator. The next commit
will replace the existing parallel-consumer pool with this consumer
plus the dispatcher.
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- go vet ./services/propagation/... -- clean
- golangci-lint run ./services/propagation/ -- 0 issues
Adds the broadcast side of the dep-aware pipeline. Consumes batches
from the dispatcher's outgoingBatch channel, submits them to Teranode,
writes terminal status rows to the store, and emits per-tx statusFlips
back to the dispatcher.
Design:
- Multi-tx batches go to /txs first. On HTTP 2xx every tx in the
batch gets ACCEPTED_BY_NETWORK: Teranode handles intra-batch
dependency ordering internally, so a 2xx implies every tx was
accepted into the mempool. /txs returns aggregate-only status, so
on a non-2xx response the broadcaster falls back to per-tx /tx calls
to recover per-tx status codes (some txs in a failed batch may
succeed individually).
- Single-tx batches go directly to /tx, skipping the batch path.
- Fan-out across healthy endpoints is parallel; first 2xx wins and
cancels siblings. No healthy endpoints maps to statusCode=0,
which the dispatcher classifies as retryable.
- Store writes use BatchUpdateStatus per tx. Write failures are
logged but don't block the statusFlip emission — the dispatcher
advances its in-memory state regardless, and Kafka replay
reconciles divergence on restart.
- Sibling cancellation does NOT count against endpoint health — a
canceled-by-broadcast error is filtered before aggregating
outcomes.
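The fan-out rule (first 2xx wins, siblings canceled, cancellation not counted as a failure, no response maps to 0) can be sketched as below. All names here (submitFn, fanOut, fixed, broadcast, errUnreachable) are hypothetical; the real broadcaster talks to Teranode through teranode.Client and tracks endpoint health, which this sketch leaves out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// submitFn is a hypothetical stand-in for one endpoint call. It returns the
// HTTP status code, or an error when the endpoint gave no response.
type submitFn func(ctx context.Context) (int, error)

var errUnreachable = errors.New("endpoint unreachable")

// fixed builds a fake endpoint that always returns the same result.
func fixed(status int, err error) submitFn {
	return func(context.Context) (int, error) { return status, err }
}

// fanOut calls every endpoint in parallel. The first 2xx wins and cancels
// the remaining calls. It returns 0 when there are no endpoints or none of
// them produced a response.
func fanOut(ctx context.Context, endpoints []submitFn) int {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // on early return this cancels the sibling calls

	type outcome struct {
		status int
		err    error
	}
	// Buffered so late goroutines can finish after fanOut has returned.
	results := make(chan outcome, len(endpoints))
	for _, ep := range endpoints {
		go func(ep submitFn) {
			status, err := ep(ctx)
			results <- outcome{status: status, err: err}
		}(ep)
	}

	last := 0
	for range endpoints {
		out := <-results
		if errors.Is(out.err, context.Canceled) {
			continue // sibling cancellation is not an endpoint failure
		}
		if out.err != nil {
			continue // no response from this endpoint
		}
		if out.status >= 200 && out.status < 300 {
			return out.status
		}
		last = out.status
	}
	return last
}

// broadcast is a convenience wrapper that uses a background context.
func broadcast(endpoints ...submitFn) int {
	return fanOut(context.Background(), endpoints)
}

func main() {
	fmt.Println(broadcast(fixed(500, nil), fixed(200, nil)))
	fmt.Println(broadcast(fixed(0, errUnreachable)))
	fmt.Println(broadcast())
}
```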
Tests cover the four meaningful paths: batch accepted (no fallback),
batch rejected with per-tx fallback, single-tx direct, and no healthy
endpoints (retryable signal).
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- go vet ./services/propagation/... -- clean
- golangci-lint run ./services/propagation/ -- 0 issues
Restructures the submit handler for the dep-aware pipeline. The intake
HTTP handler now performs the work the tx_validator service used to do
asynchronously: parse, run policy validation, dedup via store CAS, and
publish to the dispatch topic with input_txids extracted from the
parsed transaction.
Changes:
- kafka/topics.go: add TopicDispatch and TopicDispatchDLQ for the
dep-aware pipeline's fresh single-partition topic.
- services/api_server/server.go: add a validator.Validator field
threaded through New(); existing tests using struct-literal
construction get a nil-safe degraded mode (skips validation).
- services/api_server/handlers.go: handleSubmitTransaction now runs
ValidateTransaction (policy-only, matching tx_validator's current
behavior), persists REJECTED rows synchronously on validation
failure, performs the dedup CAS via GetOrInsertStatus, and
publishes to TopicDispatch with an envelope including input_txids.
- app/app.go: pass d.Validator to api_server.New so production
deployments get full validation behavior.
- handlers_test.go: extend mockStore with an overrideable
getOrInsertStatusFn so tests can exercise the dedup path. Default
behavior returns inserted=true so existing tests continue to see
fresh-submission semantics.
- intake_dispatch_test.go: new test file covering the four cases:
happy-path publish to TopicDispatch with correct envelope shape,
dedup hit returns idempotent 202 with existing state, transient
store error continues to publish, and the pathological
not-inserted-with-nil-existing case (defensive branch).
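The envelope published to the dispatch topic might be shaped roughly as follows. Only the input_txids key is named in the commit message; the other keys, the struct name dispatchEnvelope, the helper names, and the choice to omit the field when empty are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dispatchEnvelope is a hypothetical message shape. Only the input_txids
// key comes from the commit message; txid and raw_tx are illustrative.
type dispatchEnvelope struct {
	TXID       string   `json:"txid"`
	RawTx      string   `json:"raw_tx"`
	InputTXIDs []string `json:"input_txids,omitempty"`
}

// encodeEnvelope serializes an envelope for publishing.
func encodeEnvelope(e dispatchEnvelope) ([]byte, error) {
	return json.Marshal(e)
}

// decodeEnvelope parses a message. A payload without input_txids decodes
// to a nil slice, which a consumer can treat as "no parents to wait on".
func decodeEnvelope(data []byte) (dispatchEnvelope, error) {
	var e dispatchEnvelope
	err := json.Unmarshal(data, &e)
	return e, err
}

func main() {
	out, err := encodeEnvelope(dispatchEnvelope{
		TXID:       "child",
		RawTx:      "00",
		InputTXIDs: []string{"parent"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))

	legacy, err := decodeEnvelope([]byte(`{"txid":"old","raw_tx":"00"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(len(legacy.InputTXIDs))
}
```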
The legacy TopicTransaction publish is gone — nothing in production
will be producing to that topic anymore. The tx_validator service
remains as code but its consumer will see no traffic; removed in a
follow-up commit alongside the propagator entry-point swap.
Verified:
- go test -count=1 -race ./services/api_server/... ./services/propagation/... -- pass
- go vet ./... -- clean
- golangci-lint run on touched packages -- 0 issues
Builds the propagation.Pipeline that wires Dispatcher,
dispatcherConsumer, and dispatcherBroadcaster together with their
shared channels and lifecycle. Implements services.Service so
app.BuildServices can drop it in alongside (or eventually replacing)
the legacy Propagator.
Architecture:
- dispatcherConsumer reads from kafka.TopicDispatch, defers Kafka
offset commit until the dispatcher's offsetTracker reports each
tx terminalized.
- Dispatcher runs in its own goroutine, owning the in-flight map,
waiter index, retry queue, and offset heap. No locks because
only this goroutine mutates them.
- dispatcherBroadcaster reads emitted batches, submits to Teranode
via the existing teranode.Client (batch endpoint with per-tx
fallback on all-rejected), writes terminal status rows to the
store, and emits per-tx statusFlips back to the dispatcher's
return channel.
The dispatcher's rejectedSink is wired so cascade rejections from
REJECTED parents also produce a terminal store row — implemented as
a synthetic statusFlip onto the same flips channel the broadcaster
uses, keeping all terminal-write logic in one place
(dispatcherBroadcaster.emitOutcome). The synthetic flip falls back to
a non-blocking send so a full flips channel doesn't stall the
dispatcher loop; correctness is preserved by Kafka replay since the
parent's offset is not committed until the child terminalizes.
Lifecycle:
- NewPipeline constructs all components and channels but does NOT
start any goroutine.
- Start spawns the broadcaster and dispatcher goroutines, then
runs the consumer in the caller's goroutine (blocking until
ctx is canceled or the subscription ends).
- Stop cancels the internal context, which propagates to the two
background goroutines, and waits for them to exit.
End-to-end tests cover:
- Happy-path batch: two unrelated txs flow through, both end up
ACCEPTED_BY_NETWORK in the store, one /txs hit on Teranode.
- Dep-aware ordering: parent and child published in order, child
held until parent's broadcast completes (verified by an
artificially-slow parent and the order of /tx submissions
observed by the test Teranode emulator).
Not yet wired into app.BuildServices — that swap is the next commit,
landing alongside removal of the legacy propagator entry point.
Merkle-service registration (the F-024 invariant for prior code) is
not yet plumbed in; needed before the cutover but separated from this
commit to keep the diff focused on the dispatcher wiring.
Verified:
- go test -count=1 -race ./services/propagation/... ./services/api_server/... -- pass
- go vet ./... -- clean
- golangci-lint run on touched packages -- 0 issues
Before broadcasting a batch to Teranode, the broadcaster now
registers every tx with merkle-service for /watch tracking. Per-tx
registration failures emit a retryable statusFlip (statusCode=0)
so the dispatcher's retry queue re-dispatches them on backoff. Txs
that register successfully proceed to the existing /txs (with /tx
fallback) path.
This preserves the F-024 invariant from the legacy propagator: no
transaction reaches Teranode without a matching /watch entry, so
MINED / SEEN_ON_NETWORK / STUMP callbacks always have a row to
update.
Behavior matches the legacy registerBatch in two important ways:
- When merkleClient is nil OR callbackURL is empty, registration
is skipped entirely and every tx in the batch proceeds to
broadcast. Same fallback the legacy path used.
- Per-tx independence: a failing tx in a batch does NOT block the
rest of the batch from broadcasting. The successful subset
proceeds; failures take the retry path.
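The two behaviors above (skip when registration is not configured, and per-tx independence) can be sketched as a partition step. The names registerFn, partitionByRegistration, and errRegister are hypothetical, and the sketch registers sequentially, whereas the pipeline is configured with a MerkleConcurrency setting.

```go
package main

import (
	"errors"
	"fmt"
)

// registerFn is a hypothetical stand-in for one merkle-service /watch
// registration call.
type registerFn func(txid string) error

var errRegister = errors.New("merkle-service registration failed")

// partitionByRegistration splits a batch into txids that may proceed to
// broadcast and txids that should take the retry path. A nil register
// function models the "merkle client not configured" case, in which
// registration is skipped and the whole batch proceeds.
func partitionByRegistration(txids []string, register registerFn) (proceed, retry []string) {
	if register == nil {
		return txids, nil
	}
	for _, txid := range txids {
		if err := register(txid); err != nil {
			retry = append(retry, txid) // one failure does not block the rest
			continue
		}
		proceed = append(proceed, txid)
	}
	return proceed, retry
}

func main() {
	failB := func(txid string) error {
		if txid == "b" {
			return errRegister
		}
		return nil
	}
	proceed, retry := partitionByRegistration([]string{"a", "b", "c"}, failB)
	fmt.Println(proceed, retry)
}
```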
The dispatcher's retry queue handles the failed registrations the
same way it handles transient broadcast failures — same
isRetryableStatusCode logic, same exponential backoff via
computeRetryDeadline.
Pipeline wires the merkle deps through cfg.CallbackURL,
cfg.CallbackToken, and cfg.Propagation.MerkleConcurrency so
production behavior matches the legacy propagator.
Test coverage: a batch containing one tx whose merkle registration
fails (HTTP 500 from merkle-service) and one that succeeds. The
failing tx produces a retryable statusFlip and never reaches
Teranode; the successful tx broadcasts normally.
Verified:
- go test -count=1 -race ./services/propagation/... -- pass
- golangci-lint run ./services/propagation/ -- 0 issues
app.BuildServices now constructs propagation.NewPipeline for the
"propagation" service mode. The legacy propagation.New(...) call is
removed from the live wiring; the type and its surrounding code stay
in the source tree for a follow-up cleanup commit.
The Pipeline takes a tighter dependency set than the old Propagator:
no leaser (no reaper lease needed — single consumer per partition
makes the cluster-wide lock obsolete) and no publisher (status fan-out
flows through the existing Kafka publisher inside the store/event
layer, not through the propagator).
After this commit the dispatcher pipeline is the production
propagation path. Submissions flow:
POST /tx → api-server intake (parse + validate + dedup CAS)
→ kafka.TopicDispatch (new single-partition topic)
→ propagation.Pipeline
├── dispatcherConsumer (deferred commit)
├── Dispatcher (dep index, retry queue)
└── dispatcherBroadcaster (merkle register + Teranode submit)
The legacy tx_validator service still runs (consuming TopicTransaction)
because nothing has been removed yet — but no producer feeds it, so
it's idle. That removal lands next.
Verified:
- go test -count=1 -race ./services/propagation/... ./services/api_server/... ./app/... -- pass
- go vet ./... -- clean
- golangci-lint run on touched packages -- 0 issues
The tx_validator service is dead code after the intake handler took
over its responsibilities (parse + policy validate + dedup CAS) and
intake started publishing directly to TopicDispatch instead of
TopicTransaction. Nothing produces to TopicTransaction anymore, so
the service would just sit idle.
Removed:
- services/tx_validator/ directory (validator.go + validator_test.go)
- app/app.go import and "tx-validator" service registration
- config/config.go "tx-validator" mode entry
The TxValidator config block (cfg.TxValidator) is still defined in
config.go for backward read compatibility: operators with a
tx_validator: section in their config.yaml see it silently ignored
rather than getting a validation error. Removing the struct entirely
can happen in a separate cleanup once the deploy has rolled out.
Metrics with arcade_tx_validator_* names also stay defined for now;
they'll never be incremented in the new pipeline but keeping the
definitions prevents Prometheus scrape errors during the cutover.
Verified:
- go test -count=1 -race ./services/... ./app/... ./config/... -- pass
- go vet ./... -- clean
Two type renames that should have been extensions in the first place:
- The dispatcher's incoming-message type is the existing
propagationMsg, extended with InputTXIDs and KafkaOffset fields.
Previously this was a parallel "dispatcherMsg" type — same JSON
shape, same fields — which created unnecessary divergence.
- Status-flip events between broadcaster/dispatcher use
*models.TransactionStatus directly. The existing type already
carries TxID, Status, StatusCode, and ExtraInfo, which is exactly
what the per-tx outcome carries. The internal "statusFlip" struct
was just a four-field subset of the same shape.
The broadcaster now constructs one *models.TransactionStatus per
outcome and uses it both as the store write payload AND as the
in-flight status flip sent to the dispatcher. Removes a redundant
local struct.
Reverts the gratuitous "msg" → "envelope" variable rename in
handleSubmitTransaction; the existing msg variable just gets one new
key in its map literal.
Net diff is smaller and matches the existing type vocabulary across
the codebase.
Verified:
- go test -count=1 -race ./services/... ./app/... -- pass
- golangci-lint run on touched packages -- 0 issues
The dispatcher pipeline is now the production path. Everything in the
legacy propagation service is dead code:
- propagator.go: the parallel-consumer Propagator type and its
flushBatch / processBatch / registerBatch / broadcast helpers.
Replaced by Dispatcher + dispatcherConsumer +
dispatcherBroadcaster.
- reaper.go: the lease-coordinated PENDING_RETRY reaper.
Replaced by the dispatcher's in-memory retry queue (Kafka replay
provides crash durability).
- retryable.go: IsRetryableError text-match classification.
Replaced by isRetryableStatusCode using the HTTP status codes
Teranode now returns (per the audit PR landed upstream).
- backoff.go: ComputeBackoff helper for the reaper. The dispatcher
has its own inlined retry-deadline math (computeRetryDeadline).
- replay.go: the one-shot merkle-service replay on startup. Useful
as a recovery mechanism for merkle-service state loss but not
needed for the initial cutover; a follow-up can add it back if
operationally warranted.
- propagator_test.go: 2k lines of legacy-path tests, including the
comprehensive mockStore with bookkeeping for PendingRetry,
retryCount, merkleMarks, etc.
- health_test.go: tests of the legacy propagator's broadcast
behavior (first-success-wins across endpoints). Equivalent
coverage now lives in dispatcher_broadcaster_test.go for the new
broadcaster.
The propagationMsg type moved from propagator.go to dispatcher.go
where it's the natural neighbor of the dispatcher that consumes it.
A minimal mockStore lives in mocks_test.go — just enough to satisfy
the store.Store interface via embedding. Tests that need specific
store behavior compose over it (broadcastTestStore, pipelineTestStore).
Net: 4276 lines removed, ~25 lines added.
Verified:
- go test -count=1 -race ./... -- all packages pass
- golangci-lint run ./services/propagation/ -- 0 issues
The plan called for renaming the propagation topic (TopicDispatch) and
changing how it gets consumed (single-partition dispatcher pipeline)
— it did NOT call for renaming the service's external API. Reverts
the gratuitous Pipeline / NewPipeline names back to the original
Propagator / New.
Concretely:
- type Pipeline → type Propagator
- func NewPipeline → func New
- pipeline.go → propagator.go (the file lives next to its peers
again under the original name)
- pipeline_test.go → propagator_test.go
- Test names TestPipeline_* → TestPropagator_*
- app.BuildServices references propagation.New (no signature change
visible to callers)
Internal types stay as-is: Dispatcher, dispatcherConsumer,
dispatcherBroadcaster — those are genuinely new components introduced
by the dep-aware pipeline; renaming them would just rename internal
implementation details without changing what they are.
Verified:
- go test -count=1 -race ./services/propagation/... ./app/... -- pass
- golangci-lint run ./services/propagation/ ./app/ -- 0 issues
…gation.New" This reverts commit 1db4a07.
This reverts commit b10b45a.
This reverts commit 319b4dc.
This reverts commit 26604ca.
This reverts commit 95a5dd6.
This reverts commit ab4e72a.
This reverts commit e0b181a.
…dler" This reverts commit 92c429a.
This reverts commit 0f8d9e7.
This reverts commit a5958ce.
This reverts commit c34de7f.
Implements the dep-aware dispatch described in docs/plans/. Targeted
modifications to the existing Propagator and intake handler; no
architectural rework, no type churn.
Changes:
- propagationMsg gains an InputTXIDs field. Optional on the JSON
side so older producers continue to interoperate; the propagator
treats absent/empty as "no in-flight parents".
- Intake handler (api_server) now performs the work the
tx_validator service used to do asynchronously: parse, run policy
validation via validator.ValidateTransaction (skipFees=true,
skipScripts=true — matches the legacy tx_validator behavior),
dedup CAS via store.GetOrInsertStatus, extract InputTXIDs, and
publish to kafka.TopicPropagation directly. Validation failure
persists a terminal REJECTED row and returns 400 synchronously.
- api_server.New now takes a *validator.Validator; app.BuildServices
threads d.Validator through. Existing tests using struct-literal
construction (validator nil) skip validation, preserving their
behavior.
- Propagator gains an in-flight dep index: inFlight set, waiters
map (parent → set of children), pendingParents map (child → set
of unmet parents), heldMsgs map (held child txid → message).
handleMessage now checks input txids against inFlight and, if any
parent is in flight, registers the message as a waiter instead
of admitting it to pendingMsgs.
- applyTerminalStatuses hooks into handleTerminalForDeps after the
bulk publish: ACCEPTED parents release waiters whose pendingParents
set becomes empty (re-entered into pendingMsgs so the next
flushBatch picks them up); REJECTED parents cascade-reject every
descendant recursively, writing terminal REJECTED rows and
emitting a bulk publish for SSE/webhook subscribers. Terminalized
txids are dropped from inFlight.
- tx_validator gains an exported CollectInputTXIDs helper that
intake reuses, keeping the wire format extraction identical
regardless of producer. tx_validator itself is no longer
registered in app.BuildServices — intake covers its job — but
the package stays in the source tree as preserved queue-processing
scaffolding (its message format includes input_txids too in case
anyone re-activates it).
- No new topic constant, no new types, no architectural split. The
existing TopicPropagation carries the extended message format.
Single-partition deployment is an operator choice, not a code
constraint.
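The hold, release, and cascade-reject rules above can be sketched with a small dependency index. This is a simplified illustration under assumptions: the depIndex type and its method names are hypothetical, the held messages themselves (heldMsgs) are omitted, and results are sorted only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

type set map[string]struct{}

// depIndex is a hypothetical, simplified in-flight dependency index.
type depIndex struct {
	inFlight       set
	waiters        map[string]set // parent -> children held on it
	pendingParents map[string]set // child -> parents still in flight
}

func newDepIndex() *depIndex {
	return &depIndex{inFlight: set{}, waiters: map[string]set{}, pendingParents: map[string]set{}}
}

func addEdge(m map[string]set, from, to string) {
	if m[from] == nil {
		m[from] = set{}
	}
	m[from][to] = struct{}{}
}

// admit reports whether txid may be broadcast now (true) or is held (false).
// Only parents currently in flight create a wait.
func (d *depIndex) admit(txid string, inputs []string) bool {
	held := false
	for _, parent := range inputs {
		if _, ok := d.inFlight[parent]; !ok {
			continue // already mined, or never seen
		}
		addEdge(d.waiters, parent, txid)
		addEdge(d.pendingParents, txid, parent)
		held = true
	}
	d.inFlight[txid] = struct{}{} // held txs count as in flight too
	return !held
}

// accept terminalizes parent as accepted and returns the children whose
// last unmet parent this was.
func (d *depIndex) accept(parent string) []string {
	delete(d.inFlight, parent)
	var released []string
	for child := range d.waiters[parent] {
		delete(d.pendingParents[child], parent)
		if len(d.pendingParents[child]) == 0 {
			delete(d.pendingParents, child)
			released = append(released, child)
		}
	}
	delete(d.waiters, parent)
	sort.Strings(released)
	return released
}

// reject terminalizes parent as rejected and returns every descendant,
// each of which is removed from the index as well.
func (d *depIndex) reject(parent string) []string {
	delete(d.inFlight, parent)
	children := d.waiters[parent]
	delete(d.waiters, parent)
	var cascaded []string
	for child := range children {
		if _, ok := d.inFlight[child]; !ok {
			continue // already cascaded through another parent
		}
		for other := range d.pendingParents[child] {
			delete(d.waiters[other], child)
		}
		delete(d.pendingParents, child)
		cascaded = append(cascaded, child)
		cascaded = append(cascaded, d.reject(child)...)
	}
	sort.Strings(cascaded)
	return cascaded
}

func main() {
	d := newDepIndex()
	d.admit("parent", nil)
	fmt.Println(d.admit("child", []string{"parent"}))
	fmt.Println(d.admit("grandchild", []string{"child"}))
	fmt.Println(d.reject("parent"))
}
```

Marking a held transaction as in flight is what lets a grandchild register a wait on a child that is itself still waiting.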
Verified:
- go test -count=1 -race ./services/api_server/... ./services/propagation/...
./services/tx_validator/... ./app/... -- all pass
- go vet ./... -- clean
- golangci-lint run -- 0 issues
depaware_test.go covers the five core behaviors:
- HoldsChildWhenParentInFlight: a tx whose InputTXIDs include a tx
currently in flight lands in heldMsgs + pendingParents + waiters,
not pendingMsgs.
- ReleasesWaitersOnAccepted: parent's terminal ACCEPTED clears its
waiters' pendingParents entry; a child whose set is now empty is
re-entered into pendingMsgs.
- CascadesRejectedChildren: parent's terminal REJECTED recursively
rejects every descendant (child, grandchild). Each cascaded row
is written to the store via BatchUpdateStatusReturning.
- NoParents_AdmitsNormally: tx with no InputTXIDs follows the
legacy admission path, no waiter state.
- ParentNotInFlight_AdmitsChildDirectly: only IN-FLIGHT parents
create waits — children of long-mined or never-seen parents go
straight to pendingMsgs.
Behavior fix uncovered by the cascade test: a held waiter must still
appear in inFlight so DEEPER descendants (grandchild waiting on a
held child) register their waiter edge correctly. Handle this by
adding propMsg.TXID to inFlight BEFORE the hold/admit decision in
handleMessage. Removed the post-release inFlight insert in
handleTerminalForDeps since the child was already there.
persistCascadeRejections now uses BatchUpdateStatusReturning to match
applyTerminalStatuses's existing store contract (mockStore implements
that method, not BatchUpdateStatus).
Verified:
- go test -count=1 -race ./services/propagation/... -- all pass
- golangci-lint run ./services/propagation/ -- 0 issues
The previous version put inFlight / waiters / pendingParents /
heldMsgs on the Propagator struct behind the existing p.mu mutex.
That was the wrong shape: with a single Kafka consumer per partition
and the dispatch decisions all logically owned by one ordering
domain, the dep-index should live in a single goroutine with no
locks. The mutex was serializing concurrent traffic from handleMessage
(consumer goroutine) and applyTerminalStatuses (processBatch
goroutines, up to MaxConcurrentBatches in parallel), which is workable
but adds synchronization complexity for state that has a natural
single owner.
This commit:
- Creates services/propagation/dispatcher.go containing the
dispatcher goroutine (runDispatcher) plus the admitRequest /
terminalEvent / terminalResult protocol types. All dep-index
state (inFlight, waiters, pendingParents, heldMsgs) is declared
inside runDispatcher's function body — nothing else can reach
them. The dispatcher serializes admit and terminal events via a
select loop; no locks anywhere on dep-index state.
- Removes inFlight / waiters / pendingParents / heldMsgs from the
Propagator struct. Removes the holdAsWaiterLocked, releaseWaitersLocked,
cascadeRejectLocked, handleTerminalForDeps helpers — their logic
moved into the dispatcher goroutine.
- handleMessage now calls p.admitToDispatcher(propMsg), which sends
a request and waits for a synchronous admit/hold reply. Single
goroutine round-trip per message; channels are buffered so a
momentarily slow dispatcher doesn't immediately stall the consumer.
- applyTerminalStatuses now calls p.notifyTerminalToDispatcher per
terminalized txid, gets back the list of released waiters
(re-entered into pendingMsgs by the caller) and cascaded
descendants (caller writes REJECTED rows + bulk publish).
- The dispatcher goroutine is started in New (not Start) so the
existing tests that construct via New and call handleMessage
directly continue to work. Stop cancels the dispatcher.
- depaware_test.go updated to test via observable behavior
(pendingMsgs contents, ms.updates) instead of poking at private
dep-index maps, since those are no longer accessible from outside
the dispatcher goroutine.
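The goroutine-confinement pattern described in this commit can be sketched as below. The admitRequest and terminalEvent names echo the commit message, but their fields, the dispatcherHandle wrapper, and the reduced state (only an inFlight set) are assumptions chosen to keep the example small.

```go
package main

import (
	"context"
	"fmt"
)

// admitRequest asks the dispatcher whether a tx may be broadcast now.
type admitRequest struct {
	txid   string
	inputs []string
	reply  chan bool // true = admitted, false = held
}

// terminalEvent tells the dispatcher that a tx reached a terminal state.
type terminalEvent struct {
	txid string
	done chan struct{}
}

// runDispatcher owns inFlight exclusively. The map is declared inside the
// function body, so no other goroutine can reach it and no lock is needed.
func runDispatcher(ctx context.Context, admitCh <-chan admitRequest, terminalCh <-chan terminalEvent) {
	inFlight := make(map[string]struct{})
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-admitCh:
			held := false
			for _, parent := range req.inputs {
				if _, ok := inFlight[parent]; ok {
					held = true
					break
				}
			}
			inFlight[req.txid] = struct{}{}
			req.reply <- !held
		case ev := <-terminalCh:
			delete(inFlight, ev.txid)
			close(ev.done)
		}
	}
}

// dispatcherHandle is a hypothetical caller-side wrapper around the channels.
type dispatcherHandle struct {
	admitCh    chan admitRequest
	terminalCh chan terminalEvent
	cancel     context.CancelFunc
}

func startDispatcher() *dispatcherHandle {
	ctx, cancel := context.WithCancel(context.Background())
	h := &dispatcherHandle{
		admitCh:    make(chan admitRequest, 16),
		terminalCh: make(chan terminalEvent, 16),
		cancel:     cancel,
	}
	go runDispatcher(ctx, h.admitCh, h.terminalCh)
	return h
}

// admit sends a request and waits for the synchronous admit/hold reply.
func (h *dispatcherHandle) admit(txid string, inputs ...string) bool {
	reply := make(chan bool, 1)
	h.admitCh <- admitRequest{txid: txid, inputs: inputs, reply: reply}
	return <-reply
}

// terminal reports a terminalized tx and waits until it has been applied.
func (h *dispatcherHandle) terminal(txid string) {
	done := make(chan struct{})
	h.terminalCh <- terminalEvent{txid: txid, done: done}
	<-done
}

func (h *dispatcherHandle) stop() { h.cancel() }

func main() {
	h := startDispatcher()
	defer h.stop()
	fmt.Println(h.admit("parent"))
	fmt.Println(h.admit("child", "parent"))
	h.terminal("parent")
	fmt.Println(h.admit("late-child", "parent"))
}
```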
The legacy p.mu still protects pendingMsgs (a slice the consumer
goroutine and the dispatcher both need to append to — released
waiters re-enter via the consumer-side helper). That mutex was
already there before this PR and isn't part of the dep-aware change.
Verified:
- go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
- golangci-lint run on touched packages -- 0 issues
Pull request overview
This PR introduces a dependency-aware propagation dispatcher and updates the pipeline to publish directly from intake to the propagation topic. It also adds Teranode /txs failure-list parsing to support per-tx classification, removes the tx_validator service, and updates tests/metrics/docs accordingly.
Changes:
- Move structural/policy validation + store dedup into the API intake handler and publish directly to `kafka.TopicPropagation` (with `input_txids` attached).
- Rewrite propagation to use a single-goroutine, dep-aware dispatcher with deferred Kafka offset marking, plus updated broadcast/requeue behavior.
- Extend Teranode client batch submit to optionally return a per-tx failure map parsed from Teranode’s structured 500 body; remove legacy retryable-string matching and tx-validator-related metrics/tests.
Reviewed changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tests/e2e/smoke_test.go | Updates e2e comments to reflect validation moving into intake. |
| tests/e2e/smoke_real_test.go | Updates e2e comments to reflect intake validation. |
| tests/e2e/harness/arcade.go | Updates readiness comment to remove tx-validator reference. |
| teranode/client.go | Changes /txs API to return (status, failures, err) and adds Teranode failure-list parsing. |
| teranode/client_test.go | Adds tests for failure-list parsing behavior and updates call sites. |
| store/postgres/postgres.go | Removes tx_validator-specific commentary from batch insert docs. |
| store/postgres/batch_test.go | Updates test comments to remove tx_validator references. |
| store/pebble/batch_test.go | Updates test comments to remove tx_validator references. |
| store/batch_test.go | Updates test comments to remove tx_validator references. |
| services/tx_validator/validator.go | Deletes tx-validator service implementation. |
| services/tx_validator/validator_test.go | Deletes tx-validator unit tests. |
| services/sse/sse_e2e_test.go | Updates e2e helper comment to reflect intake behavior. |
| services/propagation/retryable.go | Removes string-matching retryability helper. |
| services/propagation/retryable_test.go | Removes retryability tests tied to string matching. |
| services/propagation/reaper.go | Reworks reaper to scan for stale SEEN_ON_NETWORK rows and rebroadcast them. |
| services/propagation/propagator.go | Major rewrite: dep-aware dispatcher channels, new Teranode classification, requeue behavior, lifecycle changes. |
| services/propagation/propagator_test.go | Removes obsolete tests tied to PENDING_RETRY/backoff and updates broadcast expectations. |
| services/propagation/offset_tracker.go | Adds heap-based offset tracker for deferred commit watermarking. |
| services/propagation/health_test.go | Removes inline-retry delay tweaks (inline retry removed). |
| services/propagation/dispatcher.go | Adds single-goroutine dependency-aware dispatcher and channel protocols. |
| services/propagation/depaware_test.go | Adds unit tests for dependency gating, release/cascade, and maxPending backpressure. |
| services/propagation/backoff.go | Removes retry backoff helper (no longer used). |
| services/propagation/backoff_test.go | Removes backoff tests. |
| services/api_server/server.go | Adds validator dependency and threads it through constructor. |
| services/api_server/handlers.go | Adds input parent extraction, intake validation/dedup, publishes directly to propagation topic, adds batch submit changes. |
| services/api_server/handlers_test.go | Updates batch submit tests for 202 responses. |
| metrics/metrics.go | Removes tx-validator metrics; trims propagation metric labels now that fallback/retry changed. |
| metrics/metrics_test.go | Updates scrape test to stop referencing tx-validator metrics. |
| kafka/consumer.go | Adds ClaimHandler mode to bypass drain/flush/retry/DLQ and let a service own the claim goroutine. |
| docs/plans/dependency-aware-dispatch.md | Adds plan doc describing dependency-aware dispatch + pipeline simplifications. |
| deploy/tx-validator.yaml | Removes tx-validator deployment manifest. |
| config/config.go | Removes tx_validator config/mode; updates propagation config docs. |
| config.example.yaml | Removes tx_validator example config and updates mode comment. |
| app/app.go | Removes tx-validator service wiring; updates api-server wiring to pass validator. |
| failures := make(map[string]string, len(lines)-1) | ||
| for _, line := range lines[1:] { | ||
| if line == "" { | ||
| continue | ||
| } | ||
| txid := txsTxidPattern.FindString(line) | ||
| if txid == "" { | ||
| continue | ||
| } | ||
| failures[strings.ToLower(txid)] = line | ||
| } |
| // Start a dispatcher goroutine with a nil claim so tests that | ||
| // construct via New and drive via admitCh / drainCh have a running | ||
| // state machine without needing to invoke Start. In production | ||
| // Start replaces this with the same loop running inside the kafka | ||
| // ClaimHandler — see Start. The two paths can't both be live at | ||
| // once: Start cancels this context before subscribing. | ||
| dispatcherCtx, dispatcherCancel := context.WithCancel(context.Background()) | ||
| p.dispatcherCancel = dispatcherCancel | ||
| go func() { | ||
| if err := p.runDispatcher(dispatcherCtx, nil, dispatcherConfig{maxPending: maxPending}); err != nil { | ||
| p.logger.Error("test-mode dispatcher exited with error", zap.Error(err)) | ||
| } | ||
| }() | ||
| return p |
| // Record the offset on the tracker for either branch — both held | ||
| // and admitted txs are in-flight from the consumer's perspective | ||
| // and must pin the commit watermark below this offset. | ||
| inFlight[msg.TXID] = offset | ||
| tracker.Add(offset) | ||
|
|
||
| if len(blocking) > 0 { | ||
| // Hold as a waiter. Held txs DO go into inFlight (so | ||
| // descendants can register on them) but NOT into pendingMsgs | ||
| // (they're not on the broadcast path yet). | ||
| for parent := range blocking { | ||
| set, ok := waiters[parent] | ||
| if !ok { | ||
| set = make(map[string]struct{}) | ||
| waiters[parent] = set | ||
| } | ||
| set[msg.TXID] = struct{}{} | ||
| } | ||
| heldMsgs[msg.TXID] = msg | ||
| return admitResult{held: true} | ||
| } | ||
|
|
||
| // Eligible for broadcast. Add to pendingMsgs. | ||
| *pendingMsgs = append(*pendingMsgs, msg) | ||
| return admitResult{admitted: true} |
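The hold path above registers a child under every blocking parent, so the release side has to wait until the last of those parents terminalizes before the child becomes eligible. A minimal sketch of that release step follows. The `blockedOn` counter and the `releaseWaiters` name are hypothetical and not taken from this PR; the `propagationMsg` type is a trimmed stand-in, and cascade-reject for REJECTED parents is deliberately left out.

```go
package main

import "fmt"

// propagationMsg is a trimmed stand-in for the real message type.
type propagationMsg struct{ TXID string }

// releaseWaiters is called when parent reaches a terminal, non-rejected
// status. blockedOn (hypothetical) counts how many in-flight parents each
// held child is still waiting on. A child is returned only once that count
// reaches zero, which covers the multi-parent case.
func releaseWaiters(
	parent string,
	waiters map[string]map[string]struct{},
	blockedOn map[string]int,
	heldMsgs map[string]propagationMsg,
) []propagationMsg {
	var ready []propagationMsg
	for child := range waiters[parent] {
		blockedOn[child]--
		if blockedOn[child] > 0 {
			continue // still waiting on another parent
		}
		delete(blockedOn, child)
		if msg, ok := heldMsgs[child]; ok {
			ready = append(ready, msg)
			delete(heldMsgs, child)
		}
	}
	delete(waiters, parent)
	return ready
}

func main() {
	// Child "c" spends outputs of two in-flight parents, "p1" and "p2".
	waiters := map[string]map[string]struct{}{
		"p1": {"c": {}},
		"p2": {"c": {}},
	}
	blockedOn := map[string]int{"c": 2}
	heldMsgs := map[string]propagationMsg{"c": {TXID: "c"}}

	fmt.Println(len(releaseWaiters("p1", waiters, blockedOn, heldMsgs)))
	fmt.Println(len(releaseWaiters("p2", waiters, blockedOn, heldMsgs)))
}
```

Released messages would then be appended to `pendingMsgs` the same way the eligible branch does; their offsets are already on the tracker from the original admit call.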
| # Dependency-Aware Dispatch | ||
|
|
||
| Plan to add parent-child dependency awareness to Arcade's broadcast pipeline so that transactions in the queue with parent-child relationships are sequenced correctly when sent to Teranode. The redesign also simplifies the pipeline: validation moves to intake, the `tx_validator` service is removed, PENDING_RETRY status and the reaper are removed, and the dispatcher holds Kafka offsets in flight until each tx terminalizes so a crash replays everything that wasn't done. | ||
|
|
| ### Retry handling | ||
|
|
||
| PENDING_RETRY status and the reaper go away. Infrastructure failures retry forever; the mechanism differs by failure mode because merkle-service is binary while Teranode is per-tx. | ||
|
|
||
| **Merkle-service `/watch` failure — inline retry, whole batch.** `registerBatch` sleeps with capped-exponential backoff and retries the whole batch. The merkle-service `/watch` payload's only per-tx variation is the txid string — a failure is always all-or-nothing, so splitting buys nothing. The `processBatch` goroutine holds a `processBatchSem` slot during the retry loop, which is the natural backpressure we want: once all slots are held, the consumer stops pulling new work. | ||
|
|
||
| Backoff: **100ms → 500ms → 2s → 5s → 10s, then 10s steady, retrying forever**. | ||
|
|
||
| **Teranode `/txs` per-slot infra failures — requeue individually, short flat wait.** With #879 + #881, the `/txs` response delivers per-slot Teranode codes. `broadcastInChunks` walks the per-slot lines and partitions them: |
|
Looking good; let's address the Copilot feedback.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 34 out of 34 changed files in this pull request and generated 9 comments.
Comments suppressed due to low confidence (1)
kafka/consumer.go:83
- NewConsumerGroup doesn’t validate that either (a) ClaimHandler is set or (b) Handler is set. In non-ClaimHandler mode, a nil Handler will panic when processing messages. Consider adding a config validation error early (e.g., require ClaimHandler != nil || Handler != nil) to fail fast on misconfiguration.
func NewConsumerGroup(cfg ConsumerConfig) (*ConsumerGroup, error) {
if cfg.Broker == nil {
return nil, fmt.Errorf("ConsumerConfig.Broker is required")
}
sub, err := cfg.Broker.Subscribe(cfg.GroupID, cfg.Topics)
if err != nil {
return nil, fmt.Errorf("subscribing: %w", err)
}
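One way to act on the suppressed comment above is a small validation step that runs before `Subscribe`. The sketch below is illustrative only: `ConsumerConfig` is reduced to two hypothetical function-typed fields, and the real `Handler` / `ClaimHandler` types in `kafka/consumer.go` are not shown on this page, so the signatures here are assumptions.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerConfig is a trimmed, hypothetical stand-in for the real config.
// Only the two fields relevant to the check are modelled, and their types
// are assumptions.
type ConsumerConfig struct {
	Handler      func(msg []byte) error
	ClaimHandler func() error
}

var errNoHandler = errors.New("ConsumerConfig: one of ClaimHandler or Handler is required")

// validateHandlers fails fast when neither processing path is configured,
// instead of letting a nil Handler panic on the first message.
func validateHandlers(cfg ConsumerConfig) error {
	if cfg.ClaimHandler == nil && cfg.Handler == nil {
		return errNoHandler
	}
	return nil
}

func main() {
	fmt.Println(validateHandlers(ConsumerConfig{}))
	fmt.Println(validateHandlers(ConsumerConfig{
		Handler: func([]byte) error { return nil },
	}))
}
```

In `NewConsumerGroup` the check would sit next to the existing `cfg.Broker == nil` guard so misconfiguration surfaces at construction time.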
| # Dependency-Aware Dispatch | ||
|
|
||
| Plan to add parent-child dependency awareness to Arcade's broadcast pipeline so that transactions in the queue with parent-child relationships are sequenced correctly when sent to Teranode. The redesign also simplifies the pipeline: validation moves to intake, the `tx_validator` service is removed, the `PENDING_RETRY` status is removed (transient infra failures are kept in-memory and requeued through the dispatcher), the reaper is narrowed to rebroadcasting stale `SEEN_ON_NETWORK` rows that the dispatcher's in-flight set can't see, and the dispatcher holds Kafka offsets in flight until each tx terminalizes so a crash replays everything that wasn't done. | ||
|
|
| // MinPartitions is the minimum number of partitions every hot-path topic | ||
| // must have at startup. Set to the expected replica count of the largest | ||
| // horizontally-scaled consumer (tx-validator or propagation) so arcade | ||
| // fails fast when the cluster can't actually fan out across pods. Leave | ||
| // at 0 or 1 in standalone/single-replica deployments. | ||
| // must have at startup. Set to the expected replica count of the propagation | ||
| // consumer so arcade fails fast when the cluster can't actually fan out | ||
| // across pods. Leave at 0 or 1 in standalone/single-replica deployments. | ||
| MinPartitions int `mapstructure:"min_partitions"` |
| // pending-retry depth, reaper lease and tick outcomes, inline retries, | ||
| // merkle registration latency. |
| if len(stuck) == 0 { | ||
| return | ||
| } | ||
|
|
||
| p.logger.Info("reaper: rebroadcasting pending retries", zap.Int("count", len(ready))) | ||
| p.logger.Info("reaper: rebroadcasting stuck txs", zap.Int("count", len(stuck))) | ||
| metrics.PropagationReaperReadyDepth.Set(float64(len(stuck))) |
| // naturally pauses Kafka pulls and lets backpressure flow back to | ||
| // the broker. No DLQ, no error to the client; the only observable | ||
| // effect is briefly increased consumer lag. | ||
| _ = p.admitToDispatcher(propMsg, msg.Offset) |
| // Dedup CAS via GetOrInsertStatus. Two submitters racing on the | ||
| // same txid both attempt the insert; the loser sees inserted=false | ||
| // and returns 202 idempotently without re-publishing. | ||
| if s.store != nil { | ||
| row := &models.TransactionStatus{ | ||
| TxID: txid, | ||
| Status: models.StatusReceived, | ||
| Timestamp: time.Now(), | ||
| // Carry the raw bytes on the status row so the propagation | ||
| // reaper can rebroadcast txs that are stuck in non-terminal | ||
| // states without re-fetching from Kafka or the API caller. | ||
| RawTx: rawTx, | ||
| } | ||
| existing, inserted, dedupErr := s.store.GetOrInsertStatus(c.Request.Context(), row) | ||
| switch { | ||
| case dedupErr != nil: | ||
| s.logger.Error("dedup CAS failed", zap.String("txid", txid), zap.Error(dedupErr)) | ||
| // Best-effort: continue with publish. The propagator's | ||
| // in-flight set catches duplicates that slip past. | ||
| case !inserted && existing != nil: | ||
| s.recordSubmission(c.Request.Context(), txid, opts) | ||
| c.JSON(http.StatusAccepted, gin.H{ | ||
| "status": "already submitted", | ||
| "txid": txid, | ||
| "state": string(existing.Status), | ||
| }) | ||
| return |
| if s.store != nil { | ||
| ctx := c.Request.Context() | ||
| for _, p := range parsed { | ||
| row := &models.TransactionStatus{ | ||
| TxID: p.txid, | ||
| Status: models.StatusReceived, | ||
| Timestamp: time.Now(), | ||
| // Carry the raw bytes on the row so the propagation | ||
| // reaper can rebroadcast stuck txs without re-fetching. | ||
| RawTx: p.raw, | ||
| } | ||
| existing, inserted, dedupErr := s.store.GetOrInsertStatus(ctx, row) | ||
| switch { | ||
| case dedupErr != nil: | ||
| s.logger.Error("dedup CAS failed", zap.String("txid", p.txid), zap.Error(dedupErr)) | ||
| toPublish = append(toPublish, p) | ||
| case !inserted && existing != nil: | ||
| duplicates++ | ||
| s.recordSubmission(ctx, p.txid, opts) | ||
| default: | ||
| toPublish = append(toPublish, p) | ||
| } | ||
| } | ||
| } else { | ||
| toPublish = parsed | ||
| } | ||
|
|
||
| // Register every accepted tx with the in-process TxTracker so the | ||
| // bump-builder's filterTrackedTxids recognizes them when their block | ||
| // is processed. Without this, tracked-only fan-out drops every MINED | ||
| // transition and txs stay stuck at SEEN_ON_NETWORK forever. | ||
| if s.txTracker != nil { | ||
| for _, p := range toPublish { | ||
| s.txTracker.Add(p.txid, models.StatusReceived) | ||
| } | ||
| } |
| // requeueAfterDelay schedules a delayed requeue of msgs through the | ||
| // dispatcher. Spawns a goroutine that sleeps requeueDelay then sends | ||
| // each msg to requeueCh. The goroutine bails on ctx cancellation so | ||
| // claim revocation and shutdown don't hold txs in limbo. | ||
| func (p *Propagator) requeueAfterDelay(ctx context.Context, msgs []propagationMsg) { | ||
| if len(msgs) == 0 { | ||
| return | ||
| } | ||
| go func(msgs []propagationMsg) { | ||
| select { | ||
| case <-time.After(requeueDelay): | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| for _, m := range msgs { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| default: | ||
| } | ||
| p.requeueToDispatcher(m) | ||
| } | ||
| }(msgs) | ||
| } |
| type offsetTracker struct { | ||
| heap *offsetMinHeap | ||
| done map[int64]struct{} | ||
| } | ||
|
|
||
| // newOffsetTracker constructs an empty offsetTracker. | ||
| func newOffsetTracker() *offsetTracker { | ||
| return &offsetTracker{ | ||
| heap: &offsetMinHeap{}, | ||
| done: make(map[int64]struct{}), | ||
| } | ||
| } | ||
|
|
||
| // Add records an offset as in-flight. The dispatcher calls Add when it | ||
| // admits a Kafka message into pendingMsgs or heldMsgs. | ||
| func (t *offsetTracker) Add(offset int64) { | ||
| heap.Push(t.heap, offset) | ||
| } | ||
|
|
||
| // Done marks an in-flight offset as terminalized. Idempotent: marking the | ||
| // same offset twice is a no-op the second time. The dispatcher calls Done | ||
| // after a tx reaches a terminal status (ACCEPTED_BY_NETWORK, REJECTED) or | ||
| // is cascade-rejected as a child of a rejected parent. | ||
| func (t *offsetTracker) Done(offset int64) { | ||
| t.done[offset] = struct{}{} | ||
| } | ||
|
|
||
| // LowestUnfinished returns the smallest offset still in-flight, or (0, | ||
| // false) when every offset has been marked done (or none have been | ||
| // added). The Kafka commit watermark must not advance past this value: | ||
| // if it did, a crash would leave the in-flight tx unreplayed. | ||
| // | ||
| // Cleans done entries off the heap top as a side effect so the heap top | ||
| // is always an unfinished offset after a successful call returns. | ||
| func (t *offsetTracker) LowestUnfinished() (int64, bool) { | ||
| t.cleanTop() | ||
| if t.heap.Len() == 0 { | ||
| return 0, false | ||
| } | ||
| return (*t.heap)[0], true | ||
| } |
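`offsetMinHeap` and `cleanTop` are referenced above but not shown in this hunk. The sketch below is one plausible shape for them, assuming `offsetMinHeap` is an `[]int64` implementing `container/heap.Interface` and that each offset is added at most once (true for a single partition). The tracker methods are repeated only so the example runs on its own; the actual implementation in `offset_tracker.go` may differ.

```go
package main

import (
	"container/heap"
	"fmt"
)

// offsetMinHeap is assumed to be a plain int64 min-heap.
type offsetMinHeap []int64

func (h offsetMinHeap) Len() int           { return len(h) }
func (h offsetMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h offsetMinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *offsetMinHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *offsetMinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type offsetTracker struct {
	heap *offsetMinHeap
	done map[int64]struct{}
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{heap: &offsetMinHeap{}, done: make(map[int64]struct{})}
}

func (t *offsetTracker) Add(offset int64)  { heap.Push(t.heap, offset) }
func (t *offsetTracker) Done(offset int64) { t.done[offset] = struct{}{} }

// cleanTop pops finished offsets off the heap top. Done entries buried
// deeper in the heap stay put until they surface: that is the lazy part.
func (t *offsetTracker) cleanTop() {
	for t.heap.Len() > 0 {
		top := (*t.heap)[0]
		if _, finished := t.done[top]; !finished {
			return
		}
		heap.Pop(t.heap)
		delete(t.done, top) // keep the done set from growing without bound
	}
}

func (t *offsetTracker) LowestUnfinished() (int64, bool) {
	t.cleanTop()
	if t.heap.Len() == 0 {
		return 0, false
	}
	return (*t.heap)[0], true
}

func main() {
	t := newOffsetTracker()
	for _, off := range []int64{5, 6, 7} {
		t.Add(off)
	}
	t.Done(6) // out-of-order completion: watermark stays pinned at 5
	fmt.Println(t.LowestUnfinished())
	t.Done(5) // 5 and the already-done 6 are both cleaned off the top
	fmt.Println(t.LowestUnfinished())
	t.Done(7)
	fmt.Println(t.LowestUnfinished())
}
```

Lazy deletion keeps `Done` at O(1) and pays the heap cost only when the commit point is queried. One caveat under these assumptions: calling `Done` for an offset that was never added leaves a stale entry in the `done` map.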
Any non-empty failure line without an extractable txid now makes parseTxsFailures return nil, which drops to the caller's whole-batch requeue path. The only Teranode path that produces such a line is the processOne panic recover (Server.go:740-744), which doesn't include tx.TxID() in the wrapper. Re-broadcasting the whole batch is safe (Teranode deduplicates at the store level) and avoids silently marking the orphan's owner as ACCEPTED.
Pull request overview
Copilot reviewed 39 out of 39 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
app/app.go:99
- The MinPartitions startup guard is effectively disabled here because CheckPartitions is called with a nil topic list, so it never checks anything even when kafka.min_partitions > 1. Either pass the intended topic set(s) to enforce the setting (e.g. the horizontally-scaled topics) or remove/disable the knob entirely to avoid a misleading configuration that appears enabled but has no effect.
// MinPartitions is a soft hint for horizontally-scaled topics. There
// are currently no other hot-path topics that need it (TopicTransaction
// was retired with tx_validator) but the knob is retained so a future
// fan-out topic can opt into the check without re-introducing config.
if cfg.Kafka.MinPartitions > 1 {
if pErr := kafka.CheckPartitions(broker, nil, cfg.Kafka.MinPartitions, logger); pErr != nil {
_ = producer.Close()
return nil, nil, fmt.Errorf("kafka partition check: %w", pErr)
}
}
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
| # Dependency-Aware Dispatch | ||
|
|
||
| Plan to add parent-child dependency awareness to Arcade's broadcast pipeline so that transactions in the queue with parent-child relationships are sequenced correctly when sent to Teranode. The redesign also simplifies the pipeline: validation moves to intake, the `tx_validator` service is removed, the `PENDING_RETRY` status is removed (transient infra failures are kept in-memory and requeued through the dispatcher), the reaper is narrowed to rebroadcasting stale `SEEN_ON_NETWORK` rows that the dispatcher's in-flight set can't see, and the dispatcher holds Kafka offsets in flight until each tx terminalizes so a crash replays everything that wasn't done. |
| → Teranode | ||
| ``` | ||
|
|
||
| The `tx_validator` service is removed. Intake performs parse, script/fee validation, and dedup synchronously, then publishes directly to the propagation topic. |
| @@ -67,7 +73,10 @@ func (m *mockStore) UpdateStatus(_ context.Context, status *models.TransactionSt | |||
| return nil | |||
| } | |||
|
|
|||
| func (m *mockStore) GetOrInsertStatus(context.Context, *models.TransactionStatus) (*models.TransactionStatus, bool, error) { | |||
| func (m *mockStore) GetOrInsertStatus(_ context.Context, status *models.TransactionStatus) (*models.TransactionStatus, bool, error) { | |||
| if m.getOrInsertFn != nil { | |||
| return m.getOrInsertFn(status) | |||
| } | |||
| return nil, false, nil | |||
| } | |||
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
The test name and assertion described the soft-fail behavior of CheckPartitions, but CheckExactPartitions hard-fails on missing topics by design — the call site in app/app.go relies on it to abort startup, because Kafka auto-creating TopicPropagation on first publish would use the broker default partition count and silently break the dispatcher's single-partition invariant.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plan doc for discussion before implementation. Located at `docs/plans/dependency-aware-dispatch.md`.
What it covers
- `tx_validator` service is removed
- `PENDING_RETRY` status and the reaper are removed; retry state lives in dispatcher memory, durable via Kafka replay
Deployment shape
Single coordinated deploy with a new propagation topic. Operational prerequisite is draining the old queue before cutover. No backward-compat code path is retained — given the limited install base, the cost of a clean break is minimal.
Open for discussion
The plan is intentionally scoped to the dep-aware dispatch work plus the simplifications that come with it. Out of scope: binary message format (deferred until measurement justifies it), stale `SENT_TO_NETWORK` handling (separate concern, status is unused in today's code).