Skip to content

Commit e9989f3

Browse files
Oviemudiagamcaxtr
andauthored
fix(whatsapp): periodic delivery-queue drain so enqueued items don't wait for next reconnect (#79083)
Merged via squash. Prepared head SHA: 9a619bb Co-authored-by: Oviemudiaga <49584793+Oviemudiaga@users.noreply.github.com> Co-authored-by: mcaxtr <7562095+mcaxtr@users.noreply.github.com> Reviewed-by: @mcaxtr
1 parent ff4bf0c commit e9989f3

3 files changed

Lines changed: 121 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
Docs: https://docs.openclaw.ai
44

5+
## Unreleased
6+
7+
### Fixes
8+
9+
- WhatsApp: drain pending outbound deliveries on a 30s periodic timer in addition to the reconnect handler, so messages enqueued while the provider is already connected no longer wait for the next reconnect to send. (#79083) Thanks @Oviemudiaga.
10+
511
## 2026.5.19
612

713
### Changes

extensions/whatsapp/src/auto-reply.web-auto-reply.connection-and-logging.e2e.test.ts

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,25 @@ import {
2727
startWebAutoReplyMonitor,
2828
} from "./auto-reply.test-harness.js";
2929

30+
type DrainSelectionEntry = {
31+
channel: string;
32+
accountId?: string | null;
33+
lastError?: string;
34+
};
35+
type DrainPendingDeliveriesCall = {
36+
drainKey: string;
37+
logLabel: string;
38+
selectEntry: (entry: DrainSelectionEntry) => { match: boolean; bypassBackoff: boolean };
39+
};
40+
41+
const deliveryQueueMocks = vi.hoisted(() => ({
42+
drainPendingDeliveries: vi.fn(async (_opts: unknown) => undefined),
43+
}));
44+
45+
vi.mock("openclaw/plugin-sdk/delivery-queue-runtime", () => ({
46+
drainPendingDeliveries: deliveryQueueMocks.drainPendingDeliveries,
47+
}));
48+
3049
installWebAutoReplyTestHomeHooks();
3150

3251
function requireOnMessage(
@@ -247,6 +266,78 @@ describe("web auto-reply connection", () => {
247266
expect(sleep).toHaveBeenCalled();
248267
});
249268

269+
it("drains pending deliveries while connected and stops after close", async () => {
270+
vi.useFakeTimers();
271+
try {
272+
const sleep = vi.fn(async () => {});
273+
const scripted = createScriptedWebListenerFactory();
274+
const { controller, run } = startWebAutoReplyMonitor({
275+
monitorWebChannelFn: monitorWebChannel as never,
276+
listenerFactory: scripted.listenerFactory,
277+
sleep,
278+
accountId: "work",
279+
});
280+
281+
await vi.waitFor(
282+
() => {
283+
expect(scripted.getListenerCount()).toBe(1);
284+
},
285+
{ timeout: 250, interval: 2 },
286+
);
287+
expect(deliveryQueueMocks.drainPendingDeliveries).toHaveBeenCalledWith(
288+
expect.objectContaining({
289+
drainKey: "whatsapp:work",
290+
logLabel: "WhatsApp reconnect drain",
291+
}),
292+
);
293+
294+
deliveryQueueMocks.drainPendingDeliveries.mockClear();
295+
await vi.advanceTimersByTimeAsync(30_000);
296+
await vi.waitFor(() => {
297+
expect(deliveryQueueMocks.drainPendingDeliveries).toHaveBeenCalledTimes(1);
298+
});
299+
300+
const periodicCall = deliveryQueueMocks.drainPendingDeliveries.mock.calls.at(-1)?.[0] as
301+
| DrainPendingDeliveriesCall
302+
| undefined;
303+
expect(periodicCall).toBeDefined();
304+
if (!periodicCall) {
305+
throw new Error("Expected WhatsApp periodic drain call");
306+
}
307+
expect(periodicCall.drainKey).toBe("whatsapp:work");
308+
expect(periodicCall.logLabel).toBe("WhatsApp periodic drain");
309+
expect(
310+
periodicCall.selectEntry({
311+
channel: "whatsapp",
312+
accountId: "work",
313+
}),
314+
).toEqual({ match: true, bypassBackoff: false });
315+
expect(
316+
periodicCall.selectEntry({
317+
channel: "whatsapp",
318+
accountId: "default",
319+
}),
320+
).toEqual({ match: false, bypassBackoff: false });
321+
expect(
322+
periodicCall.selectEntry({
323+
channel: "telegram",
324+
accountId: "work",
325+
}),
326+
).toEqual({ match: false, bypassBackoff: false });
327+
328+
controller.abort();
329+
scripted.resolveClose(0, { status: 499, isLoggedOut: false, error: "aborted" });
330+
await Promise.resolve();
331+
await run;
332+
333+
deliveryQueueMocks.drainPendingDeliveries.mockClear();
334+
await vi.advanceTimersByTimeAsync(30_000);
335+
expect(deliveryQueueMocks.drainPendingDeliveries).not.toHaveBeenCalled();
336+
} finally {
337+
vi.useRealTimers();
338+
}
339+
});
340+
250341
it("treats status 440 as non-retryable and stops without retrying", async () => {
251342
const sleep = vi.fn(async () => {});
252343
const scripted = createScriptedWebListenerFactory();

extensions/whatsapp/src/auto-reply/monitor.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,17 +541,40 @@ export async function monitorWebChannel(
541541
);
542542
});
543543

544+
const periodicDrainInterval = setInterval(() => {
545+
void drainPendingDeliveries({
546+
drainKey: `whatsapp:${normalizedAccountId}`,
547+
logLabel: "WhatsApp periodic drain",
548+
cfg,
549+
log: reconnectLogger,
550+
selectEntry: (entry) => ({
551+
match:
552+
entry.channel === "whatsapp" &&
553+
normalizeReconnectAccountId(entry.accountId) === normalizedAccountId,
554+
bypassBackoff: false,
555+
}),
556+
}).catch((err) => {
557+
reconnectLogger.warn(
558+
{ connectionId: connection.connectionId, error: String(err) },
559+
"periodic drain failed",
560+
);
561+
});
562+
}, 30_000);
563+
544564
whatsappLog.info("Listening for personal WhatsApp inbound messages.");
545565
if (process.stdout.isTTY || process.stderr.isTTY) {
546566
whatsappLog.raw("Ctrl+C to stop.");
547567
}
548568

549569
if (!keepAlive) {
570+
clearInterval(periodicDrainInterval);
550571
await controller.shutdown();
551572
return;
552573
}
553574

554-
const reason = await controller.waitForClose();
575+
const reason = await controller
576+
.waitForClose()
577+
.finally(() => clearInterval(periodicDrainInterval));
555578
if (stopRequested() || sigintStop || reason === "aborted") {
556579
await controller.shutdown();
557580
break;

0 commit comments

Comments
 (0)