Skip to content

Commit 95d4673

Browse files
authored
fix(whatsapp): drain eligible pending deliveries on reconnect (#63916)
* fix(whatsapp): drain eligible pending deliveries on reconnect * docs(changelog): note whatsapp reconnect pending drain
1 parent ab9be8d commit 95d4673

7 files changed

Lines changed: 471 additions & 119 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
3838
- Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky.
3939
- Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) thanks @Daanvdplas and @gumadeiras.
4040
- Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman.
41+
- WhatsApp/outbound queue: drain same-account pending WhatsApp deliveries when the listener reconnects, including fresh queued sends that are already retry-eligible, so reconnects recover deliverable outbound messages without waiting for another gateway restart. (#63916) Thanks @mcaxtr.
4142
- Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode.
4243
- Gateway/sessions: scope bare `sessions.create` aliases like `main` to the requested agent while preserving the canonical `global` and `unknown` sentinel keys. (#58207) thanks @jalehman.
4344
- `/context detail` now compares the tracked prompt estimate with cached context usage and surfaces untracked provider/runtime overhead when present. (#28391) thanks @ImLukeF.

extensions/whatsapp/src/auto-reply/monitor.ts

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound";
33
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
44
import { waitForever } from "openclaw/plugin-sdk/cli-runtime";
55
import { hasControlCommand } from "openclaw/plugin-sdk/command-detection";
6-
import { drainReconnectQueue } from "openclaw/plugin-sdk/infra-runtime";
6+
import { drainPendingDeliveries } from "openclaw/plugin-sdk/infra-runtime";
77
import { enqueueSystemEvent } from "openclaw/plugin-sdk/infra-runtime";
88
import { DEFAULT_GROUP_HISTORY_LIMIT } from "openclaw/plugin-sdk/reply-history";
99
import { resolveAgentRoute } from "openclaw/plugin-sdk/routing";
@@ -75,6 +75,14 @@ function loadReplyResolverRuntime() {
7575
return replyResolverRuntimePromise;
7676
}
7777

78+
/**
 * Produces the account key used to match queued deliveries against the
 * account whose listener just reconnected.
 *
 * Surrounding whitespace is stripped. A missing, null, empty, or
 * whitespace-only id falls back to "default".
 */
function normalizeReconnectAccountId(accountId?: string | null): string {
  const trimmed = accountId?.trim();
  return trimmed ? trimmed : "default";
}
81+
82+
/**
 * Pattern for the "No active WhatsApp Web listener" error text.
 * Hoisted so the expression is not re-created for every queue entry; it has
 * no `g`/`y` flag, so `test()` carries no state between calls.
 */
const NO_LISTENER_RECONNECT_ERROR_RE = /No active WhatsApp Web listener/i;

/**
 * Reports whether a queued delivery's last recorded error is the
 * "no active listener" failure (matched case-insensitively, anywhere in the
 * message).
 *
 * @param lastError - Last error message stored on the queue entry; may be
 *   absent or null.
 * @returns `true` only when `lastError` is a string containing the
 *   no-listener text; `false` for missing, null, or unrelated errors.
 */
function isNoListenerReconnectError(lastError?: string | null): boolean {
  return typeof lastError === "string" && NO_LISTENER_RECONNECT_ERROR_RE.test(lastError);
}
85+
7886
export async function monitorWebChannel(
7987
verbose: boolean,
8088
listenerFactory: typeof monitorWebInbox | undefined = monitorWebInbox,
@@ -261,11 +269,24 @@ export async function monitorWebChannel(
261269

262270
setActiveWebListener(account.accountId, listener);
263271

264-
// Drain any messages that failed with "no listener" during the disconnect window.
265-
void drainReconnectQueue({
266-
accountId: account.accountId,
272+
const normalizedAccountId = normalizeReconnectAccountId(account.accountId);
273+
274+
// Reconnect is the transport-ready signal for WhatsApp, so drain eligible
275+
// pending deliveries for this account here instead of hardcoding that
276+
// policy inside the generic queue engine.
277+
void drainPendingDeliveries({
278+
drainKey: `whatsapp:${normalizedAccountId}`,
279+
logLabel: "WhatsApp reconnect drain",
267280
cfg,
268281
log: reconnectLogger,
282+
selectEntry: (entry) => ({
283+
match:
284+
entry.channel === "whatsapp" &&
285+
normalizeReconnectAccountId(entry.accountId) === normalizedAccountId,
286+
// Reconnect changed listener readiness, so these should not sit behind
287+
// the normal backoff window.
288+
bypassBackoff: isNoListenerReconnectError(entry.lastError),
289+
}),
269290
}).catch((err) => {
270291
reconnectLogger.warn(
271292
{ connectionId: active.connectionId, error: String(err) },

src/infra/outbound/delivery-queue-recovery.ts

Lines changed: 161 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { formatErrorMessage } from "../errors.js";
33
import {
44
ackDelivery,
55
failDelivery,
6+
loadPendingDelivery,
67
loadPendingDeliveries,
78
moveToFailed,
89
type QueuedDelivery,
@@ -30,6 +31,11 @@ export interface RecoveryLogger {
3031
error(msg: string): void;
3132
}
3233

34+
/**
 * Verdict a `selectEntry` callback returns for a single queued delivery
 * while pending deliveries are being drained.
 */
export interface PendingDeliveryDrainDecision {
  /** Whether the entry is selected for this drain; non-matching entries are filtered out. */
  match: boolean;
  /** When truthy, the drain skips the retry-backoff eligibility check for this entry. */
  bypassBackoff?: boolean;
}
38+
3339
const MAX_RETRIES = 5;
3440

3541
/** Backoff delays in milliseconds indexed by retry count (1-based). */
@@ -54,8 +60,6 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
5460
/User .* not in room/i,
5561
];
5662

57-
const NO_LISTENER_ERROR_RE = /No active WhatsApp Web listener/i;
58-
5963
const drainInProgress = new Map<string, boolean>();
6064
const entriesInProgress = new Set<string>();
6165

@@ -68,10 +72,6 @@ function loadDeliverRuntime() {
6872
return deliverRuntimePromise;
6973
}
7074

71-
function normalizeQueueAccountId(accountId?: string): string {
72-
return (accountId ?? "").trim() || "default";
73-
}
74-
7575
function getErrnoCode(err: unknown): string | null {
7676
return err && typeof err === "object" && "code" in err
7777
? String((err as { code?: unknown }).code)
@@ -179,82 +179,151 @@ export function isPermanentDeliveryError(error: string): boolean {
179179
return PERMANENT_ERROR_PATTERNS.some((re) => re.test(error));
180180
}
181181

182-
export async function drainReconnectQueue(opts: {
183-
accountId: string;
182+
/**
 * Makes one delivery attempt for a queued entry and updates the queue to
 * reflect the outcome.
 *
 * On success the entry is acked and `onRecovered` is invoked. On any error
 * from the delivery/ack step, `onFailed` is invoked with the formatted error
 * message; the entry is then handed to `moveToFailed` when the message is
 * classified as permanent, or to `failDelivery` otherwise.
 *
 * @returns
 *  - `"recovered"` — delivered and acked.
 *  - `"moved-to-failed"` — permanent error and the move succeeded.
 *  - `"already-gone"` — the queue update reported `ENOENT`.
 *  - `"failed"` — transient error recorded, or the queue update itself failed
 *    with a non-`ENOENT` error (that error is intentionally not rethrown).
 */
async function drainQueuedEntry(opts: {
  entry: QueuedDelivery;
  cfg: OpenClawConfig;
  deliver: DeliverFn;
  stateDir?: string;
  onRecovered?: (entry: QueuedDelivery) => void;
  onFailed?: (entry: QueuedDelivery, errMsg: string) => void;
}): Promise<"recovered" | "failed" | "moved-to-failed" | "already-gone"> {
  const queued = opts.entry;

  try {
    await opts.deliver(buildRecoveryDeliverParams(queued, opts.cfg));
    await ackDelivery(queued.id, opts.stateDir);
    opts.onRecovered?.(queued);
    return "recovered";
  } catch (deliveryErr) {
    const message = formatErrorMessage(deliveryErr);
    // Notify first so callers can log/count before the queue file is touched.
    opts.onFailed?.(queued, message);

    const permanent = isPermanentDeliveryError(message);
    try {
      if (permanent) {
        await moveToFailed(queued.id, opts.stateDir);
        return "moved-to-failed";
      }
      await failDelivery(queued.id, message, opts.stateDir);
      return "failed";
    } catch (bookkeepingErr) {
      // ENOENT means the entry file vanished underneath us; anything else is
      // treated as a plain failure so the drain loop keeps going.
      return getErrnoCode(bookkeepingErr) === "ENOENT" ? "already-gone" : "failed";
    }
  }
}
221+
222+
export async function drainPendingDeliveries(opts: {
223+
drainKey: string;
224+
logLabel: string;
184225
cfg: OpenClawConfig;
185226
log: RecoveryLogger;
186227
stateDir?: string;
187228
deliver?: DeliverFn;
229+
selectEntry: (entry: QueuedDelivery, now: number) => PendingDeliveryDrainDecision;
188230
}): Promise<void> {
189-
if (drainInProgress.get(opts.accountId)) {
190-
opts.log.info(
191-
`WhatsApp reconnect drain: already in progress for account ${opts.accountId}, skipping`,
192-
);
231+
if (drainInProgress.get(opts.drainKey)) {
232+
opts.log.info(`${opts.logLabel}: already in progress for ${opts.drainKey}, skipping`);
193233
return;
194234
}
195235

196-
drainInProgress.set(opts.accountId, true);
236+
drainInProgress.set(opts.drainKey, true);
197237
try {
238+
const now = Date.now();
239+
const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
198240
const matchingEntries = (await loadPendingDeliveries(opts.stateDir))
241+
.map((entry) => ({
242+
entry,
243+
decision: opts.selectEntry(entry, now),
244+
}))
199245
.filter(
200-
(entry) =>
201-
entry.channel === "whatsapp" &&
202-
normalizeQueueAccountId(entry.accountId) === opts.accountId &&
203-
typeof entry.lastError === "string" &&
204-
NO_LISTENER_ERROR_RE.test(entry.lastError),
246+
(item): item is { entry: QueuedDelivery; decision: PendingDeliveryDrainDecision } =>
247+
item.decision.match,
205248
)
206-
.toSorted((a, b) => a.enqueuedAt - b.enqueuedAt);
249+
.toSorted((a, b) => a.entry.enqueuedAt - b.entry.enqueuedAt);
207250

208251
if (matchingEntries.length === 0) {
209252
return;
210253
}
211254

212255
opts.log.info(
213-
`WhatsApp reconnect drain: ${matchingEntries.length} pending message(s) for account ${opts.accountId}`,
256+
`${opts.logLabel}: ${matchingEntries.length} pending message(s) matched ${opts.drainKey}`,
214257
);
215258

216-
const deliver = opts.deliver ?? (await loadDeliverRuntime()).deliverOutboundPayloads;
217-
218-
for (const entry of matchingEntries) {
259+
for (const { entry, decision } of matchingEntries) {
219260
if (!claimRecoveryEntry(entry.id)) {
220-
opts.log.info(`WhatsApp reconnect drain: entry ${entry.id} is already being recovered`);
261+
opts.log.info(`${opts.logLabel}: entry ${entry.id} is already being recovered`);
221262
continue;
222263
}
223264

224-
if (entry.retryCount >= MAX_RETRIES) {
225-
try {
226-
await moveToFailed(entry.id, opts.stateDir);
227-
} catch (err) {
228-
if (getErrnoCode(err) === "ENOENT") {
229-
opts.log.info(`reconnect drain: entry ${entry.id} already gone, skipping`);
265+
try {
266+
// Re-read after claim so the queue file remains the source of truth.
267+
// This prevents stale startup/reconnect snapshots from re-sending an
268+
// entry that another recovery path already acked.
269+
const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir);
270+
if (!currentEntry) {
271+
opts.log.info(`${opts.logLabel}: entry ${entry.id} already gone, skipping`);
272+
continue;
273+
}
274+
275+
if (currentEntry.retryCount >= MAX_RETRIES) {
276+
try {
277+
await moveToFailed(currentEntry.id, opts.stateDir);
278+
} catch (err) {
279+
if (getErrnoCode(err) === "ENOENT") {
280+
opts.log.info(`${opts.logLabel}: entry ${currentEntry.id} already gone, skipping`);
281+
continue;
282+
}
283+
throw err;
284+
}
285+
opts.log.warn(
286+
`${opts.logLabel}: entry ${currentEntry.id} exceeded max retries and was moved to failed/`,
287+
);
288+
continue;
289+
}
290+
291+
if (!decision.bypassBackoff) {
292+
const retryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now());
293+
if (!retryEligibility.eligible) {
294+
opts.log.info(
295+
`${opts.logLabel}: entry ${currentEntry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
296+
);
230297
continue;
231298
}
232-
throw err;
233-
} finally {
234-
releaseRecoveryEntry(entry.id);
235299
}
236-
opts.log.warn(
237-
`WhatsApp reconnect drain: entry ${entry.id} exceeded max retries and was moved to failed/`,
238-
);
239-
continue;
240-
}
241300

242-
try {
243-
await deliver(buildRecoveryDeliverParams(entry, opts.cfg));
244-
await ackDelivery(entry.id, opts.stateDir);
245-
} catch (err) {
246-
const errMsg = formatErrorMessage(err);
247-
if (isPermanentDeliveryError(errMsg)) {
248-
await moveToFailed(entry.id, opts.stateDir).catch(() => {});
249-
} else {
250-
await failDelivery(entry.id, errMsg, opts.stateDir).catch(() => {});
301+
const result = await drainQueuedEntry({
302+
entry: currentEntry,
303+
cfg: opts.cfg,
304+
deliver,
305+
stateDir: opts.stateDir,
306+
onFailed: (failedEntry, errMsg) => {
307+
if (isPermanentDeliveryError(errMsg)) {
308+
opts.log.warn(
309+
`${opts.logLabel}: entry ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`,
310+
);
311+
return;
312+
}
313+
opts.log.warn(`${opts.logLabel}: retry failed for entry ${failedEntry.id}: ${errMsg}`);
314+
},
315+
});
316+
if (result === "recovered") {
317+
opts.log.info(
318+
`${opts.logLabel}: drained delivery ${currentEntry.id} on ${currentEntry.channel}`,
319+
);
251320
}
252321
} finally {
253322
releaseRecoveryEntry(entry.id);
254323
}
255324
}
256325
} finally {
257-
drainInProgress.delete(opts.accountId);
326+
drainInProgress.delete(opts.drainKey);
258327
}
259328
}
260329

@@ -289,57 +358,60 @@ export async function recoverPendingDeliveries(opts: {
289358
await deferRemainingEntriesForBudget(pending.slice(i), opts.stateDir);
290359
break;
291360
}
292-
if (entry.retryCount >= MAX_RETRIES) {
293-
if (!claimRecoveryEntry(entry.id)) {
294-
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
295-
continue;
296-
}
297-
try {
298-
opts.log.warn(
299-
`Delivery ${entry.id} exceeded max retries (${entry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
300-
);
301-
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
302-
summary.skippedMaxRetries += 1;
303-
} finally {
304-
releaseRecoveryEntry(entry.id);
305-
}
306-
continue;
307-
}
308-
309-
const retryEligibility = isEntryEligibleForRecoveryRetry(entry, now);
310-
if (!retryEligibility.eligible) {
311-
summary.deferredBackoff += 1;
312-
opts.log.info(
313-
`Delivery ${entry.id} not ready for retry yet — backoff ${retryEligibility.remainingBackoffMs}ms remaining`,
314-
);
315-
continue;
316-
}
317361

318362
if (!claimRecoveryEntry(entry.id)) {
319363
opts.log.info(`Recovery skipped for delivery ${entry.id}: already being processed`);
320364
continue;
321365
}
322366

323367
try {
324-
await opts.deliver(buildRecoveryDeliverParams(entry, opts.cfg));
325-
await ackDelivery(entry.id, opts.stateDir);
326-
summary.recovered += 1;
327-
opts.log.info(`Recovered delivery ${entry.id} to ${entry.channel}:${entry.to}`);
328-
} catch (err) {
329-
const errMsg = formatErrorMessage(err);
330-
if (isPermanentDeliveryError(errMsg)) {
331-
opts.log.warn(`Delivery ${entry.id} hit permanent error — moving to failed/: ${errMsg}`);
332-
await moveEntryToFailedWithLogging(entry.id, opts.log, opts.stateDir);
333-
summary.failed += 1;
368+
const currentEntry = await loadPendingDelivery(entry.id, opts.stateDir);
369+
if (!currentEntry) {
370+
opts.log.info(`Recovery skipped for delivery ${entry.id}: already gone`);
334371
continue;
335372
}
336-
try {
337-
await failDelivery(entry.id, errMsg, opts.stateDir);
338-
} catch {
339-
// Best-effort update.
373+
374+
if (currentEntry.retryCount >= MAX_RETRIES) {
375+
opts.log.warn(
376+
`Delivery ${currentEntry.id} exceeded max retries (${currentEntry.retryCount}/${MAX_RETRIES}) — moving to failed/`,
377+
);
378+
await moveEntryToFailedWithLogging(currentEntry.id, opts.log, opts.stateDir);
379+
summary.skippedMaxRetries += 1;
380+
continue;
381+
}
382+
383+
const currentRetryEligibility = isEntryEligibleForRecoveryRetry(currentEntry, Date.now());
384+
if (!currentRetryEligibility.eligible) {
385+
summary.deferredBackoff += 1;
386+
opts.log.info(
387+
`Delivery ${currentEntry.id} not ready for retry yet — backoff ${currentRetryEligibility.remainingBackoffMs}ms remaining`,
388+
);
389+
continue;
390+
}
391+
392+
const result = await drainQueuedEntry({
393+
entry: currentEntry,
394+
cfg: opts.cfg,
395+
deliver: opts.deliver,
396+
stateDir: opts.stateDir,
397+
onRecovered: (recoveredEntry) => {
398+
summary.recovered += 1;
399+
opts.log.info(`Recovered delivery ${recoveredEntry.id} on ${recoveredEntry.channel}`);
400+
},
401+
onFailed: (failedEntry, errMsg) => {
402+
summary.failed += 1;
403+
if (isPermanentDeliveryError(errMsg)) {
404+
opts.log.warn(
405+
`Delivery ${failedEntry.id} hit permanent error — moving to failed/: ${errMsg}`,
406+
);
407+
return;
408+
}
409+
opts.log.warn(`Retry failed for delivery ${failedEntry.id}: ${errMsg}`);
410+
},
411+
});
412+
if (result === "moved-to-failed") {
413+
continue;
340414
}
341-
summary.failed += 1;
342-
opts.log.warn(`Retry failed for delivery ${entry.id}: ${errMsg}`);
343415
} finally {
344416
releaseRecoveryEntry(entry.id);
345417
}

0 commit comments

Comments
 (0)