Skip to content

Commit 61f7cea

Browse files
dutifulbobosolmaz
andauthored
fix: kill stuck ACP child processes on startup and harden sessions in discord threads (#33699)
* Gateway: resolve agent.wait for chat.send runs * Discord: harden ACP thread binding + listener timeout * ACPX: handle already-exited child wait * Gateway/Discord: address PR review findings * Discord: keep ACP error-state thread bindings on startup * gateway: make agent.wait dedupe bridge event-driven * discord: harden ACP probe classification and cap startup fan-out * discord: add cooperative timeout cancellation * discord: fix startup probe concurrency helper typing * plugin-sdk: avoid Windows root-alias shard timeout * plugin-sdk: keep root alias reflection path non-blocking * discord+gateway: resolve remaining PR review findings * gateway+discord: fix codex review regressions * Discord/Gateway: address Codex review findings * Gateway: keep agent.wait lifecycle active with shared run IDs * Discord: clean up status reactions on aborted runs * fix: add changelog note for ACP/Discord startup hardening (#33699) (thanks @dutifulbob) --------- Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
1 parent bd25182 commit 61f7cea

30 files changed

Lines changed: 2566 additions & 178 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
2020
- Outbound/send config threading: pass resolved SecretRef config through outbound adapters and helper send paths so send flows do not reload unresolved runtime config. (#33987) Thanks @joshavant.
2121
- Sessions/subagent attachments: remove `attachments[].content.maxLength` from `sessions_spawn` schema to avoid llama.cpp GBNF repetition overflow, and preflight UTF-8 byte size before buffer allocation while keeping runtime file-size enforcement unchanged. (#33648) Thanks @anisoptera.
2222
- Runtime/tool-state stability: recover from dangling Anthropic `tool_use` after compaction, serialize long-running Discord handler runs without blocking new inbound events, and prevent stale busy snapshots from suppressing stuck-channel recovery. (from #33630, #33583) Thanks @kevinWangSheng and @theotarr.
23+
- ACP/Discord startup hardening: clean up stuck ACP worker children on gateway restart, unbind stale ACP thread bindings during Discord startup reconciliation, and add per-thread listener watchdog timeouts so wedged turns cannot block later messages. (#33699) Thanks @dutifulbob.
2324
- Extensions/media local-root propagation: consistently forward `mediaLocalRoots` through extension `sendMedia` adapters (Google Chat, Slack, iMessage, Signal, WhatsApp), preserving non-local media behavior while restoring local attachment resolution from configured roots. Synthesis of #33581, #33545, #33540, #33536, #33528. Thanks @bmendonca3.
2425
- Gateway/security default response headers: add `Permissions-Policy: camera=(), microphone=(), geolocation=()` to baseline gateway HTTP security headers for all responses. (#30186) thanks @habakan.
2526
- Plugins/startup loading: lazily initialize plugin runtime, split startup-critical plugin SDK imports into `openclaw/plugin-sdk/core` and `openclaw/plugin-sdk/telegram`, and preserve `api.runtime` reflection semantics for plugin compatibility. (#28620) thanks @hmemcpy.

extensions/acpx/src/runtime-internals/process.test.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1+
import { spawn } from "node:child_process";
12
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
23
import { tmpdir } from "node:os";
34
import path from "node:path";
45
import { afterEach, describe, expect, it } from "vitest";
56
import { createWindowsCmdShimFixture } from "../../../shared/windows-cmd-shim-test-fixtures.js";
6-
import { resolveSpawnCommand, type SpawnCommandCache } from "./process.js";
7+
import {
8+
resolveSpawnCommand,
9+
spawnAndCollect,
10+
type SpawnCommandCache,
11+
waitForExit,
12+
} from "./process.js";
713

814
const tempDirs: string[] = [];
915

@@ -225,3 +231,62 @@ describe("resolveSpawnCommand", () => {
225231
expect(second.args[0]).toBe(scriptPath);
226232
});
227233
});
234+
235+
describe("waitForExit", () => {
236+
it("resolves when the child already exited before waiting starts", async () => {
237+
const child = spawn(process.execPath, ["-e", "process.exit(0)"], {
238+
stdio: ["pipe", "pipe", "pipe"],
239+
});
240+
241+
await new Promise<void>((resolve, reject) => {
242+
child.once("close", () => {
243+
resolve();
244+
});
245+
child.once("error", reject);
246+
});
247+
248+
const exit = await waitForExit(child);
249+
expect(exit.code).toBe(0);
250+
expect(exit.signal).toBeNull();
251+
expect(exit.error).toBeNull();
252+
});
253+
});
254+
255+
describe("spawnAndCollect", () => {
256+
it("returns abort error immediately when signal is already aborted", async () => {
257+
const controller = new AbortController();
258+
controller.abort();
259+
const result = await spawnAndCollect(
260+
{
261+
command: process.execPath,
262+
args: ["-e", "process.exit(0)"],
263+
cwd: process.cwd(),
264+
},
265+
undefined,
266+
{ signal: controller.signal },
267+
);
268+
269+
expect(result.code).toBeNull();
270+
expect(result.error?.name).toBe("AbortError");
271+
});
272+
273+
it("terminates a running process when signal aborts", async () => {
274+
const controller = new AbortController();
275+
const resultPromise = spawnAndCollect(
276+
{
277+
command: process.execPath,
278+
args: ["-e", "setTimeout(() => process.stdout.write('done'), 10_000)"],
279+
cwd: process.cwd(),
280+
},
281+
undefined,
282+
{ signal: controller.signal },
283+
);
284+
285+
setTimeout(() => {
286+
controller.abort();
287+
}, 10);
288+
289+
const result = await resultPromise;
290+
expect(result.error?.name).toBe("AbortError");
291+
});
292+
});

extensions/acpx/src/runtime-internals/process.ts

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@ export function resolveSpawnCommand(
114114
};
115115
}
116116

117+
function createAbortError(): Error {
118+
const error = new Error("Operation aborted.");
119+
error.name = "AbortError";
120+
return error;
121+
}
122+
117123
export function spawnWithResolvedCommand(
118124
params: {
119125
command: string;
@@ -140,6 +146,15 @@ export function spawnWithResolvedCommand(
140146
}
141147

142148
export async function waitForExit(child: ChildProcessWithoutNullStreams): Promise<SpawnExit> {
149+
// Handle callers that start waiting after the child has already exited.
150+
if (child.exitCode !== null || child.signalCode !== null) {
151+
return {
152+
code: child.exitCode,
153+
signal: child.signalCode,
154+
error: null,
155+
};
156+
}
157+
143158
return await new Promise<SpawnExit>((resolve) => {
144159
let settled = false;
145160
const finish = (result: SpawnExit) => {
@@ -167,12 +182,23 @@ export async function spawnAndCollect(
167182
cwd: string;
168183
},
169184
options?: SpawnCommandOptions,
185+
runtime?: {
186+
signal?: AbortSignal;
187+
},
170188
): Promise<{
171189
stdout: string;
172190
stderr: string;
173191
code: number | null;
174192
error: Error | null;
175193
}> {
194+
if (runtime?.signal?.aborted) {
195+
return {
196+
stdout: "",
197+
stderr: "",
198+
code: null,
199+
error: createAbortError(),
200+
};
201+
}
176202
const child = spawnWithResolvedCommand(params, options);
177203
child.stdin.end();
178204

@@ -185,13 +211,43 @@ export async function spawnAndCollect(
185211
stderr += String(chunk);
186212
});
187213

188-
const exit = await waitForExit(child);
189-
return {
190-
stdout,
191-
stderr,
192-
code: exit.code,
193-
error: exit.error,
214+
let abortKillTimer: NodeJS.Timeout | undefined;
215+
let aborted = false;
216+
const onAbort = () => {
217+
aborted = true;
218+
try {
219+
child.kill("SIGTERM");
220+
} catch {
221+
// Ignore kill races when child already exited.
222+
}
223+
abortKillTimer = setTimeout(() => {
224+
if (child.exitCode !== null || child.signalCode !== null) {
225+
return;
226+
}
227+
try {
228+
child.kill("SIGKILL");
229+
} catch {
230+
// Ignore kill races when child already exited.
231+
}
232+
}, 250);
233+
abortKillTimer.unref?.();
194234
};
235+
runtime?.signal?.addEventListener("abort", onAbort, { once: true });
236+
237+
try {
238+
const exit = await waitForExit(child);
239+
return {
240+
stdout,
241+
stderr,
242+
code: exit.code,
243+
error: aborted ? createAbortError() : exit.error,
244+
};
245+
} finally {
246+
runtime?.signal?.removeEventListener("abort", onAbort);
247+
if (abortKillTimer) {
248+
clearTimeout(abortKillTimer);
249+
}
250+
}
195251
}
196252

197253
export function resolveSpawnFailure(

extensions/acpx/src/runtime.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,10 @@ export class AcpxRuntime implements AcpRuntime {
353353
return ACPX_CAPABILITIES;
354354
}
355355

356-
async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
356+
async getStatus(input: {
357+
handle: AcpRuntimeHandle;
358+
signal?: AbortSignal;
359+
}): Promise<AcpRuntimeStatus> {
357360
const state = this.resolveHandleState(input.handle);
358361
const events = await this.runControlCommand({
359362
args: this.buildControlArgs({
@@ -363,6 +366,7 @@ export class AcpxRuntime implements AcpRuntime {
363366
cwd: state.cwd,
364367
fallbackCode: "ACP_TURN_FAILED",
365368
ignoreNoSession: true,
369+
signal: input.signal,
366370
});
367371
const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0];
368372
if (!detail) {
@@ -586,6 +590,7 @@ export class AcpxRuntime implements AcpRuntime {
586590
cwd: string;
587591
fallbackCode: AcpRuntimeErrorCode;
588592
ignoreNoSession?: boolean;
593+
signal?: AbortSignal;
589594
}): Promise<AcpxJsonObject[]> {
590595
const result = await spawnAndCollect(
591596
{
@@ -594,6 +599,9 @@ export class AcpxRuntime implements AcpRuntime {
594599
cwd: params.cwd,
595600
},
596601
this.spawnCommandOptions,
602+
{
603+
signal: params.signal,
604+
},
597605
);
598606

599607
if (result.error) {

src/acp/control-plane/manager.core.ts

Lines changed: 88 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -316,70 +316,85 @@ export class AcpSessionManager {
316316
async getSessionStatus(params: {
317317
cfg: OpenClawConfig;
318318
sessionKey: string;
319+
signal?: AbortSignal;
319320
}): Promise<AcpSessionStatus> {
320321
const sessionKey = normalizeSessionKey(params.sessionKey);
321322
if (!sessionKey) {
322323
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required.");
323324
}
325+
this.throwIfAborted(params.signal);
324326
await this.evictIdleRuntimeHandles({ cfg: params.cfg });
325-
return await this.withSessionActor(sessionKey, async () => {
326-
const resolution = this.resolveSession({
327-
cfg: params.cfg,
328-
sessionKey,
329-
});
330-
if (resolution.kind === "none") {
331-
throw new AcpRuntimeError(
332-
"ACP_SESSION_INIT_FAILED",
333-
`Session is not ACP-enabled: ${sessionKey}`,
334-
);
335-
}
336-
if (resolution.kind === "stale") {
337-
throw resolution.error;
338-
}
339-
const {
340-
runtime,
341-
handle: ensuredHandle,
342-
meta: ensuredMeta,
343-
} = await this.ensureRuntimeHandle({
344-
cfg: params.cfg,
345-
sessionKey,
346-
meta: resolution.meta,
347-
});
348-
let handle = ensuredHandle;
349-
let meta = ensuredMeta;
350-
const capabilities = await this.resolveRuntimeCapabilities({ runtime, handle });
351-
let runtimeStatus: AcpRuntimeStatus | undefined;
352-
if (runtime.getStatus) {
353-
runtimeStatus = await withAcpRuntimeErrorBoundary({
354-
run: async () => await runtime.getStatus!({ handle }),
355-
fallbackCode: "ACP_TURN_FAILED",
356-
fallbackMessage: "Could not read ACP runtime status.",
327+
return await this.withSessionActor(
328+
sessionKey,
329+
async () => {
330+
this.throwIfAborted(params.signal);
331+
const resolution = this.resolveSession({
332+
cfg: params.cfg,
333+
sessionKey,
357334
});
358-
}
359-
({ handle, meta, runtimeStatus } = await this.reconcileRuntimeSessionIdentifiers({
360-
cfg: params.cfg,
361-
sessionKey,
362-
runtime,
363-
handle,
364-
meta,
365-
runtimeStatus,
366-
failOnStatusError: true,
367-
}));
368-
const identity = resolveSessionIdentityFromMeta(meta);
369-
return {
370-
sessionKey,
371-
backend: handle.backend || meta.backend,
372-
agent: meta.agent,
373-
...(identity ? { identity } : {}),
374-
state: meta.state,
375-
mode: meta.mode,
376-
runtimeOptions: resolveRuntimeOptionsFromMeta(meta),
377-
capabilities,
378-
runtimeStatus,
379-
lastActivityAt: meta.lastActivityAt,
380-
lastError: meta.lastError,
381-
};
382-
});
335+
if (resolution.kind === "none") {
336+
throw new AcpRuntimeError(
337+
"ACP_SESSION_INIT_FAILED",
338+
`Session is not ACP-enabled: ${sessionKey}`,
339+
);
340+
}
341+
if (resolution.kind === "stale") {
342+
throw resolution.error;
343+
}
344+
const {
345+
runtime,
346+
handle: ensuredHandle,
347+
meta: ensuredMeta,
348+
} = await this.ensureRuntimeHandle({
349+
cfg: params.cfg,
350+
sessionKey,
351+
meta: resolution.meta,
352+
});
353+
let handle = ensuredHandle;
354+
let meta = ensuredMeta;
355+
const capabilities = await this.resolveRuntimeCapabilities({ runtime, handle });
356+
let runtimeStatus: AcpRuntimeStatus | undefined;
357+
if (runtime.getStatus) {
358+
runtimeStatus = await withAcpRuntimeErrorBoundary({
359+
run: async () => {
360+
this.throwIfAborted(params.signal);
361+
const status = await runtime.getStatus!({
362+
handle,
363+
...(params.signal ? { signal: params.signal } : {}),
364+
});
365+
this.throwIfAborted(params.signal);
366+
return status;
367+
},
368+
fallbackCode: "ACP_TURN_FAILED",
369+
fallbackMessage: "Could not read ACP runtime status.",
370+
});
371+
}
372+
({ handle, meta, runtimeStatus } = await this.reconcileRuntimeSessionIdentifiers({
373+
cfg: params.cfg,
374+
sessionKey,
375+
runtime,
376+
handle,
377+
meta,
378+
runtimeStatus,
379+
failOnStatusError: true,
380+
}));
381+
const identity = resolveSessionIdentityFromMeta(meta);
382+
return {
383+
sessionKey,
384+
backend: handle.backend || meta.backend,
385+
agent: meta.agent,
386+
...(identity ? { identity } : {}),
387+
state: meta.state,
388+
mode: meta.mode,
389+
runtimeOptions: resolveRuntimeOptionsFromMeta(meta),
390+
capabilities,
391+
runtimeStatus,
392+
lastActivityAt: meta.lastActivityAt,
393+
lastError: meta.lastError,
394+
};
395+
},
396+
params.signal,
397+
);
383398
}
384399

385400
async setSessionRuntimeMode(params: {
@@ -1295,9 +1310,23 @@ export class AcpSessionManager {
12951310
}
12961311
}
12971312

1298-
private async withSessionActor<T>(sessionKey: string, op: () => Promise<T>): Promise<T> {
1313+
private async withSessionActor<T>(
1314+
sessionKey: string,
1315+
op: () => Promise<T>,
1316+
signal?: AbortSignal,
1317+
): Promise<T> {
12991318
const actorKey = normalizeActorKey(sessionKey);
1300-
return await this.actorQueue.run(actorKey, op);
1319+
return await this.actorQueue.run(actorKey, async () => {
1320+
this.throwIfAborted(signal);
1321+
return await op();
1322+
});
1323+
}
1324+
1325+
private throwIfAborted(signal?: AbortSignal): void {
1326+
if (!signal?.aborted) {
1327+
return;
1328+
}
1329+
throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP operation aborted.");
13011330
}
13021331

13031332
private getCachedRuntimeState(sessionKey: string): CachedRuntimeState | null {

0 commit comments

Comments
 (0)