Skip to content
This repository was archived by the owner on May 5, 2026. It is now read-only.

Commit c94a870

Browse files
neeravmakwanasteipete
authored andcommitted
fix(outbound): hold active-delivery claim so reconnect drain skips live sends
Reconnect drain (drainPendingDeliveries) matches fresh pending entries by design to preserve crash-replay, but the live delivery path in deliverOutboundPayloads held no in-memory claim while the send was running. A reconnect firing mid-send therefore re-drove the same queue entry and produced duplicate outbound messages (e.g. WhatsApp cron sends going out 7-12x when the 30-minute inbound-silence watchdog fired during delivery). Claim the queueId against the existing entriesInProgress set right after enqueueDelivery and release it in the finally branch around ack/fail. Drain already skips claimed ids via claimRecoveryEntry, so no drain-side change is needed. The claim is process-local on purpose: a crashed owner leaves no claim behind, so startup recovery still reclaims orphaned entries. Fixes openclaw#70386. Made-with: Cursor
1 parent adda0dc commit c94a870

6 files changed

Lines changed: 74 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Docs: https://docs.openclaw.ai
3232
### Fixes
3333

3434
- Codex harness: route Codex-tagged MCP tool approval elicitations through OpenClaw plugin approvals, including current empty-schema app-server requests, while leaving generic user-input prompts fail-closed. (#68807) Thanks @kesslerio.
35+
- WhatsApp/outbound: hold an in-memory active-delivery claim while a live outbound send is in flight, so a concurrent reconnect drain no longer re-drives the same pending queue entry and duplicates cron sends 7-12x after the 30-minute inbound-silence watchdog fires mid-delivery. Crash-replay of fresh queue entries left behind by a dead process is preserved because the claim is intentionally process-local. Fixes #70386.
3536
- Providers/OpenAI: harden Voice Call realtime transcription against OpenAI Realtime session-update drift, forward language and prompt hints, and add live coverage for realtime STT.
3637
- Providers/Moonshot: stop strict-sanitizing Kimi's native tool_call IDs (shaped like `functions.<name>:<index>`) on the OpenAI-compatible transport, so multi-turn agentic flows through Kimi K2.6 no longer break after 2-3 tool-calling rounds when the serving layer fails to match mangled IDs against the original tool definitions. Adds a `sanitizeToolCallIds` opt-out to the shared `openai-compatible` replay family helper and wires Moonshot to it. Fixes #62319. (#70030) Thanks @LeoDu0314.
3738
- Dependencies/security: override transitive `uuid` to `14.0.0`, clearing the runtime advisory across dependencies.

src/infra/outbound/deliver.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const queueMocks = vi.hoisted(() => ({
3636
enqueueDelivery: vi.fn(async () => "mock-queue-id"),
3737
ackDelivery: vi.fn(async () => {}),
3838
failDelivery: vi.fn(async () => {}),
39+
tryClaimActiveDelivery: vi.fn<(entryId: string) => boolean>(() => true),
40+
releaseActiveDelivery: vi.fn<(entryId: string) => void>(() => {}),
3941
}));
4042
const logMocks = vi.hoisted(() => ({
4143
warn: vi.fn(),
@@ -70,6 +72,8 @@ vi.mock("./delivery-queue.js", () => ({
7072
enqueueDelivery: queueMocks.enqueueDelivery,
7173
ackDelivery: queueMocks.ackDelivery,
7274
failDelivery: queueMocks.failDelivery,
75+
tryClaimActiveDelivery: queueMocks.tryClaimActiveDelivery,
76+
releaseActiveDelivery: queueMocks.releaseActiveDelivery,
7377
}));
7478
vi.mock("../../logging/subsystem.js", () => ({
7579
createSubsystemLogger: () => {

src/infra/outbound/deliver.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,13 @@ import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js";
3636
import { formatErrorMessage } from "../errors.js";
3737
import { throwIfAborted } from "./abort.js";
3838
import type { OutboundDeliveryResult } from "./deliver-types.js";
39-
import { ackDelivery, enqueueDelivery, failDelivery } from "./delivery-queue.js";
39+
import {
40+
ackDelivery,
41+
enqueueDelivery,
42+
failDelivery,
43+
releaseActiveDelivery,
44+
tryClaimActiveDelivery,
45+
} from "./delivery-queue.js";
4046
import type { OutboundIdentity } from "./identity.js";
4147
import type { DeliveryMirror } from "./mirror.js";
4248
import {
@@ -660,6 +666,13 @@ export async function deliverOutboundPayloads(
660666
gatewayClientScopes: params.gatewayClientScopes,
661667
}).catch(() => null); // Best-effort — don't block delivery if queue write fails.
662668

669+
// Claim the queue entry against the shared in-memory recovery set so a
670+
// concurrent reconnect/startup drain skips this id while the live send is
671+
// still running. Without this, a reconnect during an in-flight delivery can
672+
// re-drive the same entry (retryCount=0, lastAttemptAt=undefined is drain-
673+
// eligible by design to preserve crash replay) and produce duplicates.
674+
const heldActiveClaim = queueId ? tryClaimActiveDelivery(queueId) : false;
675+
663676
// Wrap onError to detect partial failures under bestEffort mode.
664677
// When bestEffort is true, per-payload errors are caught and passed to onError
665678
// without throwing — so the outer try/catch never fires. We track whether any
@@ -694,6 +707,10 @@ export async function deliverOutboundPayloads(
694707
}
695708
}
696709
throw err;
710+
} finally {
711+
if (queueId && heldActiveClaim) {
712+
releaseActiveDelivery(queueId);
713+
}
697714
}
698715
}
699716

src/infra/outbound/delivery-queue-recovery.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,22 @@ function releaseRecoveryEntry(entryId: string): void {
9090
entriesInProgress.delete(entryId);
9191
}
9292

93+
/**
94+
* Claim an entry id against the shared in-memory recovery set so a concurrent
95+
* reconnect/startup drain will skip it while the owning caller is mid-flight.
96+
* Returns `false` if the id is already claimed. Callers must pair a successful
97+
* claim with {@link releaseActiveDelivery} in a `finally`. The claim is
98+
* process-local and intentionally does not survive a crash, so crash-replay
99+
* paths still recover fresh entries whose owning process died.
100+
*/
101+
export function tryClaimActiveDelivery(entryId: string): boolean {
102+
return claimRecoveryEntry(entryId);
103+
}
104+
105+
export function releaseActiveDelivery(entryId: string): void {
106+
releaseRecoveryEntry(entryId);
107+
}
108+
93109
function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) {
94110
return {
95111
cfg,

src/infra/outbound/delivery-queue.reconnect-drain.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import {
1010
MAX_RETRIES,
1111
type RecoveryLogger,
1212
recoverPendingDeliveries,
13+
releaseActiveDelivery,
14+
tryClaimActiveDelivery,
1315
} from "./delivery-queue.js";
1416
import {
1517
createRecoveryLog,
@@ -413,4 +415,35 @@ describe("drainPendingDeliveries for reconnect", () => {
413415

414416
expect(deliver).not.toHaveBeenCalled();
415417
});
418+
419+
it("skips entries that an in-flight live delivery has actively claimed", async () => {
420+
// Regression for openclaw/openclaw#70386: a reconnect drain that runs
421+
// while the live send is still writing to the adapter must not re-drive
422+
// the same entry. The live delivery path holds an in-memory active claim
423+
// for `queueId` across its send; drain honors that claim via the same
424+
// `entriesInProgress` set used for startup recovery.
425+
const log = createRecoveryLog();
426+
const deliver = vi.fn<DeliverFn>(async () => {});
427+
428+
const id = await enqueueDelivery(
429+
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
430+
tmpDir,
431+
);
432+
433+
expect(tryClaimActiveDelivery(id)).toBe(true);
434+
try {
435+
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
436+
expect(deliver).not.toHaveBeenCalled();
437+
expect(log.info).toHaveBeenCalledWith(
438+
expect.stringContaining(`entry ${id} is already being recovered`),
439+
);
440+
} finally {
441+
releaseActiveDelivery(id);
442+
}
443+
444+
// Once the live delivery path releases its claim (success or failure), a
445+
// later reconnect drain is free to pick the entry up again.
446+
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
447+
expect(deliver).toHaveBeenCalledTimes(1);
448+
});
416449
});

src/infra/outbound/delivery-queue.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ export {
1515
isPermanentDeliveryError,
1616
MAX_RETRIES,
1717
recoverPendingDeliveries,
18+
releaseActiveDelivery,
19+
tryClaimActiveDelivery,
1820
} from "./delivery-queue-recovery.js";
1921
export type {
2022
DeliverFn,

0 commit comments

Comments
 (0)