Plan: dependency-aware dispatch #151

Merged
galt-tr merged 61 commits into main from plan/dependency-aware-dispatch on May 20, 2026

shruggr commented on May 14, 2026

Collaborator

Plan doc for discussion before implementation. Located at docs/plans/dependency-aware-dispatch.md.

What it covers

  • Tracking parent-child relationships between in-flight transactions so children are not broadcast until their parents have reached a terminal state, eliminating the cross-batch "missing inputs" race.
  • Pipeline simplifications that fall out of the change:
    • Validation moves into the intake handler; the tx_validator service is removed
    • Propagation topic becomes single-partition with a single-goroutine dispatcher owning the dep index and retry queue
    • PENDING_RETRY status and the reaper are removed; retry state lives in dispatcher memory, durable via Kafka replay
  • Retry classification driven by Teranode HTTP status codes (per the audit in teranode#870, "fix(propagation): correct HTTP status mapping for missing-parent and duplicate-tx"), not by string-matching error bodies.

Deployment shape

Single coordinated deploy with a new propagation topic. Operational prerequisite is draining the old queue before cutover. No backward-compat code path is retained — given the limited install base, the cost of a clean break is minimal.

Open for discussion

The plan is intentionally scoped to the dep-aware dispatch work plus the simplifications that come with it. Out of scope: binary message format (deferred until measurement justifies it), stale SENT_TO_NETWORK handling (separate concern, status is unused in today's code).

Plan for tracking parent-child relationships between in-flight
transactions so children are not broadcast until their parents have
reached a terminal state. Also covers the pipeline simplifications
that fall out of the change: validation moves into intake, the
tx_validator service is removed, the propagation topic becomes
single-partition with a single-goroutine dispatcher, and the
PENDING_RETRY status / reaper are replaced with an in-memory retry
queue backed by Kafka replay for durability.

Intended for discussion before implementation. Single coordinated
deploy with a new propagation topic and a drain-the-old-queue
prerequisite; no backward-compat code path retained.
github-actions bot added the labels chore (Simple dependency updates or version bumps), dependencies (Dependency updates, version bumps, etc.), and size/M (Medium change, 51–200 lines) on May 14, 2026
shruggr added 25 commits May 14, 2026 17:26
Introduces the single-goroutine dispatcher described in the plan. The
dispatcher reads transactions from an incoming channel, decides whether
to admit each to a pending broadcast batch or hold it as a waiter on
in-flight parents, and emits flushed batches to a downstream worker
channel. Status flips coming back (from broadcast workers and the
merkle-service callback path) drive waiter release, recursive
cascade-reject, retry queueing, and offset terminalization.

Components:

  - dispatcher.go: types (dispatcherMsg, inFlightEntry, statusFlip),
    the Dispatcher struct with its dep-index/retry-queue state, the
    main select loop, and handlers for incoming messages, status
    flips, and timer-driven batch flushes / retry wake-ups.

  - offset_tracker.go: a min-heap with lazy deletion behind the
    offsetTracker interface. Add records an in-flight offset, Done
    marks one terminalized, LowestUnfinished gives the commit point.

  - dispatcher_test.go: unit tests covering the baseline admit path,
    parent-in-flight hold, cascade-reject for REJECTED parents, the
    multi-parent case, retryable failure requeue with backoff, and
    retry exhaustion. Plus an OffsetTracker test for the lazy-delete
    semantics.
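
As an illustration of the offset_tracker.go description above, here is a minimal sketch of a min-heap with lazy deletion. The type name lazyOffsetTracker and the (offset, ok) return shape of LowestUnfinished are assumptions made for this example, not the PR's actual signatures, and the sketch is written for single-goroutine use only.

```go
package main

import (
	"container/heap"
	"fmt"
)

// int64Heap is a min-heap of Kafka offsets.
type int64Heap []int64

func (h int64Heap) Len() int           { return len(h) }
func (h int64Heap) Less(i, j int) bool { return h[i] < h[j] }
func (h int64Heap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *int64Heap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *int64Heap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// lazyOffsetTracker is a hypothetical stand-in for the PR's offsetTracker.
type lazyOffsetTracker struct {
	offsets int64Heap
	done    map[int64]struct{} // finished offsets that are still inside the heap
}

func newLazyOffsetTracker() *lazyOffsetTracker {
	return &lazyOffsetTracker{done: make(map[int64]struct{})}
}

// Add records an in-flight offset.
func (t *lazyOffsetTracker) Add(offset int64) { heap.Push(&t.offsets, offset) }

// Done marks an offset as terminalized. The heap entry is not removed here;
// it is discarded lazily the next time it reaches the top of the heap.
func (t *lazyOffsetTracker) Done(offset int64) { t.done[offset] = struct{}{} }

// LowestUnfinished pops finished offsets off the top of the heap and returns
// the smallest offset still in flight. ok is false when nothing is in flight.
func (t *lazyOffsetTracker) LowestUnfinished() (offset int64, ok bool) {
	for t.offsets.Len() > 0 {
		top := t.offsets[0]
		if _, finished := t.done[top]; !finished {
			return top, true
		}
		heap.Pop(&t.offsets)
		delete(t.done, top)
	}
	return 0, false
}

func main() {
	tracker := newLazyOffsetTracker()
	for _, o := range []int64{10, 11, 12} {
		tracker.Add(o)
	}
	tracker.Done(11) // finishing out of order does not move the commit point
	low, _ := tracker.LowestUnfinished()
	fmt.Println(low) // 10
	tracker.Done(10)
	low, _ = tracker.LowestUnfinished()
	fmt.Println(low) // 12
}
```

Lazy deletion keeps Done at O(1) and pays the heap cost only when the commit point is queried, which suits a workload where offsets often finish out of order.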

Not wired up to anything yet — the dispatcher stands alone. The next
commit will connect it to a Kafka consumer feeding incomingMsgs and a
broadcast-worker pool consuming outgoingBatch. Existing propagator,
tx_validator, and reaper code paths are untouched.

Retry classification uses HTTP status codes (422 for ErrTxMissingParent,
0 for no-response) rather than the existing IsRetryableError text match
— the latter is removed in a follow-up commit alongside the wiring
change. The dependency on Teranode returning 422 is satisfied by
bsv-blockchain/teranode#870.
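
To make the status-code classification concrete, the following sketch shows one way such a check could look. Only the two mappings named above (422 for a missing parent, 0 for no response) come from the commit message; the function name isRetryable and the choice to treat every other code as terminal are assumptions for illustration.

```go
package main

import (
	"fmt"
	"net/http"
)

// isRetryable is a hypothetical sketch of status-code-driven retry
// classification. Treating all codes other than 0 and 422 as terminal is an
// assumption of this example, not something the commit message states.
func isRetryable(statusCode int) bool {
	switch statusCode {
	case 0:
		// No HTTP response was received at all.
		return true
	case http.StatusUnprocessableEntity:
		// 422: the parent transaction is missing and may still arrive.
		return true
	default:
		return false
	}
}

func main() {
	for _, code := range []int{0, 200, 400, 422} {
		fmt.Printf("status %d retryable=%v\n", code, isRetryable(code))
	}
}
```

Compared with matching on error text, a switch over status codes does not break when an upstream error message is reworded.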

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues
Builds the dedicated Kafka consumer for the dependency-aware
propagation topic. Distinct from the existing kafka.ConsumerGroup
wrapper (still used by webhook, watchdog, etc.) because it defers
Claim.MarkMessage until the dispatcher reports the message's offset
terminalized via its offsetTracker. Without that, a crash would leave
in-flight txs lost: Kafka would treat their offsets as committed while
the dispatcher's in-memory state was gone.

Architecture is two cooperating goroutines off Run:

  - Read loop: pulls messages from a Claim, decodes the JSON envelope
    (now including input_txids), records the message in a pending map
    keyed by offset, and pushes a dispatcherMsg onto the dispatcher's
    incomingMsgs channel. The send is blocking, so backpressure
    propagates naturally up through the claim buffer into the broker.

  - Commit loop: ticks every 200ms (configurable), queries
    offsetTracker.LowestUnfinished, and calls MarkMessage on every
    pending message with offset below that cursor. Drops committed
    entries from the pending map.
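
The commit loop's core step can be sketched as a pure function. The name commitBelow, the string payload in the pending map, and the mark callback (standing in for MarkMessage) are assumptions made for this example; the real consumer would call something like this on each tick with the cursor from offsetTracker.LowestUnfinished.

```go
package main

import (
	"fmt"
	"sort"
)

// commitBelow marks every pending offset strictly below cursor, removes it
// from pending, and returns the marked offsets in ascending order.
// mark is a hypothetical stand-in for the Kafka MarkMessage call.
func commitBelow(pending map[int64]string, cursor int64, mark func(offset int64)) []int64 {
	var ready []int64
	for off := range pending {
		if off < cursor {
			ready = append(ready, off)
		}
	}
	sort.Slice(ready, func(i, j int) bool { return ready[i] < ready[j] })
	for _, off := range ready {
		mark(off)
		delete(pending, off)
	}
	return ready
}

func main() {
	pending := map[int64]string{7: "tx-a", 8: "tx-b", 9: "tx-c"}
	// Offset 9 is the lowest unfinished offset, so only 7 and 8 are committed.
	marked := commitBelow(pending, 9, func(off int64) { fmt.Println("mark", off) })
	fmt.Println(marked, len(pending)) // [7 8] 1
}
```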

To make this safe, offsetTracker now guards its state with a mutex.
The dispatcher's writes (Add/Done) and the consumer's reads
(LowestUnfinished/Empty) cross goroutine boundaries; without the
mutex, the race detector flagged it correctly. Uncontested mutex
overhead is ~10-20ns per op — negligible against the dispatcher's
~5-10μs per-message JSON decode.

The consumer is not yet wired into the propagator. The next commit
will replace the existing parallel-consumer pool with this consumer
plus the dispatcher.

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues
Adds the broadcast side of the dep-aware pipeline. Consumes batches
from the dispatcher's outgoingBatch channel, submits them to Teranode,
writes terminal status rows to the store, and emits per-tx statusFlips
back to the dispatcher.

Design:

  - Multi-tx batches go to /txs first. On HTTP 2xx every tx in the
    batch gets ACCEPTED_BY_NETWORK — Teranode handles intra-batch
    dependency ordering internally so an HTTP 200 implies every tx was
    accepted into the mempool. /txs returns aggregate-only status, so
    on HTTP non-2xx the broadcaster falls back to per-tx /tx calls to
    recover per-tx status codes (some txs in a failed batch may
    succeed individually).

  - Single-tx batches go directly to /tx, skipping the batch path.

  - Fan-out across healthy endpoints is parallel; first 2xx wins and
    cancels siblings. No healthy endpoints maps to statusCode=0,
    which the dispatcher classifies as retryable.

  - Store writes use BatchUpdateStatus per tx. Write failures are
    logged but don't block the statusFlip emission — the dispatcher
    advances its in-memory state regardless, and Kafka replay
    reconciles divergence on restart.

  - Sibling cancellation does NOT count against endpoint health — a
    canceled-by-broadcast error is filtered before aggregating
    outcomes.

Tests cover the four meaningful paths: batch accepted (no fallback),
batch rejected with per-tx fallback, single-tx direct, and no healthy
endpoints (retryable signal).

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - go vet ./services/propagation/... -- clean
  - golangci-lint run ./services/propagation/ -- 0 issues
Restructures the submit handler for the dep-aware pipeline. The intake
HTTP handler now performs the work the tx_validator service used to do
asynchronously: parse, run policy validation, dedup via store CAS, and
publish to the dispatch topic with input_txids extracted from the
parsed transaction.

Changes:

  - kafka/topics.go: add TopicDispatch and TopicDispatchDLQ for the
    dep-aware pipeline's fresh single-partition topic.

  - services/api_server/server.go: add a validator.Validator field
    threaded through New(); existing tests using struct-literal
    construction get a nil-safe degraded mode (skips validation).

  - services/api_server/handlers.go: handleSubmitTransaction now runs
    ValidateTransaction (policy-only, matching tx_validator's current
    behavior), persists REJECTED rows synchronously on validation
    failure, performs the dedup CAS via GetOrInsertStatus, and
    publishes to TopicDispatch with an envelope including input_txids.

  - app/app.go: pass d.Validator to api_server.New so production
    deployments get full validation behavior.

  - handlers_test.go: extend mockStore with an overrideable
    getOrInsertStatusFn so tests can exercise the dedup path. Default
    behavior returns inserted=true so existing tests continue to see
    fresh-submission semantics.

  - intake_dispatch_test.go: new test file covering the four cases:
    happy-path publish to TopicDispatch with correct envelope shape,
    dedup hit returns idempotent 202 with existing state, transient
    store error continues to publish, and the pathological
    not-inserted-with-nil-existing case (defensive branch).
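
For readers unfamiliar with the envelope, a rough sketch of a message carrying input_txids might look like this. Only the input_txids key comes from the commit message; the struct name, the txid and raw_tx keys, and the use of omitempty are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dispatchEnvelope is a hypothetical message shape. Only the input_txids key
// is taken from the commit message; the other fields are assumed.
type dispatchEnvelope struct {
	TXID       string   `json:"txid"`
	RawTx      string   `json:"raw_tx"`
	InputTXIDs []string `json:"input_txids,omitempty"`
}

// decodeEnvelope parses one message. A missing input_txids key decodes to a
// nil slice, which a consumer can treat as "no known parents".
func decodeEnvelope(raw []byte) (dispatchEnvelope, error) {
	var env dispatchEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return dispatchEnvelope{}, fmt.Errorf("decode dispatch envelope: %w", err)
	}
	return env, nil
}

func main() {
	out, err := json.Marshal(dispatchEnvelope{TXID: "child", RawTx: "00", InputTXIDs: []string{"p1", "p2"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"txid":"child","raw_tx":"00","input_txids":["p1","p2"]}

	env, err := decodeEnvelope([]byte(`{"txid":"solo","raw_tx":"00"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(len(env.InputTXIDs)) // 0
}
```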

The legacy TopicTransaction publish is gone — nothing in production
will be producing to that topic anymore. The tx_validator service
remains as code but its consumer will see no traffic; removed in a
follow-up commit alongside the propagator entry-point swap.

Verified:
  - go test -count=1 -race ./services/api_server/... ./services/propagation/... -- pass
  - go vet ./... -- clean
  - golangci-lint run on touched packages -- 0 issues
Builds the propagation.Pipeline that wires Dispatcher,
dispatcherConsumer, and dispatcherBroadcaster together with their
shared channels and lifecycle. Implements services.Service so
app.BuildServices can drop it in alongside (or eventually replacing)
the legacy Propagator.

Architecture:

  - dispatcherConsumer reads from kafka.TopicDispatch, defers Kafka
    offset commit until the dispatcher's offsetTracker reports each
    tx terminalized.

  - Dispatcher runs in its own goroutine, owning the in-flight map,
    waiter index, retry queue, and offset heap. No locks because
    only this goroutine mutates them.

  - dispatcherBroadcaster reads emitted batches, submits to Teranode
    via the existing teranode.Client (batch endpoint with per-tx
    fallback on all-rejected), writes terminal status rows to the
    store, and emits per-tx statusFlips back to the dispatcher's
    return channel.

The dispatcher's rejectedSink is wired so cascade rejections from
REJECTED parents also produce a terminal store row — implemented as
a synthetic statusFlip onto the same flips channel the broadcaster
uses, keeping all terminal-write logic in one place
(dispatcherBroadcaster.emitOutcome). The synthetic flip falls back to
a non-blocking send so a full flips channel doesn't stall the
dispatcher loop; correctness is preserved by Kafka replay since the
parent's offset is not committed until the child terminalizes.
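
The non-blocking send mentioned above is the standard Go select-with-default idiom. A minimal sketch, with a hypothetical statusEvent type and trySend helper standing in for the PR's actual flip type and sink:

```go
package main

import "fmt"

// statusEvent is a hypothetical stand-in for the flip sent to the broadcaster.
type statusEvent struct {
	TxID   string
	Status string
}

// trySend attempts to queue ev without blocking and reports whether it was
// accepted. A false return means the channel buffer was full.
func trySend(ch chan<- statusEvent, ev statusEvent) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	flips := make(chan statusEvent, 1)
	fmt.Println(trySend(flips, statusEvent{TxID: "child-1", Status: "REJECTED"})) // true
	fmt.Println(trySend(flips, statusEvent{TxID: "child-2", Status: "REJECTED"})) // false
}
```

The trade-off is that a dropped event has to be recovered some other way, which is the role the commit message assigns to Kafka replay.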

Lifecycle:

  - NewPipeline constructs all components and channels but does NOT
    start any goroutine.
  - Start spawns the broadcaster and dispatcher goroutines, then
    runs the consumer in the caller's goroutine (blocking until
    ctx is canceled or the subscription ends).
  - Stop cancels the internal context, which propagates to the two
    background goroutines, and waits for them to exit.

End-to-end tests cover:
  - Happy-path batch: two unrelated txs flow through, both end up
    ACCEPTED_BY_NETWORK in the store, one /txs hit on Teranode.
  - Dep-aware ordering: parent and child published in order, child
    held until parent's broadcast completes (verified by an
    artificially-slow parent and the order of /tx submissions
    observed by the test Teranode emulator).

Not yet wired into app.BuildServices — that swap is the next commit,
landing alongside removal of the legacy propagator entry point.
Merkle-service registration (the F-024 invariant for prior code) is
not yet plumbed in; needed before the cutover but separated from this
commit to keep the diff focused on the dispatcher wiring.

Verified:
  - go test -count=1 -race ./services/propagation/... ./services/api_server/... -- pass
  - go vet ./... -- clean
  - golangci-lint run on touched packages -- 0 issues
Before broadcasting a batch to Teranode, the broadcaster now
registers every tx with merkle-service for /watch tracking. Per-tx
registration failures emit a retryable statusFlip (statusCode=0)
so the dispatcher's retry queue re-dispatches them on backoff. Txs
that register successfully proceed to the existing /txs (with /tx
fallback) path.

This preserves the F-024 invariant from the legacy propagator: no
transaction reaches Teranode without a matching /watch entry, so
MINED / SEEN_ON_NETWORK / STUMP callbacks always have a row to
update.

Behavior matches the legacy registerBatch in two important ways:
  - When merkleClient is nil OR callbackURL is empty, registration
    is skipped entirely and every tx in the batch proceeds to
    broadcast. Same fallback the legacy path used.
  - Per-tx independence: a failing tx in a batch does NOT block the
    rest of the batch from broadcasting. The successful subset
    proceeds; failures take the retry path.

The dispatcher's retry queue handles the failed registrations the
same way it handles transient broadcast failures — same
isRetryableStatusCode logic, same exponential backoff via
computeRetryDeadline.

Pipeline wires the merkle deps through cfg.CallbackURL,
cfg.CallbackToken, and cfg.Propagation.MerkleConcurrency so
production behavior matches the legacy propagator.

Test coverage: a batch containing one tx whose merkle registration
fails (HTTP 500 from merkle-service) and one that succeeds. The
failing tx produces a retryable statusFlip and never reaches
Teranode; the successful tx broadcasts normally.

Verified:
  - go test -count=1 -race ./services/propagation/... -- pass
  - golangci-lint run ./services/propagation/ -- 0 issues
app.BuildServices now constructs propagation.NewPipeline for the
"propagation" service mode. The legacy propagation.New(...) call is
removed from the live wiring; the type and its surrounding code stay
in the source tree for a follow-up cleanup commit.

The Pipeline takes a tighter dependency set than the old Propagator:
no leaser (no reaper lease needed — single consumer per partition
makes the cluster-wide lock obsolete) and no publisher (status fan-out
flows through the existing Kafka publisher inside the store/event
layer, not through the propagator).

After this commit the dispatcher pipeline is the production
propagation path. Submissions flow:

  POST /tx → api-server intake (parse + validate + dedup CAS)
           → kafka.TopicDispatch (new single-partition topic)
           → propagation.Pipeline
              ├── dispatcherConsumer (deferred commit)
              ├── Dispatcher (dep index, retry queue)
              └── dispatcherBroadcaster (merkle register + Teranode submit)

The legacy tx_validator service still runs (consuming TopicTransaction)
because nothing has been removed yet — but no producer feeds it, so
it's idle. That removal lands next.

Verified:
  - go test -count=1 -race ./services/propagation/... ./services/api_server/... ./app/... -- pass
  - go vet ./... -- clean
  - golangci-lint run on touched packages -- 0 issues
The tx_validator service is dead code after the intake handler took
over its responsibilities (parse + policy validate + dedup CAS) and
intake started publishing directly to TopicDispatch instead of
TopicTransaction. Nothing produces to TopicTransaction anymore, so
the service would just sit idle.

Removed:
  - services/tx_validator/ directory (validator.go + validator_test.go)
  - app/app.go import and "tx-validator" service registration
  - config/config.go "tx-validator" mode entry

The TxValidator config block (cfg.TxValidator) is still defined in
config.go for backward read compatibility — operators with a
tx_validator: section in their config.yaml see it silently ignored
rather than getting a validation error. Removing the struct entirely
can happen in a separate cleanup once the deploy has rolled out.

Metrics with arcade_tx_validator_* names also stay defined for now;
they'll never be incremented in the new pipeline but keeping the
definitions prevents Prometheus scrape errors during the cutover.

Verified:
  - go test -count=1 -race ./services/... ./app/... ./config/... -- pass
  - go vet ./... -- clean
Two type renames that should have been extensions in the first place:

  - The dispatcher's incoming-message type is the existing
    propagationMsg, extended with InputTXIDs and KafkaOffset fields.
    Previously this was a parallel "dispatcherMsg" type — same JSON
    shape, same fields — which created unnecessary divergence.

  - Status-flip events between broadcaster/dispatcher use
    *models.TransactionStatus directly. The existing type already
    carries TxID, Status, StatusCode, and ExtraInfo, which is exactly
    what the per-tx outcome carries. The internal "statusFlip" struct
    was just a four-field subset of the same shape.

The broadcaster now constructs one *models.TransactionStatus per
outcome and uses it both as the store write payload AND as the
in-flight status flip sent to the dispatcher. Removes a redundant
local struct.

Reverts the gratuitous "msg" → "envelope" variable rename in
handleSubmitTransaction; the existing msg variable just gets one new
key in its map literal.

Net diff is smaller and matches the existing type vocabulary across
the codebase.

Verified:
  - go test -count=1 -race ./services/... ./app/... -- pass
  - golangci-lint run on touched packages -- 0 issues
The dispatcher pipeline is now the production path. Everything in the
legacy propagation service is dead code:

  - propagator.go: the parallel-consumer Propagator type and its
    flushBatch / processBatch / registerBatch / broadcast helpers.
    Replaced by Dispatcher + dispatcherConsumer +
    dispatcherBroadcaster.

  - reaper.go: the lease-coordinated PENDING_RETRY reaper.
    Replaced by the dispatcher's in-memory retry queue (Kafka replay
    provides crash durability).

  - retryable.go: IsRetryableError text-match classification.
    Replaced by isRetryableStatusCode using the HTTP status codes
    Teranode now returns (per the audit PR landed upstream).

  - backoff.go: ComputeBackoff helper for the reaper. The dispatcher
    has its own inlined retry-deadline math (computeRetryDeadline).

  - replay.go: the one-shot merkle-service replay on startup. Useful
    as a recovery mechanism for merkle-service state loss but not
    needed for the initial cutover; a follow-up can add it back if
    operationally warranted.

  - propagator_test.go: 2k lines of legacy-path tests, including the
    comprehensive mockStore with bookkeeping for PendingRetry,
    retryCount, merkleMarks, etc.

  - health_test.go: tests of the legacy propagator's broadcast
    behavior (first-success-wins across endpoints). Equivalent
    coverage now lives in dispatcher_broadcaster_test.go for the new
    broadcaster.

The propagationMsg type moved from propagator.go to dispatcher.go
where it's the natural neighbor of the dispatcher that consumes it.

A minimal mockStore lives in mocks_test.go — just enough to satisfy
the store.Store interface via embedding. Tests that need specific
store behavior compose over it (broadcastTestStore, pipelineTestStore).

Net: 4276 lines removed, ~25 lines added.

Verified:
  - go test -count=1 -race ./... -- all packages pass
  - golangci-lint run ./services/propagation/ -- 0 issues
The plan called for renaming the propagation topic (TopicDispatch) and
changing how it gets consumed (single-partition dispatcher pipeline)
— it did NOT call for renaming the service's external API. Reverts
the gratuitous Pipeline / NewPipeline names back to the original
Propagator / New.

Concretely:
  - type Pipeline → type Propagator
  - func NewPipeline → func New
  - pipeline.go → propagator.go (the file lives next to its peers
    again under the original name)
  - pipeline_test.go → propagator_test.go
  - Test names TestPipeline_* → TestPropagator_*
  - app.BuildServices references propagation.New (no signature change
    visible to callers)

Internal types stay as-is: Dispatcher, dispatcherConsumer,
dispatcherBroadcaster — those are genuinely new components introduced
by the dep-aware pipeline; renaming them would just rename internal
implementation details without changing what they are.

Verified:
  - go test -count=1 -race ./services/propagation/... ./app/... -- pass
  - golangci-lint run ./services/propagation/ ./app/ -- 0 issues
Implements the dep-aware dispatch described in docs/plans/. Targeted
modifications to the existing Propagator and intake handler; no
architectural rework, no type churn.

Changes:

  - propagationMsg gains an InputTXIDs field. Optional on the JSON
    side so older producers continue to interoperate; the propagator
    treats absent/empty as "no in-flight parents".

  - Intake handler (api_server) now performs the work the
    tx_validator service used to do asynchronously: parse, run policy
    validation via validator.ValidateTransaction (skipFees=true,
    skipScripts=true — matches the legacy tx_validator behavior),
    dedup CAS via store.GetOrInsertStatus, extract InputTXIDs, and
    publish to kafka.TopicPropagation directly. Validation failure
    persists a terminal REJECTED row and returns 400 synchronously.

  - api_server.New now takes a *validator.Validator; app.BuildServices
    threads d.Validator through. Existing tests using struct-literal
    construction (validator nil) skip validation, preserving their
    behavior.

  - Propagator gains an in-flight dep index: inFlight set, waiters
    map (parent → set of children), pendingParents map (child → set
    of unmet parents), heldMsgs map (held child txid → message).
    handleMessage now checks input txids against inFlight and, if any
    parent is in flight, registers the message as a waiter instead
    of admitting it to pendingMsgs.

  - applyTerminalStatuses hooks into handleTerminalForDeps after the
    bulk publish: ACCEPTED parents release waiters whose pendingParents
    set becomes empty (re-entered into pendingMsgs so the next
    flushBatch picks them up); REJECTED parents cascade-reject every
    descendant recursively, writing terminal REJECTED rows and
    emitting a bulk publish for SSE/webhook subscribers. Terminalized
    txids are dropped from inFlight.

  - tx_validator gains an exported CollectInputTXIDs helper that
    intake reuses, keeping the wire format extraction identical
    regardless of producer. tx_validator itself is no longer
    registered in app.BuildServices — intake covers its job — but
    the package stays in the source tree as preserved queue-processing
    scaffolding (its message format includes input_txids too in case
    anyone re-activates it).

  - No new topic constant, no new types, no architectural split. The
    existing TopicPropagation carries the extended message format.
    Single-partition deployment is an operator choice, not a code
    constraint.

Verified:
  - go test -count=1 -race ./services/api_server/... ./services/propagation/...
    ./services/tx_validator/... ./app/... -- all pass
  - go vet ./... -- clean
  - golangci-lint run -- 0 issues
depaware_test.go covers the five core behaviors:

  - HoldsChildWhenParentInFlight: a tx whose InputTXIDs include a tx
    currently in flight lands in heldMsgs + pendingParents + waiters,
    not pendingMsgs.
  - ReleasesWaitersOnAccepted: parent's terminal ACCEPTED clears its
    waiters' pendingParents entry; a child whose set is now empty is
    re-entered into pendingMsgs.
  - CascadesRejectedChildren: parent's terminal REJECTED recursively
    rejects every descendant (child, grandchild). Each cascaded row
    is written to the store via BatchUpdateStatusReturning.
  - NoParents_AdmitsNormally: tx with no InputTXIDs follows the
    legacy admission path, no waiter state.
  - ParentNotInFlight_AdmitsChildDirectly: only IN-FLIGHT parents
    create waits — children of long-mined or never-seen parents go
    straight to pendingMsgs.

Behavior fix uncovered by the cascade test: a held waiter must still
appear in inFlight so DEEPER descendants (grandchild waiting on a
held child) register their waiter edge correctly. Handle this by
adding propMsg.TXID to inFlight BEFORE the hold/admit decision in
handleMessage. Removed the post-release inFlight insert in
handleTerminalForDeps since the child was already there.
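
The hold, release, and cascade rules described above can be sketched as a small single-owner index. The map names follow the commit message (inFlight, waiters, pendingParents); the depIndex type, its methods, and the sorted output order are assumptions for illustration, and heldMsgs is omitted for brevity. Note that admit inserts the txid into inFlight before deciding, which is what lets a grandchild register against a held child.

```go
package main

import (
	"fmt"
	"sort"
)

type set map[string]struct{}

// depIndex is a hypothetical single-goroutine dependency index.
type depIndex struct {
	inFlight       set
	waiters        map[string]set // parent -> children waiting on it
	pendingParents map[string]set // child -> parents it still waits on
}

func newDepIndex() *depIndex {
	return &depIndex{inFlight: set{}, waiters: map[string]set{}, pendingParents: map[string]set{}}
}

func sortedKeys(s set) []string {
	keys := make([]string, 0, len(s))
	for k := range s {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// admit records txid as in flight and reports whether it can be broadcast
// now (true) or must be held behind in-flight parents (false).
func (d *depIndex) admit(txid string, inputs []string) bool {
	unmet := set{}
	for _, p := range inputs {
		if _, ok := d.inFlight[p]; ok {
			unmet[p] = struct{}{}
		}
	}
	d.inFlight[txid] = struct{}{} // before the decision, so descendants see it
	if len(unmet) == 0 {
		return true
	}
	d.pendingParents[txid] = unmet
	for p := range unmet {
		if d.waiters[p] == nil {
			d.waiters[p] = set{}
		}
		d.waiters[p][txid] = struct{}{}
	}
	return false
}

// accepted terminalizes parent and returns the children that no longer wait
// on any parent. Released children stay in inFlight until they terminalize.
func (d *depIndex) accepted(parent string) []string {
	delete(d.inFlight, parent)
	var released []string
	for _, child := range sortedKeys(d.waiters[parent]) {
		delete(d.pendingParents[child], parent)
		if len(d.pendingParents[child]) == 0 {
			delete(d.pendingParents, child)
			released = append(released, child)
		}
	}
	delete(d.waiters, parent)
	return released
}

// rejected terminalizes parent and returns every descendant to cascade-reject,
// walking the waiter edges breadth-first.
func (d *depIndex) rejected(parent string) []string {
	delete(d.inFlight, parent)
	var cascaded []string
	queue := []string{parent}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		children := sortedKeys(d.waiters[cur])
		delete(d.waiters, cur)
		for _, child := range children {
			delete(d.inFlight, child)
			for p := range d.pendingParents[child] {
				delete(d.waiters[p], child) // no-op if that set is already gone
			}
			delete(d.pendingParents, child)
			cascaded = append(cascaded, child)
			queue = append(queue, child)
		}
	}
	return cascaded
}

func main() {
	d := newDepIndex()
	fmt.Println(d.admit("parent", nil))                   // true
	fmt.Println(d.admit("child", []string{"parent"}))     // false
	fmt.Println(d.admit("grandchild", []string{"child"})) // false
	fmt.Println(d.rejected("parent"))                     // [child grandchild]
}
```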

persistCascadeRejections now uses BatchUpdateStatusReturning to match
applyTerminalStatuses's existing store contract (mockStore implements
that method, not BatchUpdateStatus).

Verified:
  - go test -count=1 -race ./services/propagation/... -- all pass
  - golangci-lint run ./services/propagation/ -- 0 issues
The previous version put inFlight / waiters / pendingParents /
heldMsgs on the Propagator struct behind the existing p.mu mutex.
That was the wrong shape: with a single Kafka consumer per partition
and the dispatch decisions all logically owned by one ordering
domain, the dep-index should live in a single goroutine with no
locks. The mutex was serializing concurrent access from handleMessage
(consumer goroutine) and applyTerminalStatuses (processBatch
goroutines, up to MaxConcurrentBatches in parallel) — workable, but
extra synchronization complexity for state that has a natural single
owner.

This commit:

  - Creates services/propagation/dispatcher.go containing the
    dispatcher goroutine (runDispatcher) plus the admitRequest /
    terminalEvent / terminalResult protocol types. All dep-index
    state (inFlight, waiters, pendingParents, heldMsgs) is declared
    inside runDispatcher's function body — nothing else can reach
    them. The dispatcher serializes admit and terminal events via a
    select loop; no locks anywhere on dep-index state.

  - Removes inFlight / waiters / pendingParents / heldMsgs from the
    Propagator struct. Removes the holdAsWaiterLocked, releaseWaitersLocked,
    cascadeRejectLocked, handleTerminalForDeps helpers — their logic
    moved into the dispatcher goroutine.

  - handleMessage now calls p.admitToDispatcher(propMsg), which sends
    a request and waits for a synchronous admit/hold reply. Single
    goroutine round-trip per message; channels are buffered so a
    momentarily slow dispatcher doesn't immediately stall the consumer.

  - applyTerminalStatuses now calls p.notifyTerminalToDispatcher per
    terminalized txid, gets back the list of released waiters
    (re-entered into pendingMsgs by the caller) and cascaded
    descendants (caller writes REJECTED rows + bulk publish).

  - The dispatcher goroutine is started in New (not Start) so the
    existing tests that construct via New and call handleMessage
    directly continue to work. Stop cancels the dispatcher.

  - depaware_test.go updated to test via observable behavior
    (pendingMsgs contents, ms.updates) instead of poking at private
    dep-index maps, since those are no longer accessible from outside
    the dispatcher goroutine.

The legacy p.mu still protects pendingMsgs (a slice the consumer
goroutine and the dispatcher both need to append to — released
waiters re-enter via the consumer-side helper). That mutex was
already there before this PR and isn't part of the dep-aware change.

Verified:
  - go test -count=1 -race -timeout 60s ./services/... ./app/... -- all pass
  - golangci-lint run on touched packages -- 0 issues
galt-tr marked this pull request as ready for review on May 18, 2026 14:18
galt-tr requested a review from mrz1836 as a code owner on May 18, 2026 14:18
galt-tr requested a review from Copilot on May 18, 2026 14:45

Copilot AI left a comment

Contributor

Pull request overview

This PR introduces a dependency-aware propagation dispatcher and updates the pipeline to publish directly from intake to the propagation topic. It also adds Teranode /txs failure-list parsing to support per-tx classification, removes the tx_validator service, and updates tests/metrics/docs accordingly.

Changes:

  • Move structural/policy validation + store dedup into the API intake handler and publish directly to kafka.TopicPropagation (with input_txids attached).
  • Rewrite propagation to use a single-goroutine, dep-aware dispatcher with deferred Kafka offset marking, plus updated broadcast/requeue behavior.
  • Extend Teranode client batch submit to optionally return a per-tx failure map parsed from Teranode’s structured 500 body; remove legacy retryable-string matching and tx-validator-related metrics/tests.

Reviewed changes

Copilot reviewed 34 out of 34 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| tests/e2e/smoke_test.go | Updates e2e comments to reflect validation moving into intake. |
| tests/e2e/smoke_real_test.go | Updates e2e comments to reflect intake validation. |
| tests/e2e/harness/arcade.go | Updates readiness comment to remove tx-validator reference. |
| teranode/client.go | Changes /txs API to return (status, failures, err) and adds Teranode failure-list parsing. |
| teranode/client_test.go | Adds tests for failure-list parsing behavior and updates call sites. |
| store/postgres/postgres.go | Removes tx_validator-specific commentary from batch insert docs. |
| store/postgres/batch_test.go | Updates test comments to remove tx_validator references. |
| store/pebble/batch_test.go | Updates test comments to remove tx_validator references. |
| store/batch_test.go | Updates test comments to remove tx_validator references. |
| services/tx_validator/validator.go | Deletes tx-validator service implementation. |
| services/tx_validator/validator_test.go | Deletes tx-validator unit tests. |
| services/sse/sse_e2e_test.go | Updates e2e helper comment to reflect intake behavior. |
| services/propagation/retryable.go | Removes string-matching retryability helper. |
| services/propagation/retryable_test.go | Removes retryability tests tied to string matching. |
| services/propagation/reaper.go | Reworks reaper to scan for stale SEEN_ON_NETWORK rows and rebroadcast them. |
| services/propagation/propagator.go | Major rewrite: dep-aware dispatcher channels, new Teranode classification, requeue behavior, lifecycle changes. |
| services/propagation/propagator_test.go | Removes obsolete tests tied to PENDING_RETRY/backoff and updates broadcast expectations. |
| services/propagation/offset_tracker.go | Adds heap-based offset tracker for deferred commit watermarking. |
| services/propagation/health_test.go | Removes inline-retry delay tweaks (inline retry removed). |
| services/propagation/dispatcher.go | Adds single-goroutine dependency-aware dispatcher and channel protocols. |
| services/propagation/depaware_test.go | Adds unit tests for dependency gating, release/cascade, and maxPending backpressure. |
| services/propagation/backoff.go | Removes retry backoff helper (no longer used). |
| services/propagation/backoff_test.go | Removes backoff tests. |
| services/api_server/server.go | Adds validator dependency and threads it through constructor. |
| services/api_server/handlers.go | Adds input parent extraction, intake validation/dedup, publishes directly to propagation topic, adds batch submit changes. |
| services/api_server/handlers_test.go | Updates batch submit tests for 202 responses. |
| metrics/metrics.go | Removes tx-validator metrics; trims propagation metric labels now that fallback/retry changed. |
| metrics/metrics_test.go | Updates scrape test to stop referencing tx-validator metrics. |
| kafka/consumer.go | Adds ClaimHandler mode to bypass drain/flush/retry/DLQ and let a service own the claim goroutine. |
| docs/plans/dependency-aware-dispatch.md | Adds plan doc describing dependency-aware dispatch + pipeline simplifications. |
| deploy/tx-validator.yaml | Removes tx-validator deployment manifest. |
| config/config.go | Removes tx_validator config/mode; updates propagation config docs. |
| config.example.yaml | Removes tx_validator example config and updates mode comment. |
| app/app.go | Removes tx-validator service wiring; updates api-server wiring to pass validator. |


Comment thread services/api_server/handlers.go Outdated
Comment thread teranode/client.go
Comment on lines +780 to +790
failures := make(map[string]string, len(lines)-1)
for _, line := range lines[1:] {
if line == "" {
continue
}
txid := txsTxidPattern.FindString(line)
if txid == "" {
continue
}
failures[strings.ToLower(txid)] = line
}
Comment on lines +249 to +262
// Start a dispatcher goroutine with a nil claim so tests that
// construct via New and drive via admitCh / drainCh have a running
// state machine without needing to invoke Start. In production
// Start replaces this with the same loop running inside the kafka
// ClaimHandler — see Start. The two paths can't both be live at
// once: Start cancels this context before subscribing.
dispatcherCtx, dispatcherCancel := context.WithCancel(context.Background())
p.dispatcherCancel = dispatcherCancel
go func() {
if err := p.runDispatcher(dispatcherCtx, nil, dispatcherConfig{maxPending: maxPending}); err != nil {
p.logger.Error("test-mode dispatcher exited with error", zap.Error(err))
}
}()
return p
Comment on lines +297 to +321
// Record the offset on the tracker for either branch — both held
// and admitted txs are in-flight from the consumer's perspective
// and must pin the commit watermark below this offset.
inFlight[msg.TXID] = offset
tracker.Add(offset)

if len(blocking) > 0 {
// Hold as a waiter. Held txs DO go into inFlight (so
// descendants can register on them) but NOT into pendingMsgs
// (they're not on the broadcast path yet).
for parent := range blocking {
set, ok := waiters[parent]
if !ok {
set = make(map[string]struct{})
waiters[parent] = set
}
set[msg.TXID] = struct{}{}
}
heldMsgs[msg.TXID] = msg
return admitResult{held: true}
}

// Eligible for broadcast. Add to pendingMsgs.
*pendingMsgs = append(*pendingMsgs, msg)
return admitResult{admitted: true}
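The excerpt above covers only the admit side of the gating. As a rough illustration of how the matching release step could work, here is a minimal, self-contained sketch. `depIndex`, `admit`, `release`, and the `blocked` counter are hypothetical names chosen for this example and are not taken from the PR. The sketch models only the "parent finished, children become eligible" path, not cascade rejection.

```go
package main

import (
	"fmt"
	"sort"
)

// depIndex is a hypothetical, stripped-down stand-in for the dispatcher
// state (inFlight / waiters) shown in the excerpt.
type depIndex struct {
	inFlight map[string]struct{}            // txids currently admitted or held
	waiters  map[string]map[string]struct{} // parent txid -> children waiting on it
	blocked  map[string]int                 // child txid -> number of unresolved parents
}

func newDepIndex() *depIndex {
	return &depIndex{
		inFlight: make(map[string]struct{}),
		waiters:  make(map[string]map[string]struct{}),
		blocked:  make(map[string]int),
	}
}

// admit records txid as in-flight and reports whether it must be held
// because at least one of its parents is still in flight.
func (d *depIndex) admit(txid string, parents []string) bool {
	for _, p := range parents {
		if _, ok := d.inFlight[p]; !ok {
			continue // parent is not in flight, so it does not block
		}
		set, ok := d.waiters[p]
		if !ok {
			set = make(map[string]struct{})
			d.waiters[p] = set
		}
		if _, dup := set[txid]; !dup {
			set[txid] = struct{}{}
			d.blocked[txid]++
		}
	}
	d.inFlight[txid] = struct{}{}
	return d.blocked[txid] > 0
}

// release marks parent as finished and returns, in sorted order, the
// children that no longer wait on any in-flight parent.
func (d *depIndex) release(parent string) []string {
	delete(d.inFlight, parent)
	var ready []string
	for child := range d.waiters[parent] {
		d.blocked[child]--
		if d.blocked[child] == 0 {
			delete(d.blocked, child)
			ready = append(ready, child)
		}
	}
	delete(d.waiters, parent)
	sort.Strings(ready)
	return ready
}

func main() {
	d := newDepIndex()
	fmt.Println(d.admit("parent", nil))               // false: nothing blocks it
	fmt.Println(d.admit("child", []string{"parent"})) // true: held behind parent
	fmt.Println(d.release("parent"))                  // [child]
}
```

Counting unresolved parents per child keeps the release step proportional to the number of waiters on the finished parent, which suits a single-goroutine dispatcher that owns all of this state without locks.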
Comment thread services/propagation/depaware_test.go Outdated
Comment on lines +1 to +4
# Dependency-Aware Dispatch

Plan to add parent-child dependency awareness to Arcade's broadcast pipeline so that transactions in the queue with parent-child relationships are sequenced correctly when sent to Teranode. The redesign also simplifies the pipeline: validation moves to intake, the `tx_validator` service is removed, PENDING_RETRY status and the reaper are removed, and the dispatcher holds Kafka offsets in flight until each tx terminalizes so a crash replays everything that wasn't done.

Comment thread docs/plans/dependency-aware-dispatch.md Outdated
Comment on lines +82 to +90
### Retry handling

PENDING_RETRY status and the reaper go away. Infrastructure failures retry forever; the mechanism differs by failure mode because merkle-service is binary while Teranode is per-tx.

**Merkle-service `/watch` failure — inline retry, whole batch.** `registerBatch` sleeps with capped-exponential backoff and retries the whole batch. The merkle-service `/watch` payload's only per-tx variation is the txid string — a failure is always all-or-nothing, so splitting buys nothing. The `processBatch` goroutine holds a `processBatchSem` slot during the retry loop, which is the natural backpressure we want: once all slots are held, the consumer stops pulling new work.

Backoff: **100ms → 500ms → 2s → 5s → 10s, then 10s steady, retrying forever**.
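The schedule above can be read as a lookup table with a sticky last entry. The following is a minimal sketch of that reading. `backoffSchedule` and `backoffFor` are hypothetical names for illustration, and the actual `registerBatch` retry loop may be structured differently.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule mirrors the delays quoted in the plan text. The name
// and the helper below are assumptions made for this example.
var backoffSchedule = []time.Duration{
	100 * time.Millisecond,
	500 * time.Millisecond,
	2 * time.Second,
	5 * time.Second,
	10 * time.Second,
}

// backoffFor returns the wait before retry number attempt (0-based).
// Attempts past the end of the schedule stay at the final 10s step,
// which gives the "10s steady, retrying forever" behavior.
func backoffFor(attempt int) time.Duration {
	if attempt < 0 {
		attempt = 0
	}
	if attempt >= len(backoffSchedule) {
		return backoffSchedule[len(backoffSchedule)-1]
	}
	return backoffSchedule[attempt]
}

func main() {
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Println(attempt, backoffFor(attempt))
	}
}
```

A retry loop built on this would sleep for `backoffFor(attempt)` between tries while also selecting on context cancellation, so shutdown is not delayed by a pending 10s wait.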

**Teranode `/txs` per-slot infra failures — requeue individually, short flat wait.** With #879 + #881, the `/txs` response delivers per-slot Teranode codes. `broadcastInChunks` walks the per-slot lines and partitions them:
@galt-tr

galt-tr commented May 18, 2026

Contributor

Looking good; let's address the Copilot feedback.

@mrz1836 mrz1836 assigned galt-tr and unassigned mrz1836 May 18, 2026
galt-tr and others added 3 commits May 18, 2026 12:23
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

Copilot AI left a comment

Contributor


Pull request overview

Copilot reviewed 34 out of 34 changed files in this pull request and generated 9 comments.

Comments suppressed due to low confidence (1)

kafka/consumer.go:83

  • NewConsumerGroup doesn’t validate that either (a) ClaimHandler is set or (b) Handler is set. In non-ClaimHandler mode, a nil Handler will panic when processing messages. Consider adding a config validation error early (e.g., require ClaimHandler != nil || Handler != nil) to fail fast on misconfiguration.
func NewConsumerGroup(cfg ConsumerConfig) (*ConsumerGroup, error) {
	if cfg.Broker == nil {
		return nil, fmt.Errorf("ConsumerConfig.Broker is required")
	}
	sub, err := cfg.Broker.Subscribe(cfg.GroupID, cfg.Topics)
	if err != nil {
		return nil, fmt.Errorf("subscribing: %w", err)
	}
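The fail-fast check suggested in the suppressed comment could look roughly like the sketch below. The `ConsumerConfig` here is a hypothetical, reduced stand-in with simplified field types, not the real struct from `kafka/consumer.go`. Only the `Handler` and `ClaimHandler` field names come from the comment, and `validateConsumerConfig` is an invented helper.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsumerConfig is a simplified stand-in for illustration only. The
// real struct has more fields and different handler signatures.
type ConsumerConfig struct {
	Handler      func(msg []byte) error // per-message mode
	ClaimHandler func() error           // service-owned claim goroutine mode
}

var errNoHandler = errors.New("ConsumerConfig: one of Handler or ClaimHandler is required")

// validateConsumerConfig rejects a config that would otherwise panic on
// the first message because neither handler is set.
func validateConsumerConfig(cfg ConsumerConfig) error {
	if cfg.Handler == nil && cfg.ClaimHandler == nil {
		return errNoHandler
	}
	return nil
}

func main() {
	fmt.Println(validateConsumerConfig(ConsumerConfig{}))
	fmt.Println(validateConsumerConfig(ConsumerConfig{
		Handler: func([]byte) error { return nil },
	}))
}
```

Running the check before subscribing would mean a misconfigured consumer never joins the group, rather than joining and then crashing on its first claim.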

Comment on lines +1 to +4
# Dependency-Aware Dispatch

Plan to add parent-child dependency awareness to Arcade's broadcast pipeline so that transactions in the queue with parent-child relationships are sequenced correctly when sent to Teranode. The redesign also simplifies the pipeline: validation moves to intake, the `tx_validator` service is removed, the `PENDING_RETRY` status is removed (transient infra failures are kept in-memory and requeued through the dispatcher), the reaper is narrowed to rebroadcasting stale `SEEN_ON_NETWORK` rows that the dispatcher's in-flight set can't see, and the dispatcher holds Kafka offsets in flight until each tx terminalizes so a crash replays everything that wasn't done.

Comment thread config/config.go Outdated
Comment on lines 167 to 171
  // MinPartitions is the minimum number of partitions every hot-path topic
- // must have at startup. Set to the expected replica count of the largest
- // horizontally-scaled consumer (tx-validator or propagation) so arcade
- // fails fast when the cluster can't actually fan out across pods. Leave
- // at 0 or 1 in standalone/single-replica deployments.
+ // must have at startup. Set to the expected replica count of the propagation
+ // consumer so arcade fails fast when the cluster can't actually fan out
+ // across pods. Leave at 0 or 1 in standalone/single-replica deployments.
  MinPartitions int `mapstructure:"min_partitions"`
Comment thread metrics/metrics.go Outdated
Comment on lines 22 to 23
// pending-retry depth, reaper lease and tick outcomes, inline retries,
// merkle registration latency.
Comment thread services/propagation/reaper.go Outdated
Comment on lines +133 to +138
if len(stuck) == 0 {
return
}

- p.logger.Info("reaper: rebroadcasting pending retries", zap.Int("count", len(ready)))
+ p.logger.Info("reaper: rebroadcasting stuck txs", zap.Int("count", len(stuck)))
metrics.PropagationReaperReadyDepth.Set(float64(len(stuck)))
// naturally pauses Kafka pulls and lets backpressure flow back to
// the broker. No DLQ, no error to the client; the only observable
// effect is briefly increased consumer lag.
_ = p.admitToDispatcher(propMsg, msg.Offset)
Comment on lines +613 to +639
// Dedup CAS via GetOrInsertStatus. Two submitters racing on the
// same txid both attempt the insert; the loser sees inserted=false
// and returns 202 idempotently without re-publishing.
if s.store != nil {
row := &models.TransactionStatus{
TxID: txid,
Status: models.StatusReceived,
Timestamp: time.Now(),
// Carry the raw bytes on the status row so the propagation
// reaper can rebroadcast txs that are stuck in non-terminal
// states without re-fetching from Kafka or the API caller.
RawTx: rawTx,
}
existing, inserted, dedupErr := s.store.GetOrInsertStatus(c.Request.Context(), row)
switch {
case dedupErr != nil:
s.logger.Error("dedup CAS failed", zap.String("txid", txid), zap.Error(dedupErr))
// Best-effort: continue with publish. The propagator's
// in-flight set catches duplicates that slip past.
case !inserted && existing != nil:
s.recordSubmission(c.Request.Context(), txid, opts)
c.JSON(http.StatusAccepted, gin.H{
"status": "already submitted",
"txid": txid,
"state": string(existing.Status),
})
return
Comment on lines +804 to +839
if s.store != nil {
ctx := c.Request.Context()
for _, p := range parsed {
row := &models.TransactionStatus{
TxID: p.txid,
Status: models.StatusReceived,
Timestamp: time.Now(),
// Carry the raw bytes on the row so the propagation
// reaper can rebroadcast stuck txs without re-fetching.
RawTx: p.raw,
}
existing, inserted, dedupErr := s.store.GetOrInsertStatus(ctx, row)
switch {
case dedupErr != nil:
s.logger.Error("dedup CAS failed", zap.String("txid", p.txid), zap.Error(dedupErr))
toPublish = append(toPublish, p)
case !inserted && existing != nil:
duplicates++
s.recordSubmission(ctx, p.txid, opts)
default:
toPublish = append(toPublish, p)
}
}
} else {
toPublish = parsed
}

// Register every accepted tx with the in-process TxTracker so the
// bump-builder's filterTrackedTxids recognizes them when their block
// is processed. Without this, tracked-only fan-out drops every MINED
// transition and txs stay stuck at SEEN_ON_NETWORK forever.
if s.txTracker != nil {
for _, p := range toPublish {
s.txTracker.Add(p.txid, models.StatusReceived)
}
}
Comment on lines +916 to +939
// requeueAfterDelay schedules a delayed requeue of msgs through the
// dispatcher. Spawns a goroutine that sleeps requeueDelay then sends
// each msg to requeueCh. The goroutine bails on ctx cancellation so
// claim revocation and shutdown don't hold txs in limbo.
func (p *Propagator) requeueAfterDelay(ctx context.Context, msgs []propagationMsg) {
if len(msgs) == 0 {
return
}
go func(msgs []propagationMsg) {
select {
case <-time.After(requeueDelay):
case <-ctx.Done():
return
}
for _, m := range msgs {
select {
case <-ctx.Done():
return
default:
}
p.requeueToDispatcher(m)
}
}(msgs)
}
Comment on lines +21 to +61
type offsetTracker struct {
heap *offsetMinHeap
done map[int64]struct{}
}

// newOffsetTracker constructs an empty offsetTracker.
func newOffsetTracker() *offsetTracker {
return &offsetTracker{
heap: &offsetMinHeap{},
done: make(map[int64]struct{}),
}
}

// Add records an offset as in-flight. The dispatcher calls Add when it
// admits a Kafka message into pendingMsgs or heldMsgs.
func (t *offsetTracker) Add(offset int64) {
heap.Push(t.heap, offset)
}

// Done marks an in-flight offset as terminalized. Idempotent: marking the
// same offset twice is a no-op the second time. The dispatcher calls Done
// after a tx reaches a terminal status (ACCEPTED_BY_NETWORK, REJECTED) or
// is cascade-rejected as a child of a rejected parent.
func (t *offsetTracker) Done(offset int64) {
t.done[offset] = struct{}{}
}

// LowestUnfinished returns the smallest offset still in-flight, or (0,
// false) when every offset has been marked done (or none have been
// added). The Kafka commit watermark must not advance past this value:
// if it did, a crash would leave the in-flight tx unreplayed.
//
// Cleans done entries off the heap top as a side effect so the heap top
// is always an unfinished offset after a successful call returns.
func (t *offsetTracker) LowestUnfinished() (int64, bool) {
t.cleanTop()
if t.heap.Len() == 0 {
return 0, false
}
return (*t.heap)[0], true
}
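The excerpt above calls `cleanTop` and relies on `offsetMinHeap` without showing either. For readers following along, here is a self-contained sketch of how the pieces could fit together using `container/heap`. The `offsetMinHeap` methods and the body of `cleanTop` are assumptions reconstructed from the doc comments, not copied from the PR.

```go
package main

import (
	"container/heap"
	"fmt"
)

// offsetMinHeap is assumed to be a plain int64 min-heap.
type offsetMinHeap []int64

func (h offsetMinHeap) Len() int           { return len(h) }
func (h offsetMinHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h offsetMinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *offsetMinHeap) Push(x any)        { *h = append(*h, x.(int64)) }
func (h *offsetMinHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

type offsetTracker struct {
	heap *offsetMinHeap
	done map[int64]struct{}
}

func newOffsetTracker() *offsetTracker {
	return &offsetTracker{heap: &offsetMinHeap{}, done: make(map[int64]struct{})}
}

func (t *offsetTracker) Add(offset int64)  { heap.Push(t.heap, offset) }
func (t *offsetTracker) Done(offset int64) { t.done[offset] = struct{}{} }

// cleanTop (assumed implementation) pops finished offsets off the heap
// top until the top is unfinished or the heap is empty.
func (t *offsetTracker) cleanTop() {
	for t.heap.Len() > 0 {
		top := (*t.heap)[0]
		if _, finished := t.done[top]; !finished {
			return
		}
		heap.Pop(t.heap)
		delete(t.done, top)
	}
}

func (t *offsetTracker) LowestUnfinished() (int64, bool) {
	t.cleanTop()
	if t.heap.Len() == 0 {
		return 0, false
	}
	return (*t.heap)[0], true
}

func main() {
	t := newOffsetTracker()
	t.Add(5)
	t.Add(6)
	t.Add(7)
	t.Done(6) // finishes out of order; the watermark stays pinned at 5
	fmt.Println(t.LowestUnfinished())
	t.Done(5) // now 5 and 6 are both cleared, so 7 is the lowest
	fmt.Println(t.LowestUnfinished())
}
```

Lazy deletion (marking done, then clearing only at the heap top) keeps `Done` at constant cost and avoids searching the heap for an arbitrary offset. The trade-off is that out-of-order completions linger in `done` until everything below them finishes.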
shruggr and others added 3 commits May 18, 2026 18:57
Any non-empty failure line without an extractable txid now returns
nil from parseTxsFailures, dropping to the caller's whole-batch
requeue path. The only Teranode path that produces such a line is the
processOne panic recover (Server.go:740-744), which doesn't include
tx.TxID() in the wrapper. Re-broadcasting the whole batch is safe
(Teranode dedups on store-level) and avoids silently marking the
orphan's owner as ACCEPTED.
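The behavior described in this commit message can be sketched as follows. This is an illustrative reconstruction and not the PR's `parseTxsFailures`: the function name `parseFailures`, the 64-hex-character `txidPattern`, and the assumption that the first line of the body is a header (suggested by the `lines[1:]` loop in the earlier excerpt) are all hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// txidPattern is an assumption: a txid is treated as 64 hex characters.
var txidPattern = regexp.MustCompile(`[0-9a-fA-F]{64}`)

// parseFailures maps lowercase txid -> failure line. It returns nil as
// soon as it meets a non-empty line with no extractable txid, which
// signals the caller to requeue the whole batch instead of guessing
// which tx the orphan line belongs to.
func parseFailures(body string) map[string]string {
	lines := strings.Split(body, "\n")
	failures := make(map[string]string, len(lines))
	for _, line := range lines[1:] { // first line assumed to be a header
		if strings.TrimSpace(line) == "" {
			continue
		}
		txid := txidPattern.FindString(line)
		if txid == "" {
			return nil // unattributable failure: do not trust the rest
		}
		failures[strings.ToLower(txid)] = line
	}
	return failures
}

func main() {
	txid := strings.Repeat("AB", 32)
	fmt.Println(len(parseFailures("failures:\n" + txid + ": missing parent\n")))
	fmt.Println(parseFailures("failures:\npanic recovered\n") == nil)
}
```

Note the distinction between the two "empty" results: a non-nil empty map means the response carried no failure lines, while nil means the response could not be attributed and the batch should be retried as a unit.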

Copilot AI left a comment

Contributor


Pull request overview

Copilot reviewed 39 out of 39 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

app/app.go:99

  • The MinPartitions startup guard is effectively disabled here because CheckPartitions is called with a nil topic list, so it never checks anything even when kafka.min_partitions > 1. Either pass the intended topic set(s) to enforce the setting (e.g. the horizontally-scaled topics) or remove/disable the knob entirely to avoid a misleading configuration that appears enabled but has no effect.
	// MinPartitions is a soft hint for horizontally-scaled topics. There
	// are currently no other hot-path topics that need it (TopicTransaction
	// was retired with tx_validator) but the knob is retained so a future
	// fan-out topic can opt into the check without re-introducing config.
	if cfg.Kafka.MinPartitions > 1 {
		if pErr := kafka.CheckPartitions(broker, nil, cfg.Kafka.MinPartitions, logger); pErr != nil {
			_ = producer.Close()
			return nil, nil, fmt.Errorf("kafka partition check: %w", pErr)
		}
	}

Comment thread docs/plans/dependency-aware-dispatch.md
Comment thread services/api_server/handlers.go
Comment thread services/sse/sse_e2e_test.go Outdated
galt-tr and others added 2 commits May 19, 2026 13:25
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

Copilot AI left a comment

Contributor


Pull request overview

Copilot reviewed 39 out of 39 changed files in this pull request and generated 4 comments.

Comment on lines +1 to +3
# Dependency-Aware Dispatch

Plan to add parent-child dependency awareness to Arcade's broadcast pipeline so that transactions in the queue with parent-child relationships are sequenced correctly when sent to Teranode. The redesign also simplifies the pipeline: validation moves to intake, the `tx_validator` service is removed, the `PENDING_RETRY` status is removed (transient infra failures are kept in-memory and requeued through the dispatcher), the reaper is narrowed to rebroadcasting stale `SEEN_ON_NETWORK` rows that the dispatcher's in-flight set can't see, and the dispatcher holds Kafka offsets in flight until each tx terminalizes so a crash replays everything that wasn't done.
→ Teranode
```

The `tx_validator` service is removed. Intake performs parse, script/fee validation, and dedup synchronously, then publishes directly to the propagation topic.
Comment thread kafka/partitions.go Outdated
Comment on lines 61 to 81
@@ -67,7 +73,10 @@ func (m *mockStore) UpdateStatus(_ context.Context, status *models.TransactionSt
  return nil
  }

- func (m *mockStore) GetOrInsertStatus(context.Context, *models.TransactionStatus) (*models.TransactionStatus, bool, error) {
+ func (m *mockStore) GetOrInsertStatus(_ context.Context, status *models.TransactionStatus) (*models.TransactionStatus, bool, error) {
+ if m.getOrInsertFn != nil {
+ return m.getOrInsertFn(status)
+ }
  return nil, false, nil
  }
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
The test name and assertion described the soft-fail behavior of
CheckPartitions, but CheckExactPartitions hard-fails on missing topics
by design — the call site in app/app.go relies on it to abort startup,
because Kafka auto-creating TopicPropagation on first publish would use
the broker default partition count and silently break the dispatcher's
single-partition invariant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@galt-tr galt-tr merged commit 4788e9b into main May 20, 2026
49 checks passed
@galt-tr galt-tr deleted the plan/dependency-aware-dispatch branch May 20, 2026 00:37

Labels

chore: Simple dependency updates or version bumps
dependencies: Dependency updates, version bumps, etc.
size/M: Medium change (51–200 lines)


4 participants