feat(bluebubbles): replay missed webhook messages after gateway restart (#66721) #66853

omarshahine wants to merge 6 commits into main from
Conversation
…rt (#66721)

Adds a per-account startup catchup pass that queries BB Server for messages delivered since a persisted cursor and re-feeds each through the existing `processMessage` pipeline. Fixes the missed-message hole documented in #66721: BB's WebhookService is fire-and-forget on POST failure and MessagePoller does not replay on webhook-receiver recovery, so inbound messages delivered while the gateway was down/wedged/restarting were permanently lost.

Design

- New `extensions/bluebubbles/src/catchup.ts`:
  - `fetchBlueBubblesMessagesSince(sinceMs, limit, opts)` calls `/api/v1/message/query` with `{after, sort:"ASC", with:[chat, chat.participants, attachment]}` so replays carry the same shape `normalizeWebhookMessage` already handles on live dispatch.
  - `loadBlueBubblesCatchupCursor` / `saveBlueBubblesCatchupCursor` persist a single `{lastSeenMs, updatedAt}` per account at `<stateDir>/bluebubbles/catchup/<accountId>__<hash>.json`, using the plugin-sdk's atomic JSON helpers. File layout mirrors the inbound-dedupe store from #66230.
  - `runBlueBubblesCatchup(target)` orchestrates: clamp config, skip recent runs (<30s), clamp window to `maxAgeMinutes`, fetch, filter `isFromMe` and pre-cursor records, dispatch to `processMessage`, advance cursor to `nowMs` on success.
- `monitor.ts`: after the webhook target registers, fire catchup as a background task; errors are logged but never block readiness.
- `config-schema.ts`: new optional `catchup` block (`enabled`, `maxAgeMinutes`, `perRunLimit`, `firstRunLookbackMinutes`), defaults on with 2h lookback / 50 msg cap / 30-min first-run lookback.

Safety

- Goes through the same `processMessage` path webhooks use, so auth, allowlist, pairing, and downstream agent dispatch all apply unchanged.
- Dedupes against #66230's persistent inbound GUID cache, so a webhook delivery that already succeeded cannot be reprocessed by catchup.
- Never dispatches `isFromMe` records (double-checked before and after normalization), so the agent's own sends cannot enter the inbound path.
- The cursor is only advanced on a successful query; a failed fetch leaves the prior cursor in place so the next run retries the same window.
- A 30s minimum interval between runs prevents churn on rolling restarts.
- Hard ceilings: 12h max lookback, 500 messages per run.

Validation

- New scoped tests in `extensions/bluebubbles/src/catchup.test.ts` (14 cases covering cursor round-trip, account scoping, FS-unsafe account IDs, firstRunLookback, maxAge clamp, `enabled: false`, rapid-restart skip, isFromMe filter, query failure preserving the cursor, per-message failure isolation, and pre-cursor defense-in-depth).
- Full BlueBubbles suite (403/403) and `pnpm check` green.
- End-to-end verified on macOS with a live BB Server 1.9.x: stop gateway, send N messages to monitored threads (verified 3/3 ECONNREFUSED in BB's `main.log`), restart gateway; catchup queried and re-fed all N missed messages through `processMessage`, and subsequent boots were no-ops (empty delta).

Retires the reference implementation at `openclaw-agents/lobster/scripts/bb-catchup.sh` in the Lobster workspace, which has been running this exact shape for ~4 weeks. Closes #66721.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
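The cursor store described above can be sketched in plain Node. This is a minimal sketch, not the actual plugin-sdk helpers: `cursorPath`, `saveCursor`, and `loadCursor` are illustrative names, and the exact sanitization scheme for FS-unsafe account IDs is an assumption; only the `{lastSeenMs, updatedAt}` shape and the `<accountId>__<hash>.json` layout come from the PR.

```typescript
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { createHash } from "node:crypto";
import { dirname, join } from "node:path";

interface CatchupCursor {
  lastSeenMs: number; // epoch ms of the last timestamp covered by catchup
  updatedAt: number;  // epoch ms when the cursor was written
}

// Hypothetical stand-in for the <accountId>__<hash>.json layout: an FS-unsafe
// account ID is sanitized and paired with a short hash of the raw ID so two
// IDs that sanitize identically still get distinct files.
function cursorPath(stateDir: string, accountId: string): string {
  const safe = accountId.replace(/[^a-zA-Z0-9_-]/g, "_");
  const hash = createHash("sha256").update(accountId).digest("hex").slice(0, 8);
  return join(stateDir, "bluebubbles", "catchup", `${safe}__${hash}.json`);
}

// Write-to-temp-then-rename gives an atomic replace on POSIX filesystems, so a
// crash mid-write never leaves a truncated cursor file behind.
function saveCursor(stateDir: string, accountId: string, cursor: CatchupCursor): void {
  const file = cursorPath(stateDir, accountId);
  mkdirSync(dirname(file), { recursive: true });
  const tmp = `${file}.tmp`;
  writeFileSync(tmp, JSON.stringify(cursor));
  renameSync(tmp, file);
}

// A missing or corrupt cursor file is treated as "no cursor" (first run).
function loadCursor(stateDir: string, accountId: string): CatchupCursor | null {
  const file = cursorPath(stateDir, accountId);
  if (!existsSync(file)) return null;
  try {
    return JSON.parse(readFileSync(file, "utf8")) as CatchupCursor;
  } catch {
    return null;
  }
}
```

The write-then-rename step is what makes "cursor is only advanced on a successful query" safe against crashes mid-save.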
…rn on perRunLimit truncation

Addresses review feedback on PR #66760:

- Greptile P2: `catchup.ts` and `inbound-dedupe.ts` each had their own `resolveStateDirFromEnv`. Catchup's was a half-baked local copy that missed `~` expansion and the legacy/new state-dir fallback that the canonical `openclaw/plugin-sdk/state-paths.resolveStateDir` provides. Switch catchup to the canonical resolver so the catchup cursor and the inbound-dedupe state are guaranteed to land under the same root, preserving the dedupe-after-replay invariant the catchup design depends on. Test isolation is preserved by honoring an explicit `OPENCLAW_STATE_DIR` override before the VITEST/test-mode default.
- Greptile P2 + Aisle CWE-532-adjacent: when the BB result hits `perRunLimit`, emit a distinct WARNING through the runtime `error` channel pointing at the config knob to raise. Previously the run summary just showed `fetched=N` with no signal that older messages in the window may have been silently truncated (the cursor still advances to `nowMs`, so they're unreachable on the next sweep).
- Tests: 2 new cases covering "warn on cap hit" and "no warn below cap"; the full BB suite remains 405/405 (was 403/403; +2 new).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
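The cap-hit warning described above reduces to a small predicate. A minimal sketch, with an injected `warn` callback standing in for the runtime `error` channel; the function name and message text are illustrative, not the actual implementation:

```typescript
// Emit a distinct warning when the fetch result fills the whole perRunLimit
// page, since older messages in the window may have been silently truncated.
function maybeWarnOnTruncation(
  fetchedCount: number,
  perRunLimit: number,
  warn: (msg: string) => void,
): void {
  if (fetchedCount >= perRunLimit) {
    warn(
      `BlueBubbles catchup fetched ${fetchedCount} messages and hit ` +
        `perRunLimit=${perRunLimit}; raise catchup.perRunLimit if the outage was longer.`,
    );
  }
}
```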
…kew gate

Addresses Codex review findings on PR #66760:

- P1: Avoid committing the catchup cursor after replay failures. Previously the cursor advanced to `nowMs` unconditionally, so a transient `processMessage` failure during startup catchup permanently dropped the failing record — the next sweep queried only `after nowMs` and never retried it. Now we track the earliest timestamp where `processMessage` threw and hold the cursor just before it (still clamped to >= the prior cursor and <= `nowMs`), so retries pick up exactly the failed records on the next run. The inbound-dedupe cache from #66230 keeps successfully replayed messages from being re-processed on retry. Normalize failures (a record that didn't yield a usable `NormalizedWebhookMessage`) are still treated as permanent skips, so a malformed payload doesn't wedge catchup forever.
- P2: Skip the min-interval gate when the stored cursor is in the future. If the host clock jumped backwards (NTP correction, manual adjust), `nowMs - existing.lastSeenMs` was negative, which still satisfies `< MIN_INTERVAL_MS`, so catchup returned null repeatedly until wall time caught up. Added a `nowMs >= existing.lastSeenMs` precondition so future-dated cursors stop disabling catchup.

Tests: 3 new cases (cursor-held-on-failure, clamp-to-prior-cursor, clock-skew-bypass-gate); BB suite remains 408/408.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
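The hold-the-cursor-before-the-earliest-failure rule above is pure arithmetic, sketched here with illustrative names (not the actual catchup internals):

```typescript
// Decide where the cursor lands after a run. On a clean run it advances to
// nowMs; if processMessage threw, hold just before the earliest failure so
// the next run re-queries exactly the failed records, clamped so we never
// move behind the prior cursor or past "now".
function nextCursorMs(opts: {
  nowMs: number;
  priorCursorMs: number;
  earliestFailureTs: number | null; // earliest ts where processMessage threw
}): number {
  const { nowMs, priorCursorMs, earliestFailureTs } = opts;
  if (earliestFailureTs === null) return nowMs;
  return Math.min(Math.max(earliestFailureTs - 1, priorCursorMs), nowMs);
}
```

The `- 1` puts the cursor strictly before the failed record's timestamp, so an `after`-exclusive query re-fetches it.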
…e catchup overrides

Addresses two more Codex P2 findings on PR #66760:

- Enforce `maxAgeMinutes` on the first catchup run. Previously the no-cursor branch used `nowMs - firstRunLookbackMs` directly without applying `earliestAllowed`, so a config like `maxAgeMinutes: 5, firstRunLookbackMinutes: 30` silently scanned 30 minutes on first startup. Now both branches clamp against the `earliestAllowed` ceiling.
- Preserve channel-level `catchup.enabled` in account overrides. BlueBubbles account merging only deep-merges `network`. With `catchup` outside that list, an override like `accounts.work.catchup: { perRunLimit: 20 }` would replace the whole channel-level catchup block and silently drop a global `catchup.enabled: false`, re-enabling replay for that account. Adding `catchup` to `nestedObjectKeys` deep-merges the override on top of inherited defaults, matching the existing `network` precedent.

Tests: 1 new case for the first-run maxAge clamp; BB suite 409/409.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
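The override-merge fix can be illustrated with a simplified model of the `nestedObjectKeys` deep-merge; this is a sketch of the merge rule, not the actual `accounts.ts` implementation:

```typescript
// Keys whose account-level overrides are deep-merged onto channel defaults
// instead of replacing the whole block; adding "catchup" is the fix above.
const nestedObjectKeys = ["network", "catchup"] as const;

function mergeAccountConfig(
  channel: Record<string, unknown>,
  override: Record<string, unknown>,
): Record<string, unknown> {
  const merged: Record<string, unknown> = { ...channel, ...override };
  for (const key of nestedObjectKeys) {
    const base = channel[key];
    const over = override[key];
    if (base && over && typeof base === "object" && typeof over === "object") {
      // One-level deep merge: override fields win, inherited fields survive.
      merged[key] = {
        ...(base as Record<string, unknown>),
        ...(over as Record<string, unknown>),
      };
    }
  }
  return merged;
}
```

With `catchup` in the list, an `accounts.work.catchup: { perRunLimit: 20 }` override keeps an inherited `enabled: false` instead of silently re-enabling replay.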
…covers via firstRunLookback

Addresses a Codex P2 finding on PR #66760 (cursor in the future causes a silent skip). Previously, after a host clock rollback (NTP correction or manual adjust), `existing.lastSeenMs` could be ahead of `nowMs`. The earlier `Math.max(existing.lastSeenMs, earliestAllowed)` then computed a `windowStartMs` in the future, so the BB query ran with `after=future`, returned zero records, and we still saved cursor = `nowMs` at the end of the run — permanently skipping every real message delivered in the `[earliestAllowed, nowMs]` window.

Now a cursor strictly in the future is treated as if no cursor exists at all. The window falls through to the firstRunLookback path (clamped to maxAge), so catchup looks back a sensible distance and replays through the existing dedupe-protected pipeline. The cursor save at the end of the run repairs the future timestamp.

Test: extends the previous "skips rate-limit gate" case to assert that the recovery window is firstRunLookback-clamped (not `nowMs`) and that the cursor is repaired to `nowMs` after the run.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
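The resulting window-start selection can be sketched as follows (illustrative names; the key behaviors from the commits above are that a future-dated cursor is treated as missing and both branches clamp to maxAge):

```typescript
// Pick the start of the catchup query window. A cursor strictly ahead of
// nowMs (host clock rollback) is treated as if no cursor exists, so the
// window falls back to the first-run lookback, clamped to maxAge.
function windowStartMs(opts: {
  nowMs: number;
  cursorMs: number | null;
  firstRunLookbackMs: number;
  maxAgeMs: number;
}): number {
  const { nowMs, cursorMs, firstRunLookbackMs, maxAgeMs } = opts;
  const earliestAllowed = nowMs - maxAgeMs;
  const usableCursor = cursorMs !== null && cursorMs <= nowMs ? cursorMs : null;
  if (usableCursor === null) {
    // No cursor, or cursor in the future: first-run lookback, maxAge-clamped.
    return Math.max(nowMs - firstRunLookbackMs, earliestAllowed);
  }
  return Math.max(usableCursor, earliestAllowed);
}
```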
…ge boundary on truncation

Addresses two more Codex P1 findings on PR #66760:

- Run catchup even when the cursor age is under MIN_INTERVAL. Catchup is the *only* mechanism that recovers messages dropped during the gateway-down window, and it runs once per gateway startup. The 30s `MIN_INTERVAL_MS` gate was protecting against almost nothing (a few extra BB queries on rolling restarts) at the cost of permanent message loss when restarts happened within that window. A real failure: t0 startup, cursor=t0; t0+10s gateway down, webhook ECONNREFUSED at t0+15s; t0+20s restart skips catchup entirely. The gate is removed; runs are bounded by perRunLimit/maxAge and dedupe-protected, so the cost of always running is capped.
- Keep the cursor behind unfetched pages when the limit is hit. Previously a long outage with more than `perRunLimit` messages would fetch the oldest `perRunLimit` (sort:ASC), process them, and then advance the cursor to `nowMs` — permanently skipping the unfetched newer tail. Now we track the latest fetched timestamp regardless of outcome, and on truncation (`fetchedCount === perRunLimit`) the cursor advances only to that page boundary so the next gateway startup picks up the rest. Updated the existing perRunLimit warning to reflect the new recoverable-on-next-startup semantics.

Tests: replaced the obsolete "skips rapid restart" case with "runs catchup even on rapid restarts"; added "advances cursor only to last fetched ts when result is truncated". BB suite 410/410.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
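The end-of-run cursor rule described above (advance fully on a complete fetch, only to the page boundary on truncation) can be sketched as follows, with illustrative names:

```typescript
// On truncation (fetchedCount === perRunLimit) advance only to the latest
// fetched timestamp so the next startup resumes at the page boundary; a
// complete fetch advances fully to nowMs. Clamping guards against bogus
// record timestamps outside [windowStartMs, nowMs].
function cursorAfterRun(opts: {
  nowMs: number;
  windowStartMs: number;
  perRunLimit: number;
  fetchedCount: number;
  latestFetchedTs: number; // max timestamp among fetched records
}): number {
  const { nowMs, windowStartMs, perRunLimit, fetchedCount, latestFetchedTs } = opts;
  const truncated = fetchedCount >= perRunLimit;
  if (!truncated) return nowMs;
  return Math.min(Math.max(latestFetchedTs, windowStartMs), nowMs);
}
```

Because BB results are fetched `sort:ASC`, the latest fetched timestamp is exactly the boundary between the processed page and the unfetched tail.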
🔒 Aisle Security Analysis

We found 4 potential security issue(s) in this PR:
1. 🟠 BlueBubbles API password placed in URL query string (leaks via logs/proxies/errors)
Description

The catchup implementation constructs BlueBubbles API URLs using

Vulnerable code path:
Recommendation

Avoid sending credentials in URLs. Preferred fix (if the BlueBubbles server supports it): send the password in a header (or another non-URL channel) and keep URLs free of secrets.

```typescript
const url = buildBlueBubblesApiUrl({ baseUrl: opts.baseUrl, path: "/api/v1/message/query" });
const res = await blueBubblesFetchWithTimeout(
  url,
  {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      // Example: use an Authorization header instead of a query param
      "Authorization": `Bearer ${opts.password}`,
    },
    body,
  },
  opts.timeoutMs ?? FETCH_TIMEOUT_MS,
  ssrfPolicy,
);
```

If the API requires

Example redaction:

```typescript
function redactBlueBubblesUrl(raw: string): string {
  const u = new URL(raw);
  if (u.searchParams.has("password")) u.searchParams.set("password", "<redacted>");
  return u.toString();
}
```

And ensure error logs do not include the unredacted URL.

2. 🟡 BlueBubbles catchup runs after monitor stop/unregister (no cancellation), allowing message processing when account is disabled/stopped
Description
As a result:
Vulnerable code:

```typescript
// monitor.ts
runBlueBubblesCatchup(target).catch((err) => {
  runtime.error?.(
    `[${account.accountId}] BlueBubbles catchup: unexpected failure: ${String(err)}`,
  );
});
```

Recommendation

Tie catchup execution to the monitor lifecycle so it cannot process messages once the provider is stopped/unregistered. Options:

```typescript
// monitor.ts
const catchupAbort = new AbortController();
const stop = () => {
  catchupAbort.abort();
  unregister();
  resolve();
};
void runBlueBubblesCatchup(target, { abortSignal: catchupAbort.signal })
  .catch(err => runtime.error?.(`[...] ${String(err)}`));
```

And in

```typescript
if (deps.abortSignal?.aborted) return null;
for (const rec of messages) {
  if (deps.abortSignal?.aborted) break;
  await procFn(normalized, target);
}
```

Also consider logging when catchup is aborted to avoid silent partial replays.

3. 🟡 Unbounded concurrent BlueBubbles catchup tasks on monitor startup (no cancellation/locking)
Description

The BlueBubbles monitor starts a catchup replay task in a fire-and-forget manner on every provider startup:
While inbound dedupe may prevent double-delivery, it does not prevent double-work (extra queries, normalization, per-message processing attempts), making startup a potential amplification point and a denial-of-service vector in operational scenarios (e.g., repeated restarts or multiple concurrent monitors).

Recommendation

Add concurrency control and shutdown cancellation for catchup runs. Options:
```typescript
// catchup.ts
const inFlight = new Map<string, Promise<BlueBubblesCatchupSummary | null>>();

export function runBlueBubblesCatchupSingleflight(target: WebhookTarget) {
  const key = target.account.accountId;
  const existing = inFlight.get(key);
  if (existing) return existing;
  const p = runBlueBubblesCatchup(target).finally(() => inFlight.delete(key));
  inFlight.set(key, p);
  return p;
}

export async function runBlueBubblesCatchup(
  target: WebhookTarget,
  deps: RunBlueBubblesCatchupDeps = {},
  signal?: AbortSignal,
) {
  if (signal?.aborted) return null;
  // before/after fetch, and inside the message loop:
  for (const rec of messages) {
    if (signal?.aborted) break;
    await procFn(normalized, target);
  }
}
```
These changes prevent runaway parallel replays, reduce startup amplification, and ensure catchup work does not continue after a monitor is stopped.

4. 🟡 Unbounded JSON response buffering in BlueBubbles catchup fetch can cause memory/CPU DoS
Description

The BlueBubbles catchup recovery path fetches messages from the configured BlueBubbles server and parses the entire HTTP response as JSON without any client-side response-size limit or hard cap on the number of returned records. This is problematic because:
If the BlueBubbles server is compromised/misconfigured, or if an attacker can influence

Vulnerable code:

```typescript
const json = (await res.json().catch(() => null)) as { data?: unknown } | null;
...
const messages: Array<Record<string, unknown>> = [];
for (const entry of json.data) {
  const rec = asRecord(entry);
  if (rec) {
    messages.push(rec);
  }
}
```

Recommendation

Add explicit client-side caps for both response size and record count.

```typescript
const messages: Array<Record<string, unknown>> = [];
for (const entry of json.data) {
  if (messages.length >= limit) break; // hard cap
  const rec = asRecord(entry);
  if (rec) messages.push(rec);
}

const MAX_BYTES = 1_000_000; // choose an appropriate upper bound
const len = Number(res.headers.get("content-length") ?? "0");
if (len && len > MAX_BYTES) throw new Error("Response too large");
```

For robust protection, implement streaming / incremental parsing or a guarded fetch helper that aborts once

Analyzed PR: #66853 at commit
Last updated on: 2026-04-14T22:52:22Z
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5ac997f733
```typescript
const isTruncated = summary.fetchedCount >= perRunLimit;
let nextCursorMs = nowMs;
if (earliestProcessFailureTs !== null) {
  const heldCursor = Math.max(earliestProcessFailureTs - 1, cursorBefore ?? windowStartMs);
```
Avoid clamping retry cursor against future saved cursor
When the persisted cursor is ahead of `nowMs` (the rollback case you already detect earlier), this retry branch still uses `cursorBefore` in the `Math.max(...)`. That makes `heldCursor` stay in the future, and `Math.min(heldCursor, nowMs)` collapses `cursorAfter` to `nowMs`, so a transient `processMessage` failure is not retried and those messages can be skipped permanently. Clamp against `windowStartMs` (or only a cursor proven usable) in this branch.
```typescript
} else if (isTruncated) {
  // Use latestFetchedTs (clamped to >= prior cursor and <= nowMs) so the
  // next run starts where this page ended.
  nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs);
```
Preserve page-boundary cursor when recovering from future cursor
The truncation path also reuses `cursorBefore` even when it was marked unusable (`existing.lastSeenMs > nowMs`). In that case `Math.max(latestFetchedTs, cursorBefore ...)` picks the future value, then `Math.min(..., nowMs)` advances the cursor to `nowMs` instead of the last fetched timestamp, which can permanently drop unfetched backlog items when `fetchedCount === perRunLimit`. Use only a usable prior cursor (or `windowStartMs`) for this clamp.
Greptile Summary

This PR adds a startup catchup pass to the BlueBubbles channel that queries

Confidence Score: 5/5

Safe to merge — no P0/P1 issues found; only a minor CHANGELOG capitalization nit remains. All findings are P2 (style). The implementation is thoroughly tested (21 unit cases, 410/410 suite-wide), includes a live e2e validation, and all CI gates are green. The cursor-management logic, isFromMe guards, truncation boundary, and failure-isolation behaviors are each covered by dedicated test cases. No files require special attention.

Reviews (1): Last reviewed commit: "fix(bluebubbles): always run catchup on ..."
### Fixes

- fix(bluebubbles): replay missed webhook messages after gateway restart via a persistent per-account cursor and `/api/v1/message/query?after=<ts>` pass, so messages delivered while the gateway was down no longer disappear. Uses the existing `processMessage` path and is deduped by #66816's inbound GUID cache. (#66721) thanks @omarshahine
All other changelog entries in this file use `Thanks @author` (capital T). This one uses `thanks @omarshahine` — a minor inconsistency with the project convention.

```suggestion
- fix(bluebubbles): replay missed webhook messages after gateway restart via a persistent per-account cursor and `/api/v1/message/query?after=<ts>` pass, so messages delivered while the gateway was down no longer disappear. Uses the existing `processMessage` path and is deduped by #66816's inbound GUID cache. (#66721) Thanks @omarshahine
```
Superseded by a squashed-history PR on branch |
Summary
Fixes #66721. Adds an in-process startup catchup pass to the BlueBubbles channel that queries BB Server for messages delivered while the gateway was unreachable and replays them through the existing `processMessage` pipeline.

The hole this closes: BB Server's `WebhookService` is fire-and-forget on POST failure (no retries) and BB's `MessagePoller` only re-fires webhooks on BB-side reconnection events (Messages.app / APNs), not on webhook-receiver recovery. So inbound messages delivered while the OpenClaw gateway was down, restarting, or wedged were permanently lost — verified with a controlled experiment on macOS.

This PR was previously #66760, stacked on the now-merged dedupe work (#66816). After that landed, this branch was rebased onto main and reopened as a standalone PR against main.
Design
- `extensions/bluebubbles/src/catchup.ts`:
  - `fetchBlueBubblesMessagesSince(sinceMs, limit, opts)` calls `/api/v1/message/query` with `{after, sort:"ASC", with:["chat","chat.participants","attachment"]}` so replays carry the same shape `normalizeWebhookMessage` already handles on live dispatch.
  - `loadBlueBubblesCatchupCursor` / `saveBlueBubblesCatchupCursor` persist a single `{lastSeenMs, updatedAt}` per account under `<stateDir>/bluebubbles/catchup/<accountId>__<hash>.json`, using the plugin-sdk's atomic JSON helpers. File layout mirrors the inbound-dedupe store from "fix(bluebubbles): dedupe inbound webhooks across restarts (#19176, #12053)" #66816.
  - `runBlueBubblesCatchup(target)` orchestrates: clamp config, fetch, filter `isFromMe` and pre-cursor records, dispatch to `processMessage`, advance cursor.
- `monitor.ts`: after the webhook target registers, fire catchup as a background task; errors are logged but never block the channel-ready signal.
- `config-schema.ts`: new optional `catchup` block (`enabled`, `maxAgeMinutes`, `perRunLimit`, `firstRunLookbackMinutes`); defaults are on with 2h lookback / 50 msg cap / 30-min first-run lookback.
- `accounts.ts`: adds `catchup` to the account-merge `nestedObjectKeys` list so per-account overrides deep-merge on top of channel-level defaults (mirroring the existing `network` precedent).

Why this approach
The fix mirrors a workspace-level shell script that's been running on a real OpenClaw install for ~4 weeks (~100 LoC of bash + python doing the same query/filter/POST flow). Porting it into the BB channel itself means every install gets recovery for free, calls `processMessage` directly (no re-POST hop), and benefits from #66816's persistent dedupe automatically.

Safety
- Goes through the same `processMessage` path webhooks use, so auth, allowlist, pairing, and downstream agent dispatch all apply unchanged.
- Never dispatches `isFromMe` records (double-checked before and after normalization), so the agent's own sends cannot enter the inbound path.
- The cursor advances to `nowMs` on fully-successful runs. On `processMessage` failure, the cursor is held just before the earliest failure so the next run retries from there. On truncation (`fetchedCount === perRunLimit`), the cursor advances only to the last-fetched timestamp so the next gateway startup picks up the unfetched tail.
- The cursor tracks `nowMs` (clamped to the latest observed timestamp on truncation) rather than the latest observed message timestamp unconditionally, to avoid stuck rescans from clock skew between the BB host and the gateway host.

Validation
Automated
- `extensions/bluebubbles/src/catchup.test.ts` (21 cases): cursor round-trip, per-account scoping, filesystem-unsafe account IDs, first-run lookback + maxAge clamp, `enabled: false`, rapid-restart-still-runs, isFromMe filter (pre- and post-normalization), query-failure-preserves-cursor, per-message failure isolation, held-cursor-on-retryable-failure, clamp-to-prior-cursor, future-cursor recovery, pre-cursor defense-in-depth, perRunLimit warn / no-warn, truncation-cursor advances only to page boundary, and first-run maxAge clamp.
- `pnpm check` green (madge, tsgo, oxlint, webhook-auth-body-order, no-pairing-store-group, pairing-account-scope).

Live end-to-end (macOS, BB Server 1.9.x, 2026-04-14)
Repeating the original repro from #66721's issue body against the new in-process catchup with the workspace shim disabled:
- `connect ECONNREFUSED 127.0.0.1:18789` and never retried.
- `~/.openclaw/bluebubbles/catchup/<accountId>__<hash>.json`. A subsequent gateway restart with no new inbound activity logged `replayed=0 fetched=0` (no-op, as expected).
pnpm test extensions/bluebubbles/src/catchup.test.ts— 21/21pnpm test extensions/bluebubbles/— 410/410pnpm check— greenReview-feedback history
This branch carries review feedback that was gathered on the prior PR #66760 (closed automatically when its base
lobster/bb-inbound-dedupewas deleted after #66816 merged). All findings were addressed in-branch; the 5 follow-up commits on this branch are:1b4402219cGreptile P2: align catchup state-dir with canonical SDK resolver; warn on perRunLimit truncation.8d5129ff79Codex P1: hold catchup cursor on retryable failures; Codex P2: clock-skew gate precondition.d154af9518Codex P2: clamp first-run catchup window to maxAge; deep-merge catchup overrides.34e0d52f6aCodex P2: treat future-dated cursor as unusable; recover via firstRunLookback.5ac997f733Codex P1 ×2: always run catchup on startup (remove min-interval gate); advance cursor to page boundary on truncation.