Skip to content

Commit 91f45d9

Browse files
udaymanish6Miya
andauthored
fix(gateway): dedupe exec followup continuations (#82717)
Co-authored-by: Miya <miya@Miyas-Mac-mini.local>
1 parent 842e6f1 commit 91f45d9

7 files changed

Lines changed: 1209 additions & 725 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
2727
- CLI/config: show concise human config-write output with an indented backup path instead of printing checksum-heavy overwrite audit details by default.
2828
- CLI/docs: call the canonical lowercase docs MCP search tool and surface MCP errors instead of returning empty search results. Fixes #82702. (#82704) Thanks @hclsys.
2929
- QA-Lab: ignore heartbeat-only operational transcripts when capturing runtime parity cells so background checks cannot replace the scenario reply. (#80323) Thanks @100yenadmin.
30+
- Gateway/exec approvals: wait for accepted async approval follow-up runs instead of direct-fallback sending duplicate completions when retries use different nonce keys. Fixes #82711. (#82717) Thanks @udaymanish6.
3031
- CLI/config: add `--dry-run` support to `openclaw config unset`, with `--json` output and allow-exec validation parity with `config set`/`config patch` dry-run handling. (#81895) Thanks @giodl73-repo.
3132
- Memory-core: retry disabled dreaming cron cleanup until cron is available after startup, so persisted managed dreaming jobs are removed after restart. Fixes #82383. (#82389) Thanks @neeravmakwana.
3233
- Providers/xAI: keep retired Grok 3, Grok 4 Fast, Grok 4.1 Fast, and Grok Code slugs out of model pickers while preserving compatibility resolution for existing configs.

src/agents/bash-tools.exec-approval-followup.test.ts

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { afterEach, describe, expect, it, vi } from "vitest";
22

33
vi.mock("./tools/gateway.js", () => ({
4-
callGatewayTool: vi.fn(async () => ({ ok: true })),
4+
callGatewayTool: vi.fn(async () => ({ status: "ok" })),
55
}));
66

77
vi.mock("../infra/outbound/message.js", () => ({
@@ -42,7 +42,22 @@ function expectGatewayAgentFollowup(expected: Record<string, unknown>) {
4242
for (const [key, value] of Object.entries(expected)) {
4343
expect(params[key]).toBe(value);
4444
}
45-
expect(call[3]).toEqual({ expectFinal: true });
45+
expect(call[3]).toBeUndefined();
46+
return params;
47+
}
48+
49+
function expectGatewayAgentWait(expected: Record<string, unknown>) {
50+
const call = (callGatewayTool as { mock?: { calls?: unknown[][] } }).mock?.calls?.[1];
51+
if (!call) {
52+
throw new Error("expected agent.wait call");
53+
}
54+
expect(call[0]).toBe("agent.wait");
55+
requireRecord(call[1], "gateway wait context");
56+
const params = requireRecord(call[2], "gateway wait params");
57+
for (const [key, value] of Object.entries(expected)) {
58+
expect(params[key]).toBe(value);
59+
}
60+
expect(call[3]).toBeUndefined();
4661
return params;
4762
}
4863

@@ -134,6 +149,41 @@ describe("exec approval followup", () => {
134149
expect(sendMessage).not.toHaveBeenCalled();
135150
});
136151

152+
it("waits for accepted agent followups without direct fallback", async () => {
153+
vi.mocked(callGatewayTool)
154+
.mockResolvedValueOnce({
155+
runId: "exec-approval-followup:req-wait:nonce:nonce-wait",
156+
status: "accepted",
157+
})
158+
.mockResolvedValueOnce({
159+
runId: "exec-approval-followup:req-wait:nonce:nonce-wait",
160+
status: "ok",
161+
});
162+
163+
await sendExecApprovalFollowup({
164+
approvalId: "req-wait",
165+
sessionKey: "agent:main:telegram:direct:123",
166+
turnSourceChannel: "telegram",
167+
turnSourceTo: "123",
168+
turnSourceAccountId: "default",
169+
resultText: "Exec finished (gateway id=req-wait, session=sess_1, code 0)\nall good",
170+
idempotencyKey: "exec-approval-followup:req-wait:nonce:nonce-wait",
171+
});
172+
173+
expectGatewayAgentFollowup({
174+
sessionKey: "agent:main:telegram:direct:123",
175+
deliver: true,
176+
channel: "telegram",
177+
to: "123",
178+
idempotencyKey: "exec-approval-followup:req-wait:nonce:nonce-wait",
179+
});
180+
expectGatewayAgentWait({
181+
runId: "exec-approval-followup:req-wait:nonce:nonce-wait",
182+
timeoutMs: 60_000,
183+
});
184+
expect(sendMessage).not.toHaveBeenCalled();
185+
});
186+
137187
it("falls back to sanitized direct external delivery only when no session exists", async () => {
138188
await sendExecApprovalFollowup({
139189
approvalId: "req-no-session",

src/agents/bash-tools.exec-approval-followup.ts

Lines changed: 77 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import {
44
} from "../infra/outbound/best-effort-delivery.js";
55
import { sendMessage } from "../infra/outbound/message.js";
66
import { isCronSessionKey, isSubagentSessionKey } from "../sessions/session-key-utils.js";
7-
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
7+
import {
8+
normalizeLowercaseStringOrEmpty,
9+
normalizeOptionalString,
10+
} from "../shared/string-coerce.js";
811
import { isGatewayMessageChannel, normalizeMessageChannel } from "../utils/message-channel.js";
912
import { buildExecApprovalFollowupIdempotencyKey } from "./bash-tools.exec-approval-followup-state.js";
1013
import {
@@ -124,6 +127,51 @@ function buildSessionResumeFallbackPrefix(): string {
124127
return "Automatic session resume failed, so sending the status directly.\n\n";
125128
}
126129

130+
function readGatewayStatus(value: unknown): string | undefined {
131+
return value && typeof value === "object" && !Array.isArray(value)
132+
? normalizeOptionalString((value as { status?: unknown }).status)
133+
: undefined;
134+
}
135+
136+
function readGatewayRunId(value: unknown): string | undefined {
137+
return value && typeof value === "object" && !Array.isArray(value)
138+
? normalizeOptionalString((value as { runId?: unknown }).runId)
139+
: undefined;
140+
}
141+
142+
function buildFollowupWaitError(params: { status?: string; error?: unknown }): Error {
143+
const suffix =
144+
typeof params.error === "string" && params.error.trim()
145+
? `: ${params.error.trim()}`
146+
: params.status
147+
? `: ${params.status}`
148+
: "";
149+
return new Error(`exec approval followup session resume failed${suffix}`);
150+
}
151+
152+
function isSuccessfulFollowupStatus(status: string | undefined): boolean {
153+
return status === "ok";
154+
}
155+
156+
async function waitForAgentFollowupRun(params: {
157+
runId: string;
158+
timeoutMs: number;
159+
}): Promise<void> {
160+
const wait = await callGatewayTool(
161+
"agent.wait",
162+
{ timeoutMs: params.timeoutMs + 2_000 },
163+
{
164+
runId: params.runId,
165+
timeoutMs: params.timeoutMs,
166+
},
167+
);
168+
const status = readGatewayStatus(wait);
169+
if (isSuccessfulFollowupStatus(status)) {
170+
return;
171+
}
172+
throw buildFollowupWaitError({ status, error: wait.error });
173+
}
174+
127175
function shouldPrefixDirectFollowupWithSessionResumeFailure(params: {
128176
resultText: string;
129177
sessionError: unknown;
@@ -249,25 +297,34 @@ export async function sendExecApprovalFollowup(
249297

250298
if (sessionKey && params.direct !== true) {
251299
try {
252-
await callGatewayTool(
253-
"agent",
254-
{ timeoutMs: 60_000 },
255-
buildAgentFollowupArgs({
256-
approvalId: params.approvalId,
257-
sessionKey,
258-
resultText,
259-
deliveryTarget,
260-
sessionOnlyOriginChannel,
261-
turnSourceChannel: params.turnSourceChannel,
262-
turnSourceTo: params.turnSourceTo,
263-
turnSourceAccountId: params.turnSourceAccountId,
264-
turnSourceThreadId: params.turnSourceThreadId,
265-
internalRuntimeHandoffId: params.internalRuntimeHandoffId,
266-
idempotencyKey: params.idempotencyKey,
267-
}),
268-
{ expectFinal: true },
269-
);
270-
return true;
300+
const agentArgs = buildAgentFollowupArgs({
301+
approvalId: params.approvalId,
302+
sessionKey,
303+
resultText,
304+
deliveryTarget,
305+
sessionOnlyOriginChannel,
306+
turnSourceChannel: params.turnSourceChannel,
307+
turnSourceTo: params.turnSourceTo,
308+
turnSourceAccountId: params.turnSourceAccountId,
309+
turnSourceThreadId: params.turnSourceThreadId,
310+
internalRuntimeHandoffId: params.internalRuntimeHandoffId,
311+
idempotencyKey: params.idempotencyKey,
312+
});
313+
const accepted = await callGatewayTool("agent", { timeoutMs: 60_000 }, agentArgs);
314+
const status = readGatewayStatus(accepted);
315+
if (isSuccessfulFollowupStatus(status)) {
316+
return true;
317+
}
318+
if (status === "accepted" || status === "in_flight" || status === "pending") {
319+
const runId =
320+
readGatewayRunId(accepted) ?? normalizeOptionalString(agentArgs.idempotencyKey);
321+
if (!runId) {
322+
throw buildFollowupWaitError({ status: "missing-run-id" });
323+
}
324+
await waitForAgentFollowupRun({ runId, timeoutMs: 60_000 });
325+
return true;
326+
}
327+
throw buildFollowupWaitError({ status, error: accepted.error });
271328
} catch (err) {
272329
sessionError = err;
273330
}

src/gateway/server-maintenance.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,35 @@ describe("startGatewayMaintenanceTimers", () => {
306306
stopMaintenanceTimers(timers);
307307
});
308308

309+
it("keeps active exec approval dedupe aliases past the normal ttl", async () => {
310+
vi.useFakeTimers();
311+
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));
312+
const { startGatewayMaintenanceTimers } = await import("./server-maintenance.js");
313+
const deps = createMaintenanceTimerDeps();
314+
const now = Date.now();
315+
const runId = "exec-approval-followup:req-active:nonce:retry-1";
316+
deps.chatAbortControllers.set(runId, createActiveRun("agent:main:main", "agent"));
317+
deps.dedupe.set("agent:exec-approval-followup:req-active", {
318+
ts: now - DEDUPE_TTL_MS - 1,
319+
ok: true,
320+
payload: { runId, status: "accepted" },
321+
});
322+
deps.dedupe.set("agent:exec-approval-followup:req-stale", {
323+
ts: now - DEDUPE_TTL_MS - 1,
324+
ok: true,
325+
payload: { runId: "exec-approval-followup:req-stale:nonce:retry-1", status: "accepted" },
326+
});
327+
328+
const timers = startGatewayMaintenanceTimers(deps);
329+
330+
await vi.advanceTimersByTimeAsync(60_000);
331+
332+
expect(deps.dedupe.has("agent:exec-approval-followup:req-active")).toBe(true);
333+
expect(deps.dedupe.has("agent:exec-approval-followup:req-stale")).toBe(false);
334+
335+
stopMaintenanceTimers(timers);
336+
});
337+
309338
it("evicts dedupe overflow by oldest timestamp even after reinsertion", async () => {
310339
vi.useFakeTimers();
311340
vi.setSystemTime(new Date("2026-03-22T00:00:00Z"));

src/gateway/server-maintenance.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,19 +88,37 @@ export function startGatewayMaintenanceTimers(params: {
8888
const dedupeCleanup = setInterval(() => {
8989
const AGENT_RUN_SEQ_MAX = 10_000;
9090
const now = Date.now();
91-
const isActiveRunDedupeKey = (key: string) => {
91+
const resolveDedupeRunId = (key: string, entry: DedupeEntry) => {
92+
if (!key.startsWith("agent:") && !key.startsWith("chat:")) {
93+
return undefined;
94+
}
95+
const keyRunId = key.slice(key.indexOf(":") + 1);
96+
if (keyRunId) {
97+
const directEntry = params.chatAbortControllers.get(keyRunId);
98+
if (directEntry) {
99+
return keyRunId;
100+
}
101+
}
102+
const payload = entry.payload;
103+
return payload && typeof payload === "object" && !Array.isArray(payload)
104+
? typeof (payload as { runId?: unknown }).runId === "string"
105+
? (payload as { runId: string }).runId.trim() || undefined
106+
: undefined
107+
: undefined;
108+
};
109+
const isActiveRunDedupeKey = (key: string, dedupeEntry: DedupeEntry) => {
92110
if (!key.startsWith("agent:") && !key.startsWith("chat:")) {
93111
return false;
94112
}
95-
const runId = key.slice(key.indexOf(":") + 1);
113+
const runId = resolveDedupeRunId(key, dedupeEntry);
96114
const entry = runId ? params.chatAbortControllers.get(runId) : undefined;
97115
if (!entry) {
98116
return false;
99117
}
100118
return key.startsWith("agent:") ? entry.kind === "agent" : entry.kind !== "agent";
101119
};
102120
for (const [k, v] of params.dedupe) {
103-
if (isActiveRunDedupeKey(k)) {
121+
if (isActiveRunDedupeKey(k, v)) {
104122
continue;
105123
}
106124
if (now - v.ts > DEDUPE_TTL_MS) {
@@ -110,7 +128,7 @@ export function startGatewayMaintenanceTimers(params: {
110128
if (params.dedupe.size > DEDUPE_MAX) {
111129
const excess = params.dedupe.size - DEDUPE_MAX;
112130
const oldestKeys = [...params.dedupe.entries()]
113-
.filter(([key]) => !isActiveRunDedupeKey(key))
131+
.filter(([key, entry]) => !isActiveRunDedupeKey(key, entry))
114132
.toSorted(([, left], [, right]) => left.ts - right.ts)
115133
.slice(0, excess)
116134
.map(([key]) => key);

0 commit comments

Comments
 (0)