Skip to content

Commit 5ac997f

Browse files
Lobsterclaude
andcommitted
fix(bluebubbles): always run catchup on startup; advance cursor to page boundary on truncation
Addresses two more Codex P1 findings on PR #66760: - Run catchup even when cursor age is under MIN_INTERVAL Catchup is the *only* mechanism that recovers messages dropped during the gateway-down window, and it runs once per gateway startup. The 30s MIN_INTERVAL_MS gate was protecting against ~nothing (a few extra BB queries on rolling restarts) at the cost of permanent message loss when restarts happened within that window. A real failure: t0 startup, cursor=t0; t0+10s gateway down, webhook ECONNREFUSED at t0+15s; t0+20s restart skips catchup entirely. Gate removed; bounded by perRunLimit/maxAge + dedupe-protected so the cost of always running is capped. - Keep cursor behind unfetched pages when limit is hit Previously a long outage with >perRunLimit messages would fetch the oldest perRunLimit (sort:ASC), process them, and then advance the cursor to nowMs — permanently skipping the unfetched newer tail. Now we track the latest fetched timestamp regardless of fate, and on truncation (fetchedCount === perRunLimit) the cursor advances only to that page boundary so the next gateway startup picks up the rest. Updated the existing perRunLimit warn to reflect the new recoverable-on-next-startup semantics. Tests: replaced obsolete "skips rapid restart" with "runs catchup even on rapid restarts"; added "advances cursor only to last fetched ts when result is truncated". BB suite 410/410. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 34e0d52 commit 5ac997f

2 files changed

Lines changed: 101 additions & 31 deletions

File tree

extensions/bluebubbles/src/catchup.test.ts

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,15 +254,66 @@ describe("runBlueBubblesCatchup", () => {
254254
expect(called.proc).toBe(0);
255255
});
256256

