feat(lifecycle): inbound turn tracking with orphan recovery and abort coordination#30011
feat(lifecycle): inbound turn tracking with orphan recovery and abort coordination#30011nohat wants to merge 16 commits intoopenclaw:mainfrom
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 40531c0baa
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
src/auto-reply/dispatch.ts
Outdated
| const actualSends = dispatcher.getDeliveryStats?.()?.successfulSends ?? 0; | ||
| if (actualSends > 0) { | ||
| finalizeTurn(turnId, "delivered"); | ||
| } else { | ||
| recordTurnRecoveryFailure(turnId, "final send queued but no successful sends"); |
There was a problem hiding this comment.
Base fail-open decision on outbox, not dispatcher send counters
This branch assumes queuedFinal implies a dispatcher-managed send, but ACP routed turns set queuedFinal=true via routeReply and do not increment dispatcher successfulSends; when outbox linkage is missing for that path, getOutboxStatusForTurn is empty and this code marks an actually delivered final reply as a recovery failure, causing unnecessary turn retries and duplicate user-visible messages on the next recovery pass.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — the attemptedFinal && queuedFinal fallback path now re-checks the outbox as the authoritative source before falling back to the in-memory dispatcher counter. This correctly handles ACP-routed turns where the delivery path bypasses the dispatcher's send counter.
| const recoverable = listRecoverableTurns({ stateDir: params.stateDir }).slice( | ||
| 0, | ||
| maxTurnsPerPass, | ||
| ); |
There was a problem hiding this comment.
Apply turn-pass limit after filtering non-actionable turns
The pass limit is enforced before filtering out active turns and turns that still have queued outbox work, so a fixed set of oldest non-actionable turns can consume the entire batch each tick. In that state, later recoverable failed_retryable/orphan turns are never examined, which can stall recovery indefinitely under load.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — the turn worker now filters non-actionable turns (active in-process, outbox-pending, or finalized by outbox state) in a first pass, collects replay-eligible turns, and then applies maxTurnsPerPass. This ensures the batch limit reflects actual work items, not skipped entries.
40531c0 to
6a25d47
Compare
Greptile SummaryAdds comprehensive inbound turn tracking with SQLite-backed persistence, orphan recovery, and abort coordination. Core implementation is solid with proper lifecycle management ( Key changes:
Successfully addresses all Codex review concerns (#29149): active-turn registry prevents premature finalization, turn pass limit applied correctly, resumed turns bypass inbound dedupe, and fail-open gated on delivery stats. Issue found: Confidence Score: 4/5
Last reviewed commit: 6a25d47 |
| try { | ||
| db.prepare( | ||
| `UPDATE message_turns | ||
| SET status='aborted', | ||
| updated_at=?, | ||
| completed_at=? | ||
| WHERE session_key=? | ||
| AND status IN ('accepted','running','delivery_pending','failed_retryable')`, | ||
| ).run(now, now, sessionKey.trim()); | ||
| // Cancel any pending outbox entries linked to the now-aborted turns so | ||
| // recoverPendingDeliveries doesn't replay sends for aborted turns. | ||
| db.prepare( | ||
| `UPDATE message_outbox | ||
| SET status='failed_terminal', | ||
| error_class='terminal', | ||
| terminal_reason='turn_aborted', | ||
| completed_at=? | ||
| WHERE turn_id IN ( | ||
| SELECT id FROM message_turns WHERE session_key=? AND status='aborted' | ||
| ) | ||
| AND status IN ('queued','failed_retryable')`, | ||
| ).run(now, sessionKey.trim()); | ||
| } catch (err) { |
There was a problem hiding this comment.
Two separate UPDATE statements without transaction wrapper - if crash occurs between the updates, turns are marked aborted but outbox entries remain queued, causing messages to be delivered after abort. Wrap both db.prepare().run() calls in runLifecycleTransaction(db, () => { ... })
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/infra/message-lifecycle/turns.ts
Line: 554-576
Comment:
Two separate UPDATE statements without transaction wrapper - if crash occurs between the updates, turns are marked aborted but outbox entries remain queued, causing messages to be delivered after abort. Wrap both db.prepare().run() calls in `runLifecycleTransaction(db, () => { ... })`
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Fixed — abortTurnsForSession now wraps both UPDATEs (turns + outbox) in runLifecycleTransaction(). This prevents a window where turns are aborted but their outbox rows are still queued and could be replayed by the recovery worker.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6a25d47af1
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (outbox.queued > 0) { | ||
| continue; |
There was a problem hiding this comment.
Ignore non-final queued rows when gating turn replay
This guard skips replay whenever any outbox row is still queued, but outbox recovery only reloads rows with dispatch_kind = 'final' (or null) in loadPendingDeliveries (src/infra/outbound/delivery-queue.ts). If the process crashes after enqueueing a tool/block row, that row is never retried, so outbox.queued stays non-zero and the turn is never replayed (until expiry), which can drop the user-visible final response for that turn.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
The dispatch_kind SQL filter in loadPendingDeliveries already excludes non-final rows from recovery queries (dispatch_kind IS NULL OR dispatch_kind = 'final'). The outbox .queued check in the turn pass uses the same filtered view. Additionally, the write-ahead-recovery fix now terminates orphaned non-final rows at the start of each recovery pass, ensuring they're cleaned up.
Replace unbounded file-based delivery queue with queryable SQLite message_outbox table. Adds TTL/expiry for stale entries, delivery outcome retention, and one-time legacy file queue import on startup. Closes openclaw#23777, openclaw#16555, openclaw#29128
6a25d47 to
9f33bcb
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9f33bcb831
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| } else if (status.failed > 0 && status.queued === 0) { | ||
| finalizeTurn(turnId, "failed"); |
There was a problem hiding this comment.
Base turn finalization on final-dispatch outbox rows
This branch finalizes a turn as failed whenever status.failed > 0, but getOutboxStatusForTurn counts all outbox rows for the turn (including tool/block dispatches) rather than only final replies. In a turn where an intermediate tool/block send fails but the final reply is delivered, this marks the turn as failed and can trigger unnecessary replay of an already-completed turn (duplicate user-visible final responses on recovery).
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — getOutboxStatusForTurn now accepts { finalOnly: true } which filters to dispatch_kind IS NULL OR dispatch_kind = 'final'. Both callers in dispatch.ts and the turn pass in server-message-lifecycle.ts now use this option. Tool/block sends no longer affect turn finalization decisions.
| AND (dispatch_kind IS NULL OR dispatch_kind = 'final') | ||
| AND next_attempt_at <= ? |
There was a problem hiding this comment.
Exclude NULL dispatch kinds from recovery replay set
Recovery currently replays rows where dispatch_kind IS NULL as if they were final deliveries. Because the enqueue path in deliverOutboundPayloads does not persist a dispatch kind, routed non-final payloads (for example tool/block messages emitted via routeReply) are also stored as NULL, so they are retried after restart even though non-final traffic should be replayed via turn recovery, leading to stale intermediate messages being resent out of order.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
This is already correct. loadPendingDeliveries filters (dispatch_kind IS NULL OR dispatch_kind = 'final') — NULL entries are from the deliverOutboundPayloads direct path and ARE final deliveries (they never set dispatchKind). Non-final orphans are terminalized by recoverPendingDeliveries (lines 592-608) which marks them failed_terminal with reason "non-final dispatch pruned on recovery" before loading pending entries.
9f33bcb to
4269852
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 4269852eee
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (kind === "final") { | ||
| attemptedFinal += 1; | ||
| } |
There was a problem hiding this comment.
Link ACP final deliveries to their message turn
deliverAcpPayload now increments attemptedFinal for ACP final replies, but the routed send path still calls routeReply without turnId, so successful ACP sends are not associated with the turn in message_outbox. In dispatchInboundMessageInternal, this leaves outbox status empty and falls back to dispatcher successfulSends (which stays 0 for routed sends), causing recordTurnRecoveryFailure on already-delivered turns and triggering duplicate replay attempts; fresh evidence is that this ACP routeReply call omits turnId while other updated routing call sites pass it.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Valid — fixed by passing turnId: params.ctx.MessageTurnId to routeReply() in dispatch-acp.ts. ACP-routed final deliveries now correctly create outbox rows linked to their turn, visible to getOutboxStatusForTurn.
Fix landed in b6b842c on lifecycle/turn-tracking.
There was a problem hiding this comment.
Pull request overview
This PR introduces a durable “message lifecycle” layer to make inbound turn processing and outbound delivery restart-safe by persisting turn/outbox state in SQLite and running background recovery/pruning workers in the gateway.
Changes:
- Added SQLite-backed lifecycle DB (
message_turns,message_outbox) plus a new inbound turn tracking module with recovery + abort coordination. - Refactored outbound delivery queue to use the SQLite outbox (retry/expiry/prune), and linked outbound deliveries to turns (
turnId) for coordinated finalization. - Integrated lifecycle into the dispatch pipeline (turn acceptance/registry, dedupe bypass for resumed turns, finalization gated by outbox + delivery stats) and added a gateway worker loop for recovery/pruning.
Reviewed changes
Copilot reviewed 21 out of 21 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/infra/message-lifecycle/db.ts | Adds lifecycle SQLite DB open/cache + schema for turns/outbox. |
| src/infra/message-lifecycle/turns.ts | Implements turn acceptance, status transitions, recovery listing, abort coordination, pruning. |
| src/infra/message-lifecycle/index.ts | Re-exports lifecycle modules. |
| src/infra/outbound/delivery-queue.ts | Migrates queue to SQLite outbox, adds recovery filtering via startupCutoff, turn finalization hooks, prune API. |
| src/infra/outbound/deliver.ts | Threads turnId into write-ahead outbox enqueue and tightens best-effort failure bookkeeping. |
| src/gateway/server-message-lifecycle.ts | New lifecycle workers: outbox recovery/prune + orphan turn recovery/prune. |
| src/gateway/server.impl.ts | Starts/stops lifecycle workers as part of gateway lifecycle. |
| src/auto-reply/dispatch.ts | Adds turn acceptance/active registry, outbox-linked finalization, resumed-turn dispatch entrypoint. |
| src/auto-reply/reply/reply-dispatcher.ts | Adds optional delivery queue context + delivery stats; enqueues/acks/fails per-dispatch kind. |
| src/auto-reply/reply/dispatch-from-config.ts | Adds skipInboundDedupe for resumed turns; tracks attemptedFinal; threads turnId to routing. |
| src/auto-reply/reply/dispatch-acp.ts | Tracks attemptedFinal; threads turnId through ACP routing. |
| src/auto-reply/reply/route-reply.ts | Adds optional turnId to link routed replies to outbox/turn. |
| src/auto-reply/reply/abort.ts | Persists abort by marking turns aborted and canceling queued outbox rows. |
| src/auto-reply/templating.ts | Adds MessageTurnId to message context. |
| src/channels/plugins/types.adapters.ts | Adds outbound delivery metadata + idempotency capability flag to adapter types. |
| src/config/zod-schema.session.ts | Adds messages.delivery config schema (expiry settings). |
| src/config/types.messages.ts | Adds MessagesDeliveryConfig types. |
| src/config/schema.labels.ts | Adds UI labels for delivery lifecycle config. |
| src/config/schema.help.ts | Adds help text for delivery lifecycle config. |
| src/infra/outbound/outbound.test.ts | Updates tests to validate SQLite outbox behavior and startupCutoff filtering. |
| src/telegram/bot.create-telegram-bot.test-harness.ts | Mocks lifecycle/outbox modules to avoid cross-test interference from persistent state. |
| // Keep durable turn tracking enabled while deferring persistent dedupe to the | ||
| // existing inbound dedupe path until per-channel message identity semantics | ||
| // are fully normalized (for example callback/query ids vs message ids). | ||
| const disablePersistentDedupe = true; | ||
| const dedupeKey = disablePersistentDedupe ? undefined : (buildInboundDedupeKey(ctx) ?? undefined); |
There was a problem hiding this comment.
The PR description mentions persistent inbound dedupe, but acceptTurn currently hard-disables persistent dedupe via disablePersistentDedupe = true, which makes dedupeKey undefined and bypasses the INSERT OR IGNORE dedupe path entirely. If persistent dedupe is intended in this PR, this should be configurable (or enabled by default) rather than hard-coded off; otherwise, the PR description and related schema/indexing (unique index on dedupe_key) should be updated to reflect that dedupe is not actually active yet.
| `INSERT INTO message_turns | ||
| (id, channel, account_id, external_id, dedupe_key, session_key, payload, | ||
| route_channel, route_to, route_account_id, route_thread_id, route_reply_to_id, | ||
| accepted_at, updated_at, status, attempt_count, next_attempt_at) | ||
| VALUES (?, ?, ?, NULL, NULL, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'running', 0, ?)`, |
There was a problem hiding this comment.
acceptTurn inserts new turns with status='running', but the lifecycle model (and the rest of this module) defines an accepted → running transition. As written, the accepted state becomes unreachable in normal operation and markTurnRunning() (which only transitions from accepted/failed_retryable) is effectively a no-op for newly created turns. Consider inserting with status='accepted' (or updating markTurnRunning/state machine) so the persisted lifecycle matches the intended transitions and recovery queries.
| // Without aging, entries are not eligible because the outbox worker defers fresh | ||
| // entries for 5 s to avoid racing with the direct delivery path. |
There was a problem hiding this comment.
This comment says entries are deferred for 5s, but the current recovery eligibility logic is primarily based on startupCutoff (skip entries enqueued after startup with attempt_count=0 / last_attempt_at IS NULL) and next_attempt_at, not a fixed 5-second window. Updating this test comment to match the actual behavior would prevent future confusion when adjusting recovery timing/filtering.
| // Without aging, entries are not eligible because the outbox worker defers fresh | |
| // entries for 5 s to avoid racing with the direct delivery path. | |
| // Without aging, entries enqueued after startup with attempt_count=0 / last_attempt_at IS NULL | |
| // are treated as live deliveries and are skipped by crash recovery based on startupCutoff/next_attempt_at. |
Every inbound message creates a durable turn record in message_turns. Turn worker detects orphaned turns (accepted but never completed after crash) and recovers them. Abort commands mark turns as aborted, preventing re-delivery. Outbox entries are linked to turns for coordinated finalization. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d premature finalization (Codex P1+P2 openclaw#29149)
…-open on delivery stats
…, and transactional abort
…turnId in ACP route
b6b842c to
811a135
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 811a135fce
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| payload, | ||
| channel: originatingChannel, | ||
| to: originatingTo, | ||
| turnId: ctx.MessageTurnId, |
There was a problem hiding this comment.
Stop linking routed tool/block sends as final turn evidence
sendPayloadAsync is used for routed tool/block callbacks, and this change now links those sends to the turn via turnId; however routeReply persists them through deliverOutboundPayloads without a dispatch_kind, while turn finalization treats dispatch_kind IS NULL rows as final (getOutboxStatusForTurn(..., { finalOnly: true })). In cross-provider turns that emit block/tool messages before the final reply, those intermediate rows can make a turn look delivered/failed and prevent replay of a missing final response after a crash. Fresh evidence here is the new turnId linkage on the tool/block routing path, which makes these NULL-kind rows participate in turn finalization.
Useful? React with 👍 / 👎.
| WHERE status IN ('queued','failed_retryable') | ||
| AND dispatch_kind IS NOT NULL | ||
| AND dispatch_kind != 'final' | ||
| AND queued_at < ?`, |
There was a problem hiding this comment.
Terminalize non-final retry rows even when queued post-startup
Non-final rows are explicitly excluded from recovery replay (loadPendingDeliveries filters to dispatch_kind IS NULL OR 'final'), but this cleanup only terminalizes non-final rows when queued_at < startupCutoff. If a tool/block send fails after startup, it remains failed_retryable forever in the current process (never retried, never terminalized, and therefore never pruned), which can cause unbounded message_outbox growth during outages until the gateway restarts.
Useful? React with 👍 / 👎.
|
This pull request has been automatically marked as stale due to inactivity. |
Summary
Change Type (select all)
Scope (select all touched areas)
Linked Issue/PR
User-visible / Behavior Changes
None visible — turn tracking is internal. Orphaned turns are logged and recovered transparently.
Security Impact (required)
Repro + Verification
Environment
Steps
Expected
Actual
Evidence
Human Verification (required)
Compatibility / Migration
Failure Recovery (if this breaks)
Risks and Mitigations