1- import type { AnyMessageContent , proto , WAMessage } from "@whiskeysockets/baileys" ;
1+ import type { AnyMessageContent , proto , WAMessage , WASocket } from "@whiskeysockets/baileys" ;
22import { createInboundDebouncer , formatLocationText } from "openclaw/plugin-sdk/channel-inbound" ;
33import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime" ;
44import { logVerbose , shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env" ;
55import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env" ;
66import { getChildLogger } from "openclaw/plugin-sdk/text-runtime" ;
77import { readWebSelfIdentity } from "../auth-store.js" ;
88import { getPrimaryIdentityId , resolveComparableIdentity } from "../identity.js" ;
9- import { createWaSocket , getStatusCode , waitForWaConnection } from "../session.js" ;
9+ import { DEFAULT_RECONNECT_POLICY , computeBackoff , sleepWithAbort } from "../reconnect.js" ;
10+ import { createWaSocket , formatError , getStatusCode , waitForWaConnection } from "../session.js" ;
1011import { resolveJidToE164 } from "../text-runtime.js" ;
1112import { checkInboundAccessControl } from "./access-control.js" ;
1213import {
@@ -28,11 +29,20 @@ import { createWebSendApi } from "./send-api.js";
2829import type { WebInboundMessage , WebListenerCloseReason } from "./types.js" ;
2930
3031const LOGGED_OUT_STATUS = DisconnectReason ?. loggedOut ?? 401 ;
32+ const RECONNECT_IN_PROGRESS_ERROR = "no active socket - reconnection in progress" ;
3133
3234function isGroupJid ( jid : string ) : boolean {
3335 return ( typeof isJidGroup === "function" ? isJidGroup ( jid ) : jid . endsWith ( "@g.us" ) ) === true ;
3436}
3537
38+ function isRetryableSendDisconnectError ( err : unknown ) : boolean {
39+ return / c l o s e d | r e s e t | t i m e d \s * o u t | d i s c o n n e c t | n o a c t i v e s o c k e t / i. test ( formatError ( err ) ) ;
40+ }
41+
42+ function shouldClearSocketRefAfterSendFailure ( err : unknown ) : boolean {
43+ return / c l o s e d | r e s e t | d i s c o n n e c t | n o a c t i v e s o c k e t / i. test ( formatError ( err ) ) ;
44+ }
45+
3646export async function monitorWebInbox ( options : {
3747 verbose : boolean ;
3848 accountId : string ;
@@ -47,6 +57,20 @@ export async function monitorWebInbox(options: {
4757 debounceMs ?: number ;
4858 /** Optional debounce gating predicate. */
4959 shouldDebounce ?: ( msg : WebInboundMessage ) => boolean ;
60+ /** Optional shared socket reference so reply closures can follow reconnects. */
61+ socketRef ?: { current : WASocket | null } ;
62+ /** Whether send retries should wait for a reconnect. */
63+ shouldRetryDisconnect ?: ( ) => boolean ;
64+ /** Reconnect timing for waiting through transient socket replacement gaps. */
65+ disconnectRetryPolicy ?: {
66+ initialMs : number ;
67+ maxMs : number ;
68+ factor : number ;
69+ jitter : number ;
70+ maxAttempts : number ;
71+ } ;
72+ /** Abort in-flight reconnect waits when shutdown becomes terminal. */
73+ disconnectRetryAbortSignal ?: AbortSignal ;
5074} ) {
5175 const inboundLogger = getChildLogger ( { module : "web-inbound" } ) ;
5276 const inboundConsoleLog = createSubsystemLogger ( "gateway/channels/whatsapp" ) . child ( "inbound" ) ;
@@ -55,6 +79,16 @@ export async function monitorWebInbox(options: {
5579 } ) ;
5680 await waitForWaConnection ( sock ) ;
5781 const connectedAtMs = Date . now ( ) ;
82+ if ( options . socketRef ) {
83+ options . socketRef . current = sock ;
84+ }
85+ const getCurrentSock = ( ) => ( options . socketRef ? options . socketRef . current : sock ) ;
86+ const shouldRetryDisconnect = ( ) => options . shouldRetryDisconnect ?.( ) === true ;
87+ const disconnectRetryPolicy = options . disconnectRetryPolicy ?? DEFAULT_RECONNECT_POLICY ;
88+ const sendRetryMaxAttempts =
89+ disconnectRetryPolicy . maxAttempts > 0
90+ ? disconnectRetryPolicy . maxAttempts
91+ : DEFAULT_RECONNECT_POLICY . maxAttempts ;
5892
5993 let onCloseResolve : ( ( reason : WebListenerCloseReason ) => void ) | null = null ;
6094 const onClose = new Promise < WebListenerCloseReason > ( ( resolve ) => {
@@ -160,9 +194,43 @@ export async function monitorWebInbox(options: {
160194 } ;
161195
162196 const sendTrackedMessage = async ( jid : string , content : AnyMessageContent ) => {
163- const result = await sock . sendMessage ( jid , content ) ;
164- rememberOutboundMessage ( jid , result ) ;
165- return result ;
197+ let lastErr : unknown = new Error ( RECONNECT_IN_PROGRESS_ERROR ) ;
198+ for ( let attempt = 1 ; ; attempt ++ ) {
199+ const currentSock = getCurrentSock ( ) ;
200+ if ( currentSock ) {
201+ try {
202+ const result = await currentSock . sendMessage ( jid , content ) ;
203+ rememberOutboundMessage ( jid , result ) ;
204+ return result ;
205+ } catch ( err ) {
206+ if ( ! shouldRetryDisconnect ( ) || ! isRetryableSendDisconnectError ( err ) ) {
207+ throw err ;
208+ }
209+ lastErr = err ;
210+ if (
211+ shouldClearSocketRefAfterSendFailure ( err ) &&
212+ options . socketRef ?. current === currentSock
213+ ) {
214+ options . socketRef . current = null ;
215+ }
216+ }
217+ } else if ( ! shouldRetryDisconnect ( ) ) {
218+ throw lastErr ;
219+ }
220+
221+ if ( attempt >= sendRetryMaxAttempts ) {
222+ throw lastErr ;
223+ }
224+ const delayMs = computeBackoff ( disconnectRetryPolicy , attempt ) ;
225+ logVerbose (
226+ `Waiting ${ delayMs } ms for WhatsApp reconnect before retrying send to ${ jid } : ${ formatError ( lastErr ) } ` ,
227+ ) ;
228+ try {
229+ await sleepWithAbort ( delayMs , options . disconnectRetryAbortSignal ) ;
230+ } catch {
231+ throw lastErr ;
232+ }
233+ }
166234 } ;
167235
168236 const getGroupMeta = async ( jid : string ) => {
@@ -379,8 +447,12 @@ export async function monitorWebInbox(options: {
379447 ) => {
380448 const chatJid = inbound . remoteJid ;
381449 const sendComposing = async ( ) => {
450+ const currentSock = getCurrentSock ( ) ;
451+ if ( ! currentSock ) {
452+ return ;
453+ }
382454 try {
383- await sock . sendPresenceUpdate ( "composing" , chatJid ) ;
455+ await currentSock . sendPresenceUpdate ( "composing" , chatJid ) ;
384456 } catch ( err ) {
385457 logVerbose ( `Presence update failed: ${ String ( err ) } ` ) ;
386458 }
@@ -502,6 +574,9 @@ export async function monitorWebInbox(options: {
502574 ) => {
503575 try {
504576 if ( update . connection === "close" ) {
577+ if ( options . socketRef ?. current === sock ) {
578+ options . socketRef . current = null ;
579+ }
505580 const status = getStatusCode ( update . lastDisconnect ?. error ) ;
506581 resolveClose ( {
507582 status,
@@ -550,7 +625,13 @@ export async function monitorWebInbox(options: {
550625 const sendApi = createWebSendApi ( {
551626 sock : {
552627 sendMessage : ( jid : string , content : AnyMessageContent ) => sendTrackedMessage ( jid , content ) ,
553- sendPresenceUpdate : ( presence , jid ?: string ) => sock . sendPresenceUpdate ( presence , jid ) ,
628+ sendPresenceUpdate : async ( presence , jid ?: string ) => {
629+ const currentSock = getCurrentSock ( ) ;
630+ if ( ! currentSock ) {
631+ throw new Error ( RECONNECT_IN_PROGRESS_ERROR ) ;
632+ }
633+ return currentSock . sendPresenceUpdate ( presence , jid ) ;
634+ } ,
554635 } ,
555636 defaultAccountId : options . accountId ,
556637 } ) ;
0 commit comments