Skip to content

Commit ff79299

Browse files
committed
fix(agent): retry transient gateway handshake closes
1 parent 8523e09 commit ff79299

3 files changed

Lines changed: 156 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
2828

2929
### Fixes
3030

31+
- CLI/agents: retry transient normal-close Gateway handshakes before falling back to embedded `openclaw agent` execution.
3132
- Providers/Gemini: strip fractional seconds from web-search time range filters so Gemini accepts freshness-bound search requests. (#85071) Thanks @Noerr.
3233
- OpenAI Codex: preserve image input support for sparse `openai-codex/gpt-5.5` catalog rows. (#85095) Thanks @sercada.
3334
- Plugins/discovery: strip `-plugin` package suffixes when deriving plugin id hints so package names line up with manifest ids. (#85170) Thanks @JulyanXu.

src/commands/agent-via-gateway.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,21 @@ function createGatewayClosedError() {
167167
});
168168
}
169169

170+
/**
 * Test fixture: an Error shaped like a gateway transport failure whose socket
 * closed with code 1000 and the placeholder reason "no close reason".
 *
 * The error is named "GatewayTransportError" and carries `kind`, `code`,
 * `reason`, and `connectionDetails` for a local loopback target.
 *
 * @returns A fresh error instance on every call, safe to hand to
 *   `mockRejectedValueOnce` repeatedly.
 */
function createGatewayNormalCloseError() {
  const err = new Error("gateway closed (1000 normal closure): no close reason");
  err.name = "GatewayTransportError";
  return Object.assign(err, {
    kind: "closed",
    code: 1000,
    reason: "no close reason",
    connectionDetails: {
      url: "ws://127.0.0.1:18789",
      urlSource: "local loopback",
      message: "Gateway target: ws://127.0.0.1:18789",
    },
  });
}
184+
170185
vi.mock("../config/config.js", () => ({ getRuntimeConfig: loadConfig, loadConfig }));
171186
vi.mock("../gateway/call.js", () => ({
172187
callGateway,
@@ -1021,6 +1036,36 @@ describe("agentCliCommand", () => {
10211036
expect(agentCommand).not.toHaveBeenCalled();
10221037
});
10231038
});
1039+
1040+
it("aborts while waiting for a transient gateway retry", async () => {
  // Fake timers keep the retry backoff from ever elapsing on its own, so the
  // only way the command can finish is via the abort path.
  vi.useFakeTimers();
  try {
    await withTempStore(async () => {
      const signals = createSignalProcess();
      callGateway.mockRejectedValueOnce(createGatewayNormalCloseError());

      const run = agentCliCommand({ message: "hi", to: "+1555" }, runtime, {
        process: signals.processLike,
      });
      // Yield to the microtask queue (bounded to 10 turns) until the retry
      // notice has been written to runtime.error, i.e. the command has
      // reached the backoff wait.
      for (
        let attempt = 0;
        attempt < 10 && mockMessages(runtime.error).length === 0;
        attempt += 1
      ) {
        await Promise.resolve();
      }
      signals.emit("SIGTERM");

      await expect(run).resolves.toBeUndefined();
      // No second gateway attempt and no embedded fallback after the signal.
      expect(callGateway).toHaveBeenCalledTimes(1);
      expect(agentCommand).not.toHaveBeenCalled();
      expect(runtime.exit).toHaveBeenCalledWith(143);
    });
  } finally {
    vi.useRealTimers();
  }
});
1068+
10241069
it("stays silent when the gateway returns an intentional empty reply", async () => {
10251070
await withTempStore(async () => {
10261071
callGateway.mockResolvedValue({
@@ -1173,6 +1218,45 @@ describe("agentCliCommand", () => {
11731218
});
11741219
});
11751220

1221+
it("retries transient normal gateway closes before embedded fallback", async () => {
  vi.useFakeTimers();
  try {
    await withTempStore(async () => {
      // Two transient closes, then a successful gateway response.
      callGateway
        .mockRejectedValueOnce(createGatewayNormalCloseError())
        .mockRejectedValueOnce(createGatewayNormalCloseError())
        .mockResolvedValue({
          runId: "idem-1",
          status: "ok",
          result: {
            payloads: [{ text: "remote" }],
            meta: { stub: true },
          },
        });

      const command = agentCliCommand({ message: "hi", to: "+1555" }, runtime);
      // Step through the first two backoff delays (1s, then 2s).
      await vi.advanceTimersByTimeAsync(1_000);
      await vi.advanceTimersByTimeAsync(2_000);
      await command;

      expect(callGateway).toHaveBeenCalledTimes(3);
      // Every attempt must reuse the same idempotency key.
      const idempotencyKeys = callGateway.mock.calls.map(
        ([call]) => (call as { params?: { idempotencyKey?: unknown } }).params?.idempotencyKey,
      );
      expect(new Set(idempotencyKeys).size).toBe(1);
      expect(agentCommand).not.toHaveBeenCalled();
      // One retry notice per failed attempt.
      expect(
        mockMessages(runtime.error).filter((message) =>
          message.includes("Gateway agent connection closed during handshake"),
        ),
      ).toHaveLength(2);
      expect(runtime.log).toHaveBeenCalledWith("remote");
    });
  } finally {
    vi.useRealTimers();
  }
});
1259+
11761260
it("preserves explicit session keys for embedded fallback when the gateway closes", async () => {
11771261
await withTempStore(async () => {
11781262
callGateway.mockRejectedValue(createGatewayClosedError());

src/commands/agent-via-gateway.ts

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ const EMBEDDED_FALLBACK_META = {
5252
fallbackFrom: "gateway",
5353
} as const;
5454
const GATEWAY_TIMEOUT_FALLBACK_SESSION_PREFIX = "gateway-fallback-";
55+
// Backoff schedule in milliseconds: entry i is the wait after the (i+1)-th
// transient gateway close before the next attempt. The retry wrapper makes one
// more attempt than there are entries, so the length is the retry count.
const GATEWAY_TRANSIENT_CONNECT_RETRY_DELAYS_MS = [1_000, 2_000, 5_000, 10_000, 15_000] as const;
5556

5657
type AgentCliOpts = {
5758
message: string;
@@ -147,6 +148,15 @@ function isGatewayAgentEmbeddedFallbackError(err: unknown): boolean {
147148
return isGatewayTransportError(err);
148149
}
149150

151+
/**
 * Reports whether `err` is a gateway transport "closed" error that is worth
 * retrying: close code 1000 with either no reason or the placeholder
 * "no close reason".
 *
 * @param err - Any thrown value.
 * @returns `true` only for a gateway transport error with `kind === "closed"`,
 *   `code === 1000`, and an empty or placeholder reason.
 */
function isTransientGatewayAgentConnectClose(err: unknown): boolean {
  if (!isGatewayTransportError(err) || err.kind !== "closed") {
    return false;
  }
  // Strict comparison already rejects non-numeric codes.
  if (err.code !== 1000) {
    return false;
  }
  const reason = normalizeOptionalString(err.reason);
  // A specific close reason means the gateway closed deliberately; do not retry.
  return !reason || reason === "no close reason";
}
159+
150160
function validateExplicitSessionKeyForDispatch(
151161
opts: Pick<AgentCliOpts, "agent" | "sessionKey">,
152162
): void {
@@ -268,8 +278,28 @@ function resolveAgentCliProcessLike(deps: AgentCliDeps | undefined): AgentCliPro
268278
return isAgentCliProcessLike(processLike) ? processLike : process;
269279
}
270280

271-
function delayMs(ms: number): Promise<void> {
272-
return new Promise((resolve) => setTimeout(resolve, ms));
281+
/**
 * Creates the error used to reject a retry delay that was cancelled.
 *
 * @returns A new Error named "AbortError" with the message
 *   "gateway agent retry aborted".
 */
function createAbortDelayError(): Error {
  const err = new Error("gateway agent retry aborted");
  err.name = "AbortError";
  return err;
}
286+
287+
/**
 * Waits `ms` milliseconds, optionally cancellable through an AbortSignal.
 *
 * @param ms - Delay in milliseconds.
 * @param signal - Optional signal; aborting it cancels the wait.
 * @returns A promise that resolves after the delay. It rejects with the error
 *   from `createAbortDelayError()` if `signal` is already aborted or aborts
 *   before the timer fires.
 */
function delayMs(ms: number, signal?: AbortSignal): Promise<void> {
  if (signal?.aborted) {
    return Promise.reject(createAbortDelayError());
  }
  return new Promise<void>((resolve, reject) => {
    // `timer` is declared below; this handler can only run after
    // addEventListener, by which point `timer` is initialized.
    const onAbort = (): void => {
      clearTimeout(timer);
      // No manual removeEventListener: `{ once: true }` removes the listener
      // as soon as it fires.
      reject(createAbortDelayError());
    };
    const timer = setTimeout(() => {
      // Detach on normal completion so the signal does not retain the handler.
      signal?.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}
274304

275305
function isConfirmedChatAbortResponseForRun(value: unknown, runId: string): boolean {
@@ -631,6 +661,34 @@ async function agentViaGatewayCommand(
631661
return response;
632662
}
633663

664+
/**
 * Runs `agentViaGatewayCommand`, retrying when the gateway connection fails
 * with a transient normal close (see `isTransientGatewayAgentConnectClose`).
 *
 * Each retry reports a notice through `runtime.error` and waits the matching
 * entry of `GATEWAY_TRANSIENT_CONNECT_RETRY_DELAYS_MS`. The wait is cancelled
 * if `signalBridge.signal` aborts.
 *
 * @param opts - CLI options forwarded unchanged to every attempt.
 * @param runtime - Runtime used for the retry notice.
 * @param signalBridge - Supplies the abort signal for the backoff wait.
 * @returns The result of the first successful gateway attempt.
 * @throws Abort errors immediately; any non-transient error immediately; the
 *   last transient error once every retry has been used.
 */
async function agentViaGatewayCommandWithTransientRetries(
  opts: AgentCliOpts,
  runtime: RuntimeEnv,
  signalBridge: ReturnType<typeof createAgentCliSignalBridge>,
) {
  // The trailing 0 is a sentinel: it adds one final attempt after the last
  // real delay. Its value is never used, because the final attempt rethrows
  // instead of waiting.
  for (const [attempt, retryDelayMs] of [
    ...GATEWAY_TRANSIENT_CONNECT_RETRY_DELAYS_MS,
    0,
  ].entries()) {
    try {
      return await agentViaGatewayCommand(opts, runtime, signalBridge);
    } catch (err) {
      if (isAbortError(err)) {
        throw err;
      }
      const isFinalAttempt = attempt === GATEWAY_TRANSIENT_CONNECT_RETRY_DELAYS_MS.length;
      if (isFinalAttempt || !isTransientGatewayAgentConnectClose(err)) {
        throw err;
      }
      runtime.error?.(
        `Gateway agent connection closed during handshake; retrying in ${retryDelayMs}ms before embedded fallback.`,
      );
      // Rejects if the signal aborts mid-wait; that rejection propagates to
      // the caller.
      await delayMs(retryDelayMs, signalBridge.signal);
    }
  }
  // Unreachable: the final iteration always returns or throws. Kept so every
  // code path visibly ends in a return or a throw.
  throw new Error("Gateway agent retry loop exhausted unexpectedly.");
}
691+
634692
export async function agentCliCommand(
635693
opts: AgentCliOpts,
636694
runtime: RuntimeEnv,
@@ -639,11 +697,14 @@ export async function agentCliCommand(
639697
protectJsonStdout(opts);
640698
const dispatchOpts = normalizeSessionKeyOptsForDispatch(opts);
641699
validateExplicitSessionKeyForDispatch(dispatchOpts);
700+
const gatewayDispatchOpts = dispatchOpts.runId
701+
? dispatchOpts
702+
: { ...dispatchOpts, runId: randomIdempotencyKey() };
642703
const signalBridge = createAgentCliSignalBridge(resolveAgentCliProcessLike(deps));
643704
const localOpts = {
644-
...dispatchOpts,
645-
agentId: dispatchOpts.agent,
646-
replyAccountId: dispatchOpts.replyAccount,
705+
...gatewayDispatchOpts,
706+
agentId: gatewayDispatchOpts.agent,
707+
replyAccountId: gatewayDispatchOpts.replyAccount,
647708
cleanupBundleMcpOnRunEnd: true,
648709
cleanupCliLiveSessionOnRunEnd: true,
649710
abortSignal: signalBridge.signal,
@@ -655,7 +716,11 @@ export async function agentCliCommand(
655716
}
656717

657718
try {
658-
const result = await agentViaGatewayCommand(dispatchOpts, runtime, signalBridge);
719+
const result = await agentViaGatewayCommandWithTransientRetries(
720+
gatewayDispatchOpts,
721+
runtime,
722+
signalBridge,
723+
);
659724
return returnAfterSignalExit(result, signalBridge.getReceivedSignal(), runtime);
660725
} catch (err) {
661726
if (isAbortError(err)) {

0 commit comments

Comments
 (0)