@@ -3,6 +3,7 @@ import { formatErrorMessage } from "../errors.js";
33import {
44 ackDelivery ,
55 failDelivery ,
6+ loadPendingDelivery ,
67 loadPendingDeliveries ,
78 moveToFailed ,
89 type QueuedDelivery ,
@@ -30,6 +31,11 @@ export interface RecoveryLogger {
3031 error ( msg : string ) : void ;
3132}
3233
34+ export interface PendingDeliveryDrainDecision {
35+ match : boolean ;
36+ bypassBackoff ?: boolean ;
37+ }
38+
3339const MAX_RETRIES = 5 ;
3440
3541/** Backoff delays in milliseconds indexed by retry count (1-based). */
@@ -54,8 +60,6 @@ const PERMANENT_ERROR_PATTERNS: readonly RegExp[] = [
5460 / U s e r .* n o t i n r o o m / i,
5561] ;
5662
57- const NO_LISTENER_ERROR_RE = / N o a c t i v e W h a t s A p p W e b l i s t e n e r / i;
58-
5963const drainInProgress = new Map < string , boolean > ( ) ;
6064const entriesInProgress = new Set < string > ( ) ;
6165
@@ -68,10 +72,6 @@ function loadDeliverRuntime() {
6872 return deliverRuntimePromise ;
6973}
7074
71- function normalizeQueueAccountId ( accountId ?: string ) : string {
72- return ( accountId ?? "" ) . trim ( ) || "default" ;
73- }
74-
7575function getErrnoCode ( err : unknown ) : string | null {
7676 return err && typeof err === "object" && "code" in err
7777 ? String ( ( err as { code ?: unknown } ) . code )
@@ -179,82 +179,151 @@ export function isPermanentDeliveryError(error: string): boolean {
179179 return PERMANENT_ERROR_PATTERNS . some ( ( re ) => re . test ( error ) ) ;
180180}
181181
182- export async function drainReconnectQueue ( opts : {
183- accountId : string ;
182+ async function drainQueuedEntry ( opts : {
183+ entry : QueuedDelivery ;
184+ cfg : OpenClawConfig ;
185+ deliver : DeliverFn ;
186+ stateDir ?: string ;
187+ onRecovered ?: ( entry : QueuedDelivery ) => void ;
188+ onFailed ?: ( entry : QueuedDelivery , errMsg : string ) => void ;
189+ } ) : Promise < "recovered" | "failed" | "moved-to-failed" | "already-gone" > {
190+ const { entry } = opts ;
191+ try {
192+ await opts . deliver ( buildRecoveryDeliverParams ( entry , opts . cfg ) ) ;
193+ await ackDelivery ( entry . id , opts . stateDir ) ;
194+ opts . onRecovered ?.( entry ) ;
195+ return "recovered" ;
196+ } catch ( err ) {
197+ const errMsg = formatErrorMessage ( err ) ;
198+ opts . onFailed ?.( entry , errMsg ) ;
199+ if ( isPermanentDeliveryError ( errMsg ) ) {
200+ try {
201+ await moveToFailed ( entry . id , opts . stateDir ) ;
202+ return "moved-to-failed" ;
203+ } catch ( moveErr ) {
204+ if ( getErrnoCode ( moveErr ) === "ENOENT" ) {
205+ return "already-gone" ;
206+ }
207+ }
208+ } else {
209+ try {
210+ await failDelivery ( entry . id , errMsg , opts . stateDir ) ;
211+ return "failed" ;
212+ } catch ( failErr ) {
213+ if ( getErrnoCode ( failErr ) === "ENOENT" ) {
214+ return "already-gone" ;
215+ }
216+ }
217+ }
218+ return "failed" ;
219+ }
220+ }
221+
222+ export async function drainPendingDeliveries ( opts : {
223+ drainKey : string ;
224+ logLabel : string ;
184225 cfg : OpenClawConfig ;
185226 log : RecoveryLogger ;
186227 stateDir ?: string ;
187228 deliver ?: DeliverFn ;
229+ selectEntry : ( entry : QueuedDelivery , now : number ) => PendingDeliveryDrainDecision ;
188230} ) : Promise < void > {
189- if ( drainInProgress . get ( opts . accountId ) ) {
190- opts . log . info (
191- `WhatsApp reconnect drain: already in progress for account ${ opts . accountId } , skipping` ,
192- ) ;
231+ if ( drainInProgress . get ( opts . drainKey ) ) {
232+ opts . log . info ( `${ opts . logLabel } : already in progress for ${ opts . drainKey } , skipping` ) ;
193233 return ;
194234 }
195235
196- drainInProgress . set ( opts . accountId , true ) ;
236+ drainInProgress . set ( opts . drainKey , true ) ;
197237 try {
238+ const now = Date . now ( ) ;
239+ const deliver = opts . deliver ?? ( await loadDeliverRuntime ( ) ) . deliverOutboundPayloads ;
198240 const matchingEntries = ( await loadPendingDeliveries ( opts . stateDir ) )
241+ . map ( ( entry ) => ( {
242+ entry,
243+ decision : opts . selectEntry ( entry , now ) ,
244+ } ) )
199245 . filter (
200- ( entry ) =>
201- entry . channel === "whatsapp" &&
202- normalizeQueueAccountId ( entry . accountId ) === opts . accountId &&
203- typeof entry . lastError === "string" &&
204- NO_LISTENER_ERROR_RE . test ( entry . lastError ) ,
246+ ( item ) : item is { entry : QueuedDelivery ; decision : PendingDeliveryDrainDecision } =>
247+ item . decision . match ,
205248 )
206- . toSorted ( ( a , b ) => a . enqueuedAt - b . enqueuedAt ) ;
249+ . toSorted ( ( a , b ) => a . entry . enqueuedAt - b . entry . enqueuedAt ) ;
207250
208251 if ( matchingEntries . length === 0 ) {
209252 return ;
210253 }
211254
212255 opts . log . info (
213- `WhatsApp reconnect drain : ${ matchingEntries . length } pending message(s) for account ${ opts . accountId } ` ,
256+ `${ opts . logLabel } : ${ matchingEntries . length } pending message(s) matched ${ opts . drainKey } ` ,
214257 ) ;
215258
216- const deliver = opts . deliver ?? ( await loadDeliverRuntime ( ) ) . deliverOutboundPayloads ;
217-
218- for ( const entry of matchingEntries ) {
259+ for ( const { entry, decision } of matchingEntries ) {
219260 if ( ! claimRecoveryEntry ( entry . id ) ) {
220- opts . log . info ( `WhatsApp reconnect drain : entry ${ entry . id } is already being recovered` ) ;
261+ opts . log . info ( `${ opts . logLabel } : entry ${ entry . id } is already being recovered` ) ;
221262 continue ;
222263 }
223264
224- if ( entry . retryCount >= MAX_RETRIES ) {
225- try {
226- await moveToFailed ( entry . id , opts . stateDir ) ;
227- } catch ( err ) {
228- if ( getErrnoCode ( err ) === "ENOENT" ) {
229- opts . log . info ( `reconnect drain: entry ${ entry . id } already gone, skipping` ) ;
265+ try {
266+ // Re-read after claim so the queue file remains the source of truth.
267+ // This prevents stale startup/reconnect snapshots from re-sending an
268+ // entry that another recovery path already acked.
269+ const currentEntry = await loadPendingDelivery ( entry . id , opts . stateDir ) ;
270+ if ( ! currentEntry ) {
271+ opts . log . info ( `${ opts . logLabel } : entry ${ entry . id } already gone, skipping` ) ;
272+ continue ;
273+ }
274+
275+ if ( currentEntry . retryCount >= MAX_RETRIES ) {
276+ try {
277+ await moveToFailed ( currentEntry . id , opts . stateDir ) ;
278+ } catch ( err ) {
279+ if ( getErrnoCode ( err ) === "ENOENT" ) {
280+ opts . log . info ( `${ opts . logLabel } : entry ${ currentEntry . id } already gone, skipping` ) ;
281+ continue ;
282+ }
283+ throw err ;
284+ }
285+ opts . log . warn (
286+ `${ opts . logLabel } : entry ${ currentEntry . id } exceeded max retries and was moved to failed/` ,
287+ ) ;
288+ continue ;
289+ }
290+
291+ if ( ! decision . bypassBackoff ) {
292+ const retryEligibility = isEntryEligibleForRecoveryRetry ( currentEntry , Date . now ( ) ) ;
293+ if ( ! retryEligibility . eligible ) {
294+ opts . log . info (
295+ `${ opts . logLabel } : entry ${ currentEntry . id } not ready for retry yet — backoff ${ retryEligibility . remainingBackoffMs } ms remaining` ,
296+ ) ;
230297 continue ;
231298 }
232- throw err ;
233- } finally {
234- releaseRecoveryEntry ( entry . id ) ;
235299 }
236- opts . log . warn (
237- `WhatsApp reconnect drain: entry ${ entry . id } exceeded max retries and was moved to failed/` ,
238- ) ;
239- continue ;
240- }
241300
242- try {
243- await deliver ( buildRecoveryDeliverParams ( entry , opts . cfg ) ) ;
244- await ackDelivery ( entry . id , opts . stateDir ) ;
245- } catch ( err ) {
246- const errMsg = formatErrorMessage ( err ) ;
247- if ( isPermanentDeliveryError ( errMsg ) ) {
248- await moveToFailed ( entry . id , opts . stateDir ) . catch ( ( ) => { } ) ;
249- } else {
250- await failDelivery ( entry . id , errMsg , opts . stateDir ) . catch ( ( ) => { } ) ;
301+ const result = await drainQueuedEntry ( {
302+ entry : currentEntry ,
303+ cfg : opts . cfg ,
304+ deliver,
305+ stateDir : opts . stateDir ,
306+ onFailed : ( failedEntry , errMsg ) => {
307+ if ( isPermanentDeliveryError ( errMsg ) ) {
308+ opts . log . warn (
309+ `${ opts . logLabel } : entry ${ failedEntry . id } hit permanent error — moving to failed/: ${ errMsg } ` ,
310+ ) ;
311+ return ;
312+ }
313+ opts . log . warn ( `${ opts . logLabel } : retry failed for entry ${ failedEntry . id } : ${ errMsg } ` ) ;
314+ } ,
315+ } ) ;
316+ if ( result === "recovered" ) {
317+ opts . log . info (
318+ `${ opts . logLabel } : drained delivery ${ currentEntry . id } on ${ currentEntry . channel } ` ,
319+ ) ;
251320 }
252321 } finally {
253322 releaseRecoveryEntry ( entry . id ) ;
254323 }
255324 }
256325 } finally {
257- drainInProgress . delete ( opts . accountId ) ;
326+ drainInProgress . delete ( opts . drainKey ) ;
258327 }
259328}
260329
@@ -289,57 +358,60 @@ export async function recoverPendingDeliveries(opts: {
289358 await deferRemainingEntriesForBudget ( pending . slice ( i ) , opts . stateDir ) ;
290359 break ;
291360 }
292- if ( entry . retryCount >= MAX_RETRIES ) {
293- if ( ! claimRecoveryEntry ( entry . id ) ) {
294- opts . log . info ( `Recovery skipped for delivery ${ entry . id } : already being processed` ) ;
295- continue ;
296- }
297- try {
298- opts . log . warn (
299- `Delivery ${ entry . id } exceeded max retries (${ entry . retryCount } /${ MAX_RETRIES } ) — moving to failed/` ,
300- ) ;
301- await moveEntryToFailedWithLogging ( entry . id , opts . log , opts . stateDir ) ;
302- summary . skippedMaxRetries += 1 ;
303- } finally {
304- releaseRecoveryEntry ( entry . id ) ;
305- }
306- continue ;
307- }
308-
309- const retryEligibility = isEntryEligibleForRecoveryRetry ( entry , now ) ;
310- if ( ! retryEligibility . eligible ) {
311- summary . deferredBackoff += 1 ;
312- opts . log . info (
313- `Delivery ${ entry . id } not ready for retry yet — backoff ${ retryEligibility . remainingBackoffMs } ms remaining` ,
314- ) ;
315- continue ;
316- }
317361
318362 if ( ! claimRecoveryEntry ( entry . id ) ) {
319363 opts . log . info ( `Recovery skipped for delivery ${ entry . id } : already being processed` ) ;
320364 continue ;
321365 }
322366
323367 try {
324- await opts . deliver ( buildRecoveryDeliverParams ( entry , opts . cfg ) ) ;
325- await ackDelivery ( entry . id , opts . stateDir ) ;
326- summary . recovered += 1 ;
327- opts . log . info ( `Recovered delivery ${ entry . id } to ${ entry . channel } :${ entry . to } ` ) ;
328- } catch ( err ) {
329- const errMsg = formatErrorMessage ( err ) ;
330- if ( isPermanentDeliveryError ( errMsg ) ) {
331- opts . log . warn ( `Delivery ${ entry . id } hit permanent error — moving to failed/: ${ errMsg } ` ) ;
332- await moveEntryToFailedWithLogging ( entry . id , opts . log , opts . stateDir ) ;
333- summary . failed += 1 ;
368+ const currentEntry = await loadPendingDelivery ( entry . id , opts . stateDir ) ;
369+ if ( ! currentEntry ) {
370+ opts . log . info ( `Recovery skipped for delivery ${ entry . id } : already gone` ) ;
334371 continue ;
335372 }
336- try {
337- await failDelivery ( entry . id , errMsg , opts . stateDir ) ;
338- } catch {
339- // Best-effort update.
373+
374+ if ( currentEntry . retryCount >= MAX_RETRIES ) {
375+ opts . log . warn (
376+ `Delivery ${ currentEntry . id } exceeded max retries (${ currentEntry . retryCount } /${ MAX_RETRIES } ) — moving to failed/` ,
377+ ) ;
378+ await moveEntryToFailedWithLogging ( currentEntry . id , opts . log , opts . stateDir ) ;
379+ summary . skippedMaxRetries += 1 ;
380+ continue ;
381+ }
382+
383+ const currentRetryEligibility = isEntryEligibleForRecoveryRetry ( currentEntry , Date . now ( ) ) ;
384+ if ( ! currentRetryEligibility . eligible ) {
385+ summary . deferredBackoff += 1 ;
386+ opts . log . info (
387+ `Delivery ${ currentEntry . id } not ready for retry yet — backoff ${ currentRetryEligibility . remainingBackoffMs } ms remaining` ,
388+ ) ;
389+ continue ;
390+ }
391+
392+ const result = await drainQueuedEntry ( {
393+ entry : currentEntry ,
394+ cfg : opts . cfg ,
395+ deliver : opts . deliver ,
396+ stateDir : opts . stateDir ,
397+ onRecovered : ( recoveredEntry ) => {
398+ summary . recovered += 1 ;
399+ opts . log . info ( `Recovered delivery ${ recoveredEntry . id } on ${ recoveredEntry . channel } ` ) ;
400+ } ,
401+ onFailed : ( failedEntry , errMsg ) => {
402+ summary . failed += 1 ;
403+ if ( isPermanentDeliveryError ( errMsg ) ) {
404+ opts . log . warn (
405+ `Delivery ${ failedEntry . id } hit permanent error — moving to failed/: ${ errMsg } ` ,
406+ ) ;
407+ return ;
408+ }
409+ opts . log . warn ( `Retry failed for delivery ${ failedEntry . id } : ${ errMsg } ` ) ;
410+ } ,
411+ } ) ;
412+ if ( result === "moved-to-failed" ) {
413+ continue ;
340414 }
341- summary . failed += 1 ;
342- opts . log . warn ( `Retry failed for delivery ${ entry . id } : ${ errMsg } ` ) ;
343415 } finally {
344416 releaseRecoveryEntry ( entry . id ) ;
345417 }
0 commit comments