@@ -11,9 +11,12 @@ import { createWaSocket, formatError, getStatusCode, waitForWaConnection } from
1111import { resolveJidToE164 } from "../text-runtime.js" ;
1212import { checkInboundAccessControl } from "./access-control.js" ;
1313import {
14- isRecentInboundMessage ,
14+ claimRecentInboundMessage ,
15+ commitRecentInboundMessage ,
1516 isRecentOutboundMessage ,
17+ releaseRecentInboundMessage ,
1618 rememberRecentOutboundMessage ,
19+ WhatsAppRetryableInboundError ,
1720} from "./dedupe.js" ;
1821import {
1922 describeReplyContext ,
@@ -120,7 +123,26 @@ export async function attachWebInboxToSocket(
120123 options . authDir ,
121124 sock . user as { id ?: string | null ; lid ?: string | null } | undefined ,
122125 ) ;
123- const debouncer = createInboundDebouncer < WebInboundMessage > ( {
126+ type QueuedInboundMessage = WebInboundMessage & {
127+ dedupeKey ?: string ;
128+ } ;
129+
130+ const finalizeInboundDedupe = async (
131+ entries : QueuedInboundMessage [ ] ,
132+ error ?: unknown ,
133+ ) : Promise < void > => {
134+ const dedupeKeys = [ ...new Set ( entries . map ( ( entry ) => entry . dedupeKey ) . filter ( Boolean ) ) ] ;
135+ if ( dedupeKeys . length === 0 ) {
136+ return ;
137+ }
138+ if ( error instanceof WhatsAppRetryableInboundError ) {
139+ dedupeKeys . forEach ( ( dedupeKey ) => releaseRecentInboundMessage ( dedupeKey , error ) ) ;
140+ return ;
141+ }
142+ await Promise . all ( dedupeKeys . map ( ( dedupeKey ) => commitRecentInboundMessage ( dedupeKey ) ) ) ;
143+ } ;
144+
145+ const debouncer = createInboundDebouncer < QueuedInboundMessage > ( {
124146 debounceMs : options . debounceMs ?? 0 ,
125147 buildKey : ( msg ) => {
126148 const sender = msg . sender ;
@@ -144,27 +166,34 @@ export async function attachWebInboxToSocket(
144166 if ( ! last ) {
145167 return ;
146168 }
147- if ( entries . length === 1 ) {
148- await options . onMessage ( last ) ;
149- return ;
150- }
151- const mentioned = new Set < string > ( ) ;
152- for ( const entry of entries ) {
153- for ( const jid of entry . mentions ?? entry . mentionedJids ?? [ ] ) {
154- mentioned . add ( jid ) ;
169+ try {
170+ if ( entries . length === 1 ) {
171+ await options . onMessage ( last ) ;
172+ await finalizeInboundDedupe ( entries ) ;
173+ return ;
174+ }
175+ const mentioned = new Set < string > ( ) ;
176+ for ( const entry of entries ) {
177+ for ( const jid of entry . mentions ?? entry . mentionedJids ?? [ ] ) {
178+ mentioned . add ( jid ) ;
179+ }
155180 }
181+ const combinedBody = entries
182+ . map ( ( entry ) => entry . body )
183+ . filter ( Boolean )
184+ . join ( "\n" ) ;
185+ const combinedMessage : WebInboundMessage = {
186+ ...last ,
187+ body : combinedBody ,
188+ mentions : mentioned . size > 0 ? Array . from ( mentioned ) : undefined ,
189+ mentionedJids : mentioned . size > 0 ? Array . from ( mentioned ) : undefined ,
190+ } ;
191+ await options . onMessage ( combinedMessage ) ;
192+ await finalizeInboundDedupe ( entries ) ;
193+ } catch ( error ) {
194+ await finalizeInboundDedupe ( entries , error ) ;
195+ throw error ;
156196 }
157- const combinedBody = entries
158- . map ( ( entry ) => entry . body )
159- . filter ( Boolean )
160- . join ( "\n" ) ;
161- const combinedMessage : WebInboundMessage = {
162- ...last ,
163- body : combinedBody ,
164- mentions : mentioned . size > 0 ? Array . from ( mentioned ) : undefined ,
165- mentionedJids : mentioned . size > 0 ? Array . from ( mentioned ) : undefined ,
166- } ;
167- await options . onMessage ( combinedMessage ) ;
168197 } ,
169198 onError : ( err ) => {
170199 inboundLogger . error ( { error : String ( err ) } , "failed handling inbound web message" ) ;
@@ -306,12 +335,6 @@ export async function attachWebInboxToSocket(
306335 logVerbose ( `Skipping recent outbound WhatsApp echo ${ id } for ${ remoteJid } ` ) ;
307336 return null ;
308337 }
309- if ( id ) {
310- const dedupeKey = `${ options . accountId } :${ remoteJid } :${ id } ` ;
311- if ( isRecentInboundMessage ( dedupeKey ) ) {
312- return null ;
313- }
314- }
315338 const participantJid = msg . key ?. participant ?? undefined ;
316339 const from = group ? remoteJid : await resolveInboundJid ( remoteJid ) ;
317340 if ( ! from ) {
@@ -482,7 +505,7 @@ export async function attachWebInboxToSocket(
482505 } ,
483506 "inbound message" ,
484507 ) ;
485- const inboundMessage : WebInboundMessage = {
508+ const inboundMessage : QueuedInboundMessage = {
486509 id : inbound . id ,
487510 from : inbound . from ,
488511 conversationId : inbound . from ,
@@ -523,6 +546,7 @@ export async function attachWebInboxToSocket(
523546 mediaPath : enriched . mediaPath ,
524547 mediaType : enriched . mediaType ,
525548 mediaFileName : enriched . mediaFileName ,
549+ dedupeKey : inbound . id ? `${ options . accountId } :${ inbound . remoteJid } :${ inbound . id } ` : undefined ,
526550 } ;
527551 try {
528552 const task = Promise . resolve ( debouncer . enqueue ( inboundMessage ) ) ;
@@ -569,6 +593,11 @@ export async function attachWebInboxToSocket(
569593 continue ;
570594 }
571595
596+ const dedupeKey = inbound . id ? `${ options . accountId } :${ inbound . remoteJid } :${ inbound . id } ` : "" ;
597+ if ( dedupeKey && ! ( await claimRecentInboundMessage ( dedupeKey ) ) ) {
598+ continue ;
599+ }
600+
572601 await enqueueInboundMessage ( msg , inbound , enriched ) ;
573602 }
574603 } ;
0 commit comments