Skip to content

Commit da1da61

Browse files
authored
fix(whatsapp): preserve replies across reconnects (#62892)
1 parent d838fb5 commit da1da61

5 files changed

Lines changed: 343 additions & 10 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ Docs: https://docs.openclaw.ai
88

99
### Fixes
1010

11+
- WhatsApp/auto-reply: keep inbound reply, media, and composing sends on the current socket across reconnects, wait through reconnect gaps, and retry timeout-only send failures without dropping the active socket ref. (#62892) Thanks @mcaxtr.
12+
1113
## 2026.4.9
1214

1315
### Changes

extensions/whatsapp/src/auto-reply/monitor.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { WASocket } from "@whiskeysockets/baileys";
12
import { resolveInboundDebounceMs } from "openclaw/plugin-sdk/channel-inbound";
23
import { formatCliCommand } from "openclaw/plugin-sdk/cli-runtime";
34
import { waitForever } from "openclaw/plugin-sdk/cli-runtime";
@@ -165,6 +166,20 @@ export async function monitorWebChannel(
165166
process.once("SIGINT", handleSigint);
166167

167168
let reconnectAttempts = 0;
169+
const socketRef: { current: WASocket | null } = { current: null };
170+
const disconnectRetryController = new AbortController();
171+
const stopDisconnectRetries = () => {
172+
if (!disconnectRetryController.signal.aborted) {
173+
disconnectRetryController.abort();
174+
}
175+
};
176+
if (abortSignal) {
177+
if (abortSignal.aborted) {
178+
stopDisconnectRetries();
179+
} else {
180+
abortSignal.addEventListener("abort", stopDisconnectRetries, { once: true });
181+
}
182+
}
168183

169184
while (true) {
170185
if (stopRequested()) {
@@ -217,6 +232,11 @@ export async function monitorWebChannel(
217232
sendReadReceipts: account.sendReadReceipts,
218233
debounceMs: inboundDebounceMs,
219234
shouldDebounce,
235+
socketRef,
236+
shouldRetryDisconnect: () =>
237+
keepAlive && !sigintStop && !stopRequested() && !disconnectRetryController.signal.aborted,
238+
disconnectRetryPolicy: reconnectPolicy,
239+
disconnectRetryAbortSignal: disconnectRetryController.signal,
220240
onMessage: async (msg: WebInboundMsg) => {
221241
active.handledMessages += 1;
222242
active.lastInboundAt = Date.now();
@@ -257,6 +277,7 @@ export async function monitorWebChannel(
257277
});
258278

259279
const closeListener = async () => {
280+
socketRef.current = null;
260281
setActiveWebListener(account.accountId, null);
261282
if (active.unregisterUnhandled) {
262283
active.unregisterUnhandled();
@@ -343,6 +364,7 @@ export async function monitorWebChannel(
343364
}
344365

345366
if (!keepAlive) {
367+
stopDisconnectRetries();
346368
await closeListener();
347369
process.removeListener("SIGINT", handleSigint);
348370
return;
@@ -363,6 +385,7 @@ export async function monitorWebChannel(
363385
statusController.noteReconnectAttempts(reconnectAttempts);
364386

365387
if (stopRequested() || sigintStop || reason === "aborted") {
388+
stopDisconnectRetries();
366389
await closeListener();
367390
break;
368391
}
@@ -396,6 +419,7 @@ export async function monitorWebChannel(
396419
});
397420

398421
if (loggedOut) {
422+
stopDisconnectRetries();
399423
statusController.noteClose({
400424
statusCode: numericStatusCode,
401425
loggedOut: true,
@@ -411,6 +435,7 @@ export async function monitorWebChannel(
411435
}
412436

413437
if (isNonRetryableWebCloseStatus(statusCode)) {
438+
stopDisconnectRetries();
414439
statusController.noteClose({
415440
statusCode: numericStatusCode,
416441
error: errorStr,
@@ -434,6 +459,7 @@ export async function monitorWebChannel(
434459

435460
reconnectAttempts += 1;
436461
if (reconnectPolicy.maxAttempts > 0 && reconnectAttempts >= reconnectPolicy.maxAttempts) {
462+
stopDisconnectRetries();
437463
statusController.noteClose({
438464
statusCode: numericStatusCode,
439465
error: errorStr,

extensions/whatsapp/src/inbound/monitor.ts

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import type { AnyMessageContent, proto, WAMessage } from "@whiskeysockets/baileys";
1+
import type { AnyMessageContent, proto, WAMessage, WASocket } from "@whiskeysockets/baileys";
22
import { createInboundDebouncer, formatLocationText } from "openclaw/plugin-sdk/channel-inbound";
33
import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime";
44
import { logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env";
55
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
66
import { getChildLogger } from "openclaw/plugin-sdk/text-runtime";
77
import { readWebSelfIdentity } from "../auth-store.js";
88
import { getPrimaryIdentityId, resolveComparableIdentity } from "../identity.js";
9-
import { createWaSocket, getStatusCode, waitForWaConnection } from "../session.js";
9+
import { DEFAULT_RECONNECT_POLICY, computeBackoff, sleepWithAbort } from "../reconnect.js";
10+
import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from "../session.js";
1011
import { resolveJidToE164 } from "../text-runtime.js";
1112
import { checkInboundAccessControl } from "./access-control.js";
1213
import {
@@ -28,11 +29,20 @@ import { createWebSendApi } from "./send-api.js";
2829
import type { WebInboundMessage, WebListenerCloseReason } from "./types.js";
2930

3031
const LOGGED_OUT_STATUS = DisconnectReason?.loggedOut ?? 401;
32+
const RECONNECT_IN_PROGRESS_ERROR = "no active socket - reconnection in progress";
3133

3234
function isGroupJid(jid: string): boolean {
3335
return (typeof isJidGroup === "function" ? isJidGroup(jid) : jid.endsWith("@g.us")) === true;
3436
}
3537

38+
function isRetryableSendDisconnectError(err: unknown): boolean {
39+
return /closed|reset|timed\s*out|disconnect|no active socket/i.test(formatError(err));
40+
}
41+
42+
function shouldClearSocketRefAfterSendFailure(err: unknown): boolean {
43+
return /closed|reset|disconnect|no active socket/i.test(formatError(err));
44+
}
45+
3646
export async function monitorWebInbox(options: {
3747
verbose: boolean;
3848
accountId: string;
@@ -47,6 +57,20 @@ export async function monitorWebInbox(options: {
4757
debounceMs?: number;
4858
/** Optional debounce gating predicate. */
4959
shouldDebounce?: (msg: WebInboundMessage) => boolean;
60+
/** Optional shared socket reference so reply closures can follow reconnects. */
61+
socketRef?: { current: WASocket | null };
62+
/** Whether send retries should wait for a reconnect. */
63+
shouldRetryDisconnect?: () => boolean;
64+
/** Reconnect timing for waiting through transient socket replacement gaps. */
65+
disconnectRetryPolicy?: {
66+
initialMs: number;
67+
maxMs: number;
68+
factor: number;
69+
jitter: number;
70+
maxAttempts: number;
71+
};
72+
/** Abort in-flight reconnect waits when shutdown becomes terminal. */
73+
disconnectRetryAbortSignal?: AbortSignal;
5074
}) {
5175
const inboundLogger = getChildLogger({ module: "web-inbound" });
5276
const inboundConsoleLog = createSubsystemLogger("gateway/channels/whatsapp").child("inbound");
@@ -55,6 +79,16 @@ export async function monitorWebInbox(options: {
5579
});
5680
await waitForWaConnection(sock);
5781
const connectedAtMs = Date.now();
82+
if (options.socketRef) {
83+
options.socketRef.current = sock;
84+
}
85+
const getCurrentSock = () => (options.socketRef ? options.socketRef.current : sock);
86+
const shouldRetryDisconnect = () => options.shouldRetryDisconnect?.() === true;
87+
const disconnectRetryPolicy = options.disconnectRetryPolicy ?? DEFAULT_RECONNECT_POLICY;
88+
const sendRetryMaxAttempts =
89+
disconnectRetryPolicy.maxAttempts > 0
90+
? disconnectRetryPolicy.maxAttempts
91+
: DEFAULT_RECONNECT_POLICY.maxAttempts;
5892

5993
let onCloseResolve: ((reason: WebListenerCloseReason) => void) | null = null;
6094
const onClose = new Promise<WebListenerCloseReason>((resolve) => {
@@ -160,9 +194,43 @@ export async function monitorWebInbox(options: {
160194
};
161195

162196
const sendTrackedMessage = async (jid: string, content: AnyMessageContent) => {
163-
const result = await sock.sendMessage(jid, content);
164-
rememberOutboundMessage(jid, result);
165-
return result;
197+
let lastErr: unknown = new Error(RECONNECT_IN_PROGRESS_ERROR);
198+
for (let attempt = 1; ; attempt++) {
199+
const currentSock = getCurrentSock();
200+
if (currentSock) {
201+
try {
202+
const result = await currentSock.sendMessage(jid, content);
203+
rememberOutboundMessage(jid, result);
204+
return result;
205+
} catch (err) {
206+
if (!shouldRetryDisconnect() || !isRetryableSendDisconnectError(err)) {
207+
throw err;
208+
}
209+
lastErr = err;
210+
if (
211+
shouldClearSocketRefAfterSendFailure(err) &&
212+
options.socketRef?.current === currentSock
213+
) {
214+
options.socketRef.current = null;
215+
}
216+
}
217+
} else if (!shouldRetryDisconnect()) {
218+
throw lastErr;
219+
}
220+
221+
if (attempt >= sendRetryMaxAttempts) {
222+
throw lastErr;
223+
}
224+
const delayMs = computeBackoff(disconnectRetryPolicy, attempt);
225+
logVerbose(
226+
`Waiting ${delayMs}ms for WhatsApp reconnect before retrying send to ${jid}: ${formatError(lastErr)}`,
227+
);
228+
try {
229+
await sleepWithAbort(delayMs, options.disconnectRetryAbortSignal);
230+
} catch {
231+
throw lastErr;
232+
}
233+
}
166234
};
167235

168236
const getGroupMeta = async (jid: string) => {
@@ -379,8 +447,12 @@ export async function monitorWebInbox(options: {
379447
) => {
380448
const chatJid = inbound.remoteJid;
381449
const sendComposing = async () => {
450+
const currentSock = getCurrentSock();
451+
if (!currentSock) {
452+
return;
453+
}
382454
try {
383-
await sock.sendPresenceUpdate("composing", chatJid);
455+
await currentSock.sendPresenceUpdate("composing", chatJid);
384456
} catch (err) {
385457
logVerbose(`Presence update failed: ${String(err)}`);
386458
}
@@ -502,6 +574,9 @@ export async function monitorWebInbox(options: {
502574
) => {
503575
try {
504576
if (update.connection === "close") {
577+
if (options.socketRef?.current === sock) {
578+
options.socketRef.current = null;
579+
}
505580
const status = getStatusCode(update.lastDisconnect?.error);
506581
resolveClose({
507582
status,
@@ -550,7 +625,13 @@ export async function monitorWebInbox(options: {
550625
const sendApi = createWebSendApi({
551626
sock: {
552627
sendMessage: (jid: string, content: AnyMessageContent) => sendTrackedMessage(jid, content),
553-
sendPresenceUpdate: (presence, jid?: string) => sock.sendPresenceUpdate(presence, jid),
628+
sendPresenceUpdate: async (presence, jid?: string) => {
629+
const currentSock = getCurrentSock();
630+
if (!currentSock) {
631+
throw new Error(RECONNECT_IN_PROGRESS_ERROR);
632+
}
633+
return currentSock.sendPresenceUpdate(presence, jid);
634+
},
554635
},
555636
defaultAccountId: options.accountId,
556637
});

0 commit comments

Comments
 (0)