Skip to content

Commit d70e6b1

Browse files
committed
fix(whatsapp): make inbound retries explicit
1 parent fad06f7 commit d70e6b1

3 files changed

Lines changed: 110 additions & 30 deletions

File tree

extensions/whatsapp/src/inbound/dedupe.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { createDedupeCache } from "openclaw/plugin-sdk/core";
2+
import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
23

34
const RECENT_WEB_MESSAGE_TTL_MS = 20 * 60_000;
45
const RECENT_WEB_MESSAGE_MAX = 5000;
@@ -9,11 +10,22 @@ const recentInboundMessages = createDedupeCache({
910
ttlMs: RECENT_WEB_MESSAGE_TTL_MS,
1011
maxSize: RECENT_WEB_MESSAGE_MAX,
1112
});
13+
const claimableInboundMessages = createClaimableDedupe({
14+
ttlMs: RECENT_WEB_MESSAGE_TTL_MS,
15+
memoryMaxSize: RECENT_WEB_MESSAGE_MAX,
16+
});
1217
const recentOutboundMessages = createDedupeCache({
1318
ttlMs: RECENT_OUTBOUND_MESSAGE_TTL_MS,
1419
maxSize: RECENT_OUTBOUND_MESSAGE_MAX,
1520
});
1621

22+
export class WhatsAppRetryableInboundError extends Error {
23+
constructor(message: string, options?: ErrorOptions) {
24+
super(message, options);
25+
this.name = "WhatsAppRetryableInboundError";
26+
}
27+
}
28+
1729
function buildMessageKey(params: {
1830
accountId: string;
1931
remoteJid: string;
@@ -30,11 +42,22 @@ function buildMessageKey(params: {
3042

3143
export function resetWebInboundDedupe(): void {
3244
recentInboundMessages.clear();
45+
claimableInboundMessages.clearMemory();
3346
recentOutboundMessages.clear();
3447
}
3548

36-
export function isRecentInboundMessage(key: string): boolean {
37-
return recentInboundMessages.check(key);
49+
export async function claimRecentInboundMessage(key: string): Promise<boolean> {
50+
const claim = await claimableInboundMessages.claim(key);
51+
return claim.kind === "claimed";
52+
}
53+
54+
export async function commitRecentInboundMessage(key: string): Promise<void> {
55+
await claimableInboundMessages.commit(key);
56+
recentInboundMessages.check(key);
57+
}
58+
59+
export function releaseRecentInboundMessage(key: string, error?: unknown): void {
60+
claimableInboundMessages.release(key, { error });
3861
}
3962

4063
export function rememberRecentOutboundMessage(params: {

extensions/whatsapp/src/inbound/monitor.ts

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@ import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from
1111
import { resolveJidToE164 } from "../text-runtime.js";
1212
import { checkInboundAccessControl } from "./access-control.js";
1313
import {
14-
isRecentInboundMessage,
14+
claimRecentInboundMessage,
15+
commitRecentInboundMessage,
1516
isRecentOutboundMessage,
17+
releaseRecentInboundMessage,
1618
rememberRecentOutboundMessage,
19+
WhatsAppRetryableInboundError,
1720
} from "./dedupe.js";
1821
import {
1922
describeReplyContext,
@@ -120,7 +123,26 @@ export async function attachWebInboxToSocket(
120123
options.authDir,
121124
sock.user as { id?: string | null; lid?: string | null } | undefined,
122125
);
123-
const debouncer = createInboundDebouncer<WebInboundMessage>({
126+
type QueuedInboundMessage = WebInboundMessage & {
127+
dedupeKey?: string;
128+
};
129+
130+
const finalizeInboundDedupe = async (
131+
entries: QueuedInboundMessage[],
132+
error?: unknown,
133+
): Promise<void> => {
134+
const dedupeKeys = [...new Set(entries.map((entry) => entry.dedupeKey).filter(Boolean))];
135+
if (dedupeKeys.length === 0) {
136+
return;
137+
}
138+
if (error instanceof WhatsAppRetryableInboundError) {
139+
dedupeKeys.forEach((dedupeKey) => releaseRecentInboundMessage(dedupeKey, error));
140+
return;
141+
}
142+
await Promise.all(dedupeKeys.map((dedupeKey) => commitRecentInboundMessage(dedupeKey)));
143+
};
144+
145+
const debouncer = createInboundDebouncer<QueuedInboundMessage>({
124146
debounceMs: options.debounceMs ?? 0,
125147
buildKey: (msg) => {
126148
const sender = msg.sender;
@@ -144,27 +166,34 @@ export async function attachWebInboxToSocket(
144166
if (!last) {
145167
return;
146168
}
147-
if (entries.length === 1) {
148-
await options.onMessage(last);
149-
return;
150-
}
151-
const mentioned = new Set<string>();
152-
for (const entry of entries) {
153-
for (const jid of entry.mentions ?? entry.mentionedJids ?? []) {
154-
mentioned.add(jid);
169+
try {
170+
if (entries.length === 1) {
171+
await options.onMessage(last);
172+
await finalizeInboundDedupe(entries);
173+
return;
174+
}
175+
const mentioned = new Set<string>();
176+
for (const entry of entries) {
177+
for (const jid of entry.mentions ?? entry.mentionedJids ?? []) {
178+
mentioned.add(jid);
179+
}
155180
}
181+
const combinedBody = entries
182+
.map((entry) => entry.body)
183+
.filter(Boolean)
184+
.join("\n");
185+
const combinedMessage: WebInboundMessage = {
186+
...last,
187+
body: combinedBody,
188+
mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined,
189+
mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined,
190+
};
191+
await options.onMessage(combinedMessage);
192+
await finalizeInboundDedupe(entries);
193+
} catch (error) {
194+
await finalizeInboundDedupe(entries, error);
195+
throw error;
156196
}
157-
const combinedBody = entries
158-
.map((entry) => entry.body)
159-
.filter(Boolean)
160-
.join("\n");
161-
const combinedMessage: WebInboundMessage = {
162-
...last,
163-
body: combinedBody,
164-
mentions: mentioned.size > 0 ? Array.from(mentioned) : undefined,
165-
mentionedJids: mentioned.size > 0 ? Array.from(mentioned) : undefined,
166-
};
167-
await options.onMessage(combinedMessage);
168197
},
169198
onError: (err) => {
170199
inboundLogger.error({ error: String(err) }, "failed handling inbound web message");
@@ -306,12 +335,6 @@ export async function attachWebInboxToSocket(
306335
logVerbose(`Skipping recent outbound WhatsApp echo ${id} for ${remoteJid}`);
307336
return null;
308337
}
309-
if (id) {
310-
const dedupeKey = `${options.accountId}:${remoteJid}:${id}`;
311-
if (isRecentInboundMessage(dedupeKey)) {
312-
return null;
313-
}
314-
}
315338
const participantJid = msg.key?.participant ?? undefined;
316339
const from = group ? remoteJid : await resolveInboundJid(remoteJid);
317340
if (!from) {
@@ -482,7 +505,7 @@ export async function attachWebInboxToSocket(
482505
},
483506
"inbound message",
484507
);
485-
const inboundMessage: WebInboundMessage = {
508+
const inboundMessage: QueuedInboundMessage = {
486509
id: inbound.id,
487510
from: inbound.from,
488511
conversationId: inbound.from,
@@ -523,6 +546,7 @@ export async function attachWebInboxToSocket(
523546
mediaPath: enriched.mediaPath,
524547
mediaType: enriched.mediaType,
525548
mediaFileName: enriched.mediaFileName,
549+
dedupeKey: inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : undefined,
526550
};
527551
try {
528552
const task = Promise.resolve(debouncer.enqueue(inboundMessage));
@@ -569,6 +593,11 @@ export async function attachWebInboxToSocket(
569593
continue;
570594
}
571595

596+
const dedupeKey = inbound.id ? `${options.accountId}:${inbound.remoteJid}:${inbound.id}` : "";
597+
if (dedupeKey && !(await claimRecentInboundMessage(dedupeKey))) {
598+
continue;
599+
}
600+
572601
await enqueueInboundMessage(msg, inbound, enriched);
573602
}
574603
};

extensions/whatsapp/src/monitor-inbox.streams-inbound-messages.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import fsSync from "node:fs";
22
import path from "node:path";
33
import "./monitor-inbox.test-harness.js";
44
import { beforeEach, describe, expect, it, vi } from "vitest";
5+
import { WhatsAppRetryableInboundError } from "./inbound/dedupe.js";
56
import {
67
type InboxMonitorOptions,
78
InboxOnMessage,
@@ -458,6 +459,33 @@ describe("web monitor inbox", () => {
458459
await listener.close();
459460
});
460461

462+
it("retries redelivered messages after an explicit retryable inbound failure", async () => {
463+
let attempts = 0;
464+
const onMessage = vi.fn(async () => {
465+
attempts += 1;
466+
if (attempts === 1) {
467+
throw new WhatsAppRetryableInboundError("retry me");
468+
}
469+
});
470+
471+
const { listener, sock } = await startInboxMonitor(onMessage as InboxOnMessage);
472+
const upsert = buildNotifyMessageUpsert({
473+
id: nextMessageId("retryable-dedupe"),
474+
remoteJid: "999@s.whatsapp.net",
475+
text: "ping",
476+
timestamp: 1_700_000_000,
477+
pushName: "Tester",
478+
});
479+
480+
sock.ev.emit("messages.upsert", upsert);
481+
await waitForMessageCalls(onMessage, 1);
482+
483+
sock.ev.emit("messages.upsert", upsert);
484+
await waitForMessageCalls(onMessage, 2);
485+
486+
await listener.close();
487+
});
488+
461489
it("resolves LID JIDs using Baileys LID mapping store", async () => {
462490
const onMessage = vi.fn(async () => {
463491
return;

0 commit comments

Comments
 (0)