257-
it("skips a rapid second run within MIN_INTERVAL_MS", async () => {
257+
it("runs catchup even on rapid restarts (no min-interval gate)", async () => {
258+
// Catchup runs once per gateway startup, so a quick restart MUST run
259+
// it again — otherwise messages dropped between the two startups
260+
// (gateway down → BB ECONNREFUSED → gateway up <30s later) are lost
261+
// permanently. Bounded by perRunLimit/maxAge + dedupe-protected.
258262
const now = 10_000;
259-
await saveBlueBubblesCatchupCursor("test-account", now - 5_000); // 5s ago
263+
await saveBlueBubblesCatchupCursor("test-account", now - 5_000);
264+
let fetched = false;
260265
const summary = await runBlueBubblesCatchup(makeTarget(), {
261266
now: () => now,
262-
fetchMessages: async () => ({ resolved: true, messages: [] }),
267+
fetchMessages: async () => {
268+
fetched = true;
269+
return { resolved: true, messages: [] };
270+
},
263271
processMessageFn: async () => {},
264272
});
265-
expect(summary).toBeNull();
273+
expect(fetched).toBe(true);
274+
expect(summary).not.toBeNull();
275+
});
276+
277+
it("advances cursor only to last fetched ts when result is truncated (perRunLimit hit)", async () => {
278+
// Long-outage scenario: 4 messages arrived during downtime but
279+
// perRunLimit=2. Sort:ASC means we get the 2 oldest. Cursor must
280+
// advance to the 2nd's timestamp (not nowMs) so the next startup
281+
// picks up the remaining 2.
282+
const now = 100 * 60 * 1000;
283+
await saveBlueBubblesCatchupCursor("test-account", 50 * 60 * 1000);
284+
const summary = await runBlueBubblesCatchup(
285+
makeTarget({
286+
account: {
287+
accountId: "test-account",
288+
enabled: true,
289+
configured: true,
290+
baseUrl: "http://127.0.0.1:1234",
291+
config: {
292+
serverUrl: "http://127.0.0.1:1234",
293+
password: "x",
294+
network: { dangerouslyAllowPrivateNetwork: true },
295+
catchup: { perRunLimit: 2 },
296+
} as unknown as WebhookTarget["account"]["config"],
297+
},
298+
}),
299+
{
300+
now: () => now,
301+
fetchMessages: async () => ({
302+
resolved: true,
303+
// Only the 2 the cap allows BB to return (oldest first via ASC).
304+
messages: [
305+
makeBbMessage({ guid: "p1", dateCreated: 60 * 60 * 1000 }),
306+
makeBbMessage({ guid: "p2", dateCreated: 70 * 60 * 1000 }),
307+
],
308+
}),
309+
processMessageFn: async () => {},
310+
},
311+
);
312+
expect(summary?.replayed).toBe(2);
313+
expect(summary?.fetchedCount).toBe(2);
314+
expect(summary?.cursorAfter).toBe(70 * 60 * 1000); // page boundary, not nowMs
315+
const cursor = await loadBlueBubblesCatchupCursor("test-account");
316+
expect(cursor?.lastSeenMs).toBe(70 * 60 * 1000);
266317
});
267318

268319
it("filters isFromMe before dispatch and still advances cursor", async () => {

extensions/bluebubbles/src/catchup.ts

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@ const MAX_MAX_AGE_MINUTES = 12 * 60;
2121
const DEFAULT_PER_RUN_LIMIT = 50;
2222
const MAX_PER_RUN_LIMIT = 500;
2323
const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30;
24-
// Skip catchup on restarts <30s apart to avoid churn on healthy rolling
25-
// restarts (e.g. automated repair loops, deploy scripts).
26-
const MIN_INTERVAL_MS = 30_000;
2724
const FETCH_TIMEOUT_MS = 15_000;
2825

2926
export type BlueBubblesCatchupConfig = {
@@ -225,14 +222,15 @@ export async function runBlueBubblesCatchup(
225222
const existing = await loadBlueBubblesCatchupCursor(accountId).catch(() => null);
226223
const cursorBefore = existing?.lastSeenMs ?? null;
227224

228-
if (existing && nowMs >= existing.lastSeenMs && nowMs - existing.lastSeenMs < MIN_INTERVAL_MS) {
229-
// A recent run just committed; skip to avoid churn on rolling restarts.
230-
// The `nowMs >= existing.lastSeenMs` guard avoids a wall-clock-skew
231-
// hazard: if the host clock jumps backwards (NTP correction, manual
232-
// adjust), a future-dated cursor would otherwise satisfy this gate
233-
// forever and silently disable catchup until wall time caught up.
234-
return null;
235-
}
225+
// Catchup runs once per gateway startup (called from monitor.ts after
226+
// webhook target registration). We deliberately do NOT short-circuit on
227+
// a "ran recently" gate, because catchup is the only mechanism that
228+
// recovers messages dropped during the gateway-down window. A short
229+
// gap (e.g. <30s) between two startups can still have lost messages in
230+
// the middle, and skipping the second startup's catchup would lose
231+
// them permanently. The bounded query (perRunLimit, maxAge) and the
232+
// inbound-dedupe cache from #66230 cap the cost of running the query
233+
// every startup.
236234

237235
const earliestAllowed = nowMs - maxAgeMs;
238236
// A future-dated cursor (clock rollback via NTP correction or manual
@@ -299,12 +297,20 @@ export async function runBlueBubblesCatchup(
299297
// unlikely to ever normalize on retry, and blocking on them would wedge
300298
// catchup forever.
301299
let earliestProcessFailureTs: number | null = null;
300+
// Track the latest fetched message timestamp regardless of fate, so a
301+
// truncated query (fetchedCount === perRunLimit) can advance the cursor
302+
// exactly to the page boundary. Without this, the unfetched tail past
303+
// the cap is permanently unreachable.
304+
let latestFetchedTs = windowStartMs;
302305

303306
for (const rec of messages) {
304307
// Defense in depth: the server-side `after:` filter should already
305308
// exclude pre-cursor messages, but guard here against BB API variants
306309
// that return inclusive-of-boundary data.
307310
const ts = typeof rec.dateCreated === "number" ? rec.dateCreated : 0;
311+
if (ts > 0 && ts > latestFetchedTs) {
312+
latestFetchedTs = ts;
313+
}
308314
if (ts > 0 && ts <= windowStartMs) {
309315
summary.skippedPreCursor++;
310316
continue;
@@ -339,17 +345,29 @@ export async function runBlueBubblesCatchup(
339345
}
340346
}
341347

342-
// Compute the new cursor. Default is `nowMs` so subsequent runs start
343-
// from the moment this sweep finished (avoiding stuck rescans of a
344-
// message with `dateCreated > nowMs` from minor clock skew between the
345-
// BB Server host and the gateway host). If any `processMessage` call
346-
// threw, hold the cursor just before the earliest failure so the next
347-
// run retries from there. The inbound-dedupe cache from #66230 keeps
348-
// already-replayed messages from being re-processed on that retry.
348+
// Compute the new cursor.
349+
//
350+
// - Default: advance to `nowMs` so subsequent runs start from the moment
351+
// this sweep finished (avoiding stuck rescans of a message with
352+
// `dateCreated > nowMs` from minor clock skew between BB host and
353+
// gateway host).
354+
// - On retryable failure (any `processMessage` throw): hold the cursor
355+
// just before the earliest failed timestamp so the next run retries
356+
// from there. The inbound-dedupe cache from #66230 keeps successfully
357+
// replayed messages from being re-processed.
358+
// - On truncation (fetched === perRunLimit): advance only to the latest
359+
// fetched timestamp so the next run picks up from the page boundary.
360+
// Otherwise the unfetched tail past the cap (which can be substantial
361+
// during long outages) would be permanently unreachable.
362+
const isTruncated = summary.fetchedCount >= perRunLimit;
349363
let nextCursorMs = nowMs;
350364
if (earliestProcessFailureTs !== null) {
351365
const heldCursor = Math.max(earliestProcessFailureTs - 1, cursorBefore ?? windowStartMs);
352366
nextCursorMs = Math.min(heldCursor, nowMs);
367+
} else if (isTruncated) {
368+
// Use latestFetchedTs (clamped to >= prior cursor and <= nowMs) so the
369+
// next run starts where this page ended.
370+
nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs);
353371
}
354372
summary.cursorAfter = nextCursorMs;
355373
await saveBlueBubblesCatchupCursor(accountId, nextCursorMs).catch((err) => {
@@ -363,17 +381,18 @@ export async function runBlueBubblesCatchup(
363381
`window_ms=${nowMs - windowStartMs}`,
364382
);
365383

366-
// Emit a distinct warning when the BB result hits perRunLimit. The cursor
367-
// has already advanced to nowMs, so any older messages BB would have
368-
// returned past the cap are unreachable on the next sweep. Loud signal
369-
// for operators to raise perRunLimit before a long outage silently
370-
// truncates inbound history.
371-
if (summary.fetchedCount >= perRunLimit) {
384+
// Distinct WARNING when the BB result hits perRunLimit so operators
385+
// know a single startup didn't drain the full backlog. The cursor was
386+
// advanced only to the page boundary above, so the unfetched tail will
387+
// be picked up on the next gateway startup — but if startups are
388+
// infrequent, raising perRunLimit drains larger backlogs in one pass.
389+
if (isTruncated) {
372390
error?.(
373391
`[${accountId}] BlueBubbles catchup: WARNING fetched=${summary.fetchedCount} ` +
374-
`hit perRunLimit=${perRunLimit}; older messages in the window may have ` +
375-
`been truncated. Raise channels.bluebubbles...catchup.perRunLimit if ` +
376-
`outages can exceed this many inbound messages.`,
392+
`hit perRunLimit=${perRunLimit}; cursor advanced only to page boundary, ` +
393+
`remaining messages will be picked up on next startup. Raise ` +
394+
`channels.bluebubbles...catchup.perRunLimit to drain larger backlogs ` +
395+
`in a single pass.`,
377396
);
378397
}
379398

0 commit comments

Comments
 (0)