feat(lifecycle): persistent inbound dedup across gateway restarts#30012
feat(lifecycle): persistent inbound dedup across gateway restarts#30012nohat wants to merge 17 commits intoopenclaw:mainfrom
Conversation
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d72b631741
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| `SELECT id, payload, queued_at, attempt_count, last_attempt_at, last_error, turn_id | ||
| FROM message_outbox | ||
| WHERE status IN ('queued', 'failed_retryable') | ||
| AND (dispatch_kind IS NULL OR dispatch_kind = 'final') |
There was a problem hiding this comment.
Recover non-final outbox rows to avoid stuck turns
The recovery query now skips rows where dispatch_kind is tool/block, but turn state still treats those rows as active/failing (getOutboxStatusForTurn and runTurnPass count all queued/failed rows). If a tool/block send is left queued or failed_retryable (for example after a transient send error or crash), that row is never retried, so the turn stays delivery_pending and can later be marked failed on expiry even when the final reply was otherwise successful. This creates persistent stuck/failed turns after restart/error scenarios.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Non-final rows are excluded from turn health checks by the dispatch_kind SQL filter (added in the write-ahead-recovery PR). The fix in #30009 now terminates orphaned non-final rows during recovery, ensuring they're cleaned up and don't cause stuck turns.
Greptile SummaryThis PR enables persistent inbound message deduplication across gateway restarts by migrating from in-memory dedup to SQLite-backed turn tracking. Previously, dedup state was lost on restart, potentially causing duplicate message processing. The implementation adds comprehensive lifecycle management for both turns and outbound deliveries. Key changes:
Architecture:
Confidence Score: 4/5
Last reviewed commit: d72b631 |
d72b631 to
a458274
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: a4582741f3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
src/auto-reply/dispatch.ts
Outdated
| const actualSends = dispatcher.getDeliveryStats?.()?.successfulSends ?? 0; | ||
| if (actualSends > 0) { | ||
| finalizeTurn(turnId, "delivered"); | ||
| } else { | ||
| recordTurnRecoveryFailure(turnId, "final send queued but no successful sends"); |
There was a problem hiding this comment.
Don't fail routed ACP turns via dispatcher send count
This success check assumes final replies are sent through the local dispatcher, but ACP cross-surface routing sends final payloads via routeReply (see tryDispatchAcpReply in src/auto-reply/reply/dispatch-acp.ts) and never increments successfulSends. In that case attemptedFinal/queuedFinal can both be true while successfulSends remains 0, so this branch records a recovery failure for turns that were actually delivered, causing unnecessary retries and eventual terminal turn failures.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Same concern as #30011 comment — fixed there with the outbox-based fail-open change. This PR inherits the fix from the turn-tracking base branch.
Replace unbounded file-based delivery queue with queryable SQLite message_outbox table. Adds TTL/expiry for stale entries, delivery outcome retention, and one-time legacy file queue import on startup. Closes openclaw#23777, openclaw#16555, openclaw#29128
a458274 to
7e40bd0
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7e40bd0125
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| } else if (status.delivered > 0 && status.failed === 0) { | ||
| finalizeTurn(turnId, "delivered"); |
There was a problem hiding this comment.
Base turn success on final-dispatch rows only
This branch marks a turn as delivered whenever any outbox row is delivered, but getOutboxStatusForTurn aggregates tool/block/final rows together. If tool or block messages succeed and the final reply fails to queue/send (attemptedFinal > 0, queuedFinal = false), the turn is still finalized as delivered here, so recovery never retries the missing final response. That causes user-visible dropped final replies after partial-success turns.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed — same fix as #30011. getOutboxStatusForTurn now accepts { finalOnly: true } which filters to final-dispatch rows only. Both outbox status checks in dispatch.ts and the turn pass now use this option, so tool/block sends no longer cause premature turn finalization.
7e40bd0 to
8b9258f
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8b9258fda8
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if (kind === "final") { | ||
| attemptedFinal += 1; | ||
| } |
There was a problem hiding this comment.
Link ACP final attempts to turn outbox records
Fresh evidence in this commit: tryDispatchAcpReply now increments attemptedFinal, but ACP-routed sends still call routeReply without a turnId, so the final outbox rows are not associated with the turn. In dispatchInboundMessageInternal, the new finalization logic relies on turn-scoped outbox status (and otherwise falls back to dispatcher send counters), which means cross-surface ACP replies can be marked as recovery failures and retried/failed even when delivery succeeded.
Useful? React with 👍 / 👎.
8b9258f to
765cdd7
Compare
There was a problem hiding this comment.
Pull request overview
Adds a durable “message lifecycle” layer to make inbound deduplication survive gateway restarts by persisting turn state (and related outbound delivery rows) in a SQLite-backed lifecycle DB, with background workers to recover/prune state.
Changes:
- Introduces SQLite lifecycle DB schema +
message_turns-based accept/dedup and turn recovery utilities. - Migrates outbound delivery queue from filesystem JSON to SQLite
message_outbox, adding startup-cutoff logic to avoid double-delivery after restart. - Integrates turn/outbox linkage throughout dispatch/routing/abort flows and adds tests covering restart persistence.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/telegram/bot.create-telegram-bot.test-harness.ts | Mocks lifecycle turn tracking/outbox to avoid cross-test persistent dedupe interference. |
| src/infra/outbound/outbound.test.ts | Updates outbound queue tests for SQLite-backed outbox + startup-cutoff behaviors. |
| src/infra/outbound/delivery-queue.ts | Replaces file queue with SQLite outbox, adds recovery/expiry/pruning + turn finalization hooks. |
| src/infra/outbound/deliver.ts | Links outbound queue rows to turns via turnId; improves best-effort partial failure detection. |
| src/infra/message-lifecycle/turns.ts | Adds persistent turn acceptance/dedup, recovery helpers, abort coordination, and pruning. |
| src/infra/message-lifecycle/turns.test.ts | Adds tests for persistent inbound dedup across simulated restarts and pruning. |
| src/infra/message-lifecycle/index.ts | Re-exports lifecycle DB + turns APIs. |
| src/infra/message-lifecycle/db.ts | Adds lifecycle SQLite DB open/cache logic and schema creation (turns + outbox). |
| src/gateway/server.impl.ts | Starts/stops lifecycle workers instead of one-shot outbound recovery on startup. |
| src/gateway/server-message-lifecycle.ts | Adds background workers: outbox recovery/prune + turn recovery/finalization. |
| src/config/zod-schema.session.ts | Adds messages.delivery config schema for outbox expiry behavior. |
| src/config/types.messages.ts | Adds typed config for messages.delivery (maxAgeMs / expireAction). |
| src/config/schema.labels.ts | Adds UI labels for new delivery lifecycle config fields. |
| src/config/schema.help.ts | Adds help text for new delivery lifecycle config fields. |
| src/channels/plugins/types.adapters.ts | Extends outbound context with delivery metadata + optional idempotency support flag. |
| src/auto-reply/templating.ts | Adds MessageTurnId to message context for lifecycle linking. |
| src/auto-reply/reply/route-reply.ts | Threads optional turnId through routing to outbound delivery pipeline. |
| src/auto-reply/reply/reply-dispatcher.ts | Enqueues/acks/fails per-reply outbox rows when turn delivery context is set. |
| src/auto-reply/reply/dispatch-from-config.ts | Adds skipInboundDedupe for resumed turns + tracks attemptedFinal and passes turnId to routeReply. |
| src/auto-reply/reply/dispatch-acp.ts | Tracks attemptedFinal and passes turnId into ACP routing deliveries. |
| src/auto-reply/reply/abort.ts | Aborts turns + cancels queued outbox rows when an abort is requested. |
| src/auto-reply/dispatch.ts | Integrates turn acceptance/dedup, active-turn tracking, and turn finalization based on outbox state; adds resumed-turn dispatch entrypoint. |
| } | ||
| // Already removed — no-op. | ||
| } catch (err) { | ||
| logVerbose(`delivery-queue: ackDelivery failed: ${String(err)}`); |
There was a problem hiding this comment.
ackDelivery() swallows all DB errors and leaves the row in a retryable state; the recovery worker will then re-send the message. That creates guaranteed duplicate deliveries if the UPDATE fails (and the comment assumes provider idempotency that isn't enforced here). Consider making ack/fail writes more reliable (e.g. retry/propagate, or at least elevate to warn/error and prevent resend on bookkeeping-only failures).
| logVerbose(`delivery-queue: ackDelivery failed: ${String(err)}`); | |
| logVerbose(`delivery-queue: ackDelivery failed: ${String(err)}`); | |
| throw err; |
| entry.enqueuedAt ?? Date.now(), | ||
| "queued", | ||
| entry.retryCount ?? 0, | ||
| entry.lastAttemptAt ?? entry.enqueuedAt ?? Date.now(), | ||
| entry.lastError ?? null, | ||
| entry.lastAttemptAt ?? null, | ||
| ); |
There was a problem hiding this comment.
importLegacyFileQueue() sets next_attempt_at to lastAttemptAt/enqueuedAt without applying the backoff schedule. Because loadPendingDeliveries filters on next_attempt_at, this can cause high-backoff legacy retries to be scanned every tick and then deferred in-memory. Consider computing next_attempt_at as (baseAttemptAt + computeBackoffMs(retryCount)) during import so SQLite can filter deferred rows efficiently.
| now, | ||
| ); | ||
| const changes = db.prepare("SELECT changes() AS c").get() as { c: number }; | ||
| return { accepted: changes.c > 0, id }; |
There was a problem hiding this comment.
When the INSERT OR IGNORE is ignored due to an existing dedupe_key, acceptTurn() returns accepted=false but still returns a newly-generated id that does not correspond to any stored turn row. That makes logs/metrics misleading and prevents callers from referencing the existing turn. Consider fetching and returning the existing turn id (e.g. SELECT id WHERE dedupe_key=?) when changes() is 0.
| return { accepted: changes.c > 0, id }; | |
| if (changes.c > 0) { | |
| return { accepted: true, id }; | |
| } | |
| const existing = db | |
| .prepare( | |
| `SELECT id | |
| FROM message_turns | |
| WHERE dedupe_key = ? AND external_id = ? | |
| ORDER BY accepted_at DESC | |
| LIMIT 1`, | |
| ) | |
| .get(dedupeKey, externalId) as { id: string } | undefined; | |
| return { accepted: false, id: existing?.id ?? id }; |
| // Without aging, entries are not eligible because the outbox worker defers fresh | ||
| // entries for 5 s to avoid racing with the direct delivery path. |
There was a problem hiding this comment.
This comment says the outbox worker defers fresh entries for 5s, but the current recovery eligibility is driven by next_attempt_at/startupCutoff (and enqueueDelivery writes next_attempt_at=now). Please update the comment to reflect the actual deferral mechanism so the test remains self-explanatory.
| // Without aging, entries are not eligible because the outbox worker defers fresh | |
| // entries for 5 s to avoid racing with the direct delivery path. | |
| // Without aging, entries are not eligible for crash recovery because the worker | |
| // only picks up rows with next_attempt_at (and queued_at) earlier than the startup | |
| // cutoff. enqueueDelivery initializes next_attempt_at=now, so we move timestamps | |
| // into the past to make these entries unambiguously look like crash leftovers. |
| db.exec("DROP INDEX IF EXISTS idx_message_turns_dedup"); | ||
| db.exec(` | ||
| CREATE UNIQUE INDEX IF NOT EXISTS idx_message_turns_dedup | ||
| ON message_turns(dedupe_key) | ||
| WHERE dedupe_key IS NOT NULL; | ||
| `); |
There was a problem hiding this comment.
ensureLifecycleSchema() drops and recreates indexes unconditionally. On a non-trivial DB this can be expensive and can briefly lock the database on every gateway start. Consider using a schema version/migration (or only dropping when the existing index definition differs) instead of always running DROP INDEX.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 765cdd75b2
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| AND next_attempt_at <= ? | ||
| AND (queued_at < ? OR last_attempt_at IS NOT NULL OR attempt_count > 0) |
There was a problem hiding this comment.
Recover post-start rows missing attempt metadata
This startup-cutoff predicate permanently excludes rows created after boot unless attempt_count > 0 or last_attempt_at is set. In the current flow, ackDelivery/failDelivery intentionally swallow DB write errors, so a send can succeed or fail while the outbox row remains queued with attempt_count=0 and no last_attempt_at; with startMessageLifecycleWorkers always passing startupCutoff, that row is never retried until a full restart. For turn-linked deliveries this can leave turns stuck in delivery_pending and later marked stale/failed even though delivery already happened or should have been retried.
Useful? React with 👍 / 👎.
Every inbound message creates a durable turn record in message_turns. Turn worker detects orphaned turns (accepted but never completed after crash) and recovers them. Abort commands mark turns as aborted, preventing re-delivery. Outbox entries are linked to turns for coordinated finalization. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…d premature finalization (Codex P1+P2 openclaw#29149)
…-open on delivery stats
…, and transactional abort
…turnId in ACP route
765cdd7 to
831fb67
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 831fb67dd8
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| channel: channelId, | ||
| to, | ||
| accountId: accountId ?? undefined, | ||
| turnId: params.turnId, |
There was a problem hiding this comment.
Propagate dispatch kind when routing turn-linked payloads
routeReply now attaches turnId for all routed sends, including tool/block payloads emitted via dispatch-from-config and ACP routing, but these rows are still enqueued without a dispatch_kind and therefore look like final sends to lifecycle queries that treat dispatch_kind IS NULL as final-only. In cross-surface streaming/tool scenarios, those non-final rows can be replayed by recovery and can also make turn finalization read as delivered/failed based on tool/block outcomes instead of the actual final reply. Please pass the reply kind through the routing path so outbox rows are tagged and excluded from final-only turn health checks.
Useful? React with 👍 / 👎.
|
This pull request has been automatically marked as stale due to inactivity. |
Summary
Change Type (select all)
Scope (select all touched areas)
Linked Issue/PR
User-visible / Behavior Changes
None visible — dedup is more reliable across restarts. No config changes needed.
Security Impact (required)
Repro + Verification
Environment
Steps
Expected
Actual
Evidence
Human Verification (required)
Compatibility / Migration
Failure Recovery (if this breaks)
Risks and Mitigations