Skip to content

Commit e91a5b0

Browse files
Grynnsteipete
authored and committed
fix: release stale session locks and add watchdog for hung API calls (#18060)
When a model API call hangs indefinitely (e.g. Anthropic quota exceeded mid-call), the gateway acquires a session .jsonl.lock but the promise never resolves, so the try/finally block never reaches release(). Since the owning PID is the gateway itself, stale detection cannot help — isPidAlive() always returns true.

This commit adds four layers of defense:

1. **In-process lock watchdog** (session-write-lock.ts)
   - Track acquiredAt timestamp on each held lock
   - 60-second interval timer checks all held locks
   - Auto-releases any lock held longer than maxHoldMs (default 5 min)
   - Catches the hung-API-call case that try/finally cannot

2. **Gateway startup cleanup** (server-startup.ts)
   - On boot, scan all agent session directories for *.jsonl.lock files
   - Remove locks with dead PIDs or older than staleMs (30 min)
   - Log each cleaned lock for diagnostics

3. **openclaw doctor stale lock detection** (doctor-session-locks.ts)
   - New health check scans for .jsonl.lock files
   - Reports PID status and age of each lock found
   - In --fix mode, removes stale locks automatically

4. **Transcript error entry on API failure** (attempt.ts)
   - When promptError is set, write an error marker to the session transcript before releasing the lock
   - Preserves conversation history even on model API failures

Closes #18060
1 parent 7d8d8c3 commit e91a5b0

8 files changed

Lines changed: 650 additions & 46 deletions

File tree

src/agents/pi-embedded-runner.e2e.test.ts

Lines changed: 42 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -73,6 +73,9 @@ vi.mock("@mariozechner/pi-ai", async () => {
7373
return buildAssistantMessage(model);
7474
},
7575
streamSimple: (model: { api: string; provider: string; id: string }) => {
76+
if (model.id === "mock-throw") {
77+
throw new Error("transport failed");
78+
}
7679
const stream = new actual.AssistantMessageEventStream();
7780
queueMicrotask(() => {
7881
stream.push({
@@ -182,20 +185,21 @@ const textFromContent = (content: unknown) => {
182185
return undefined;
183186
};
184187

185-
const readSessionMessages = async (sessionFile: string) => {
188+
const readSessionEntries = async (sessionFile: string) => {
186189
const raw = await fs.readFile(sessionFile, "utf-8");
187190
return raw
188191
.split(/\r?\n/)
189192
.filter(Boolean)
190-
.map(
191-
(line) =>
192-
JSON.parse(line) as {
193-
type?: string;
194-
message?: { role?: string; content?: unknown };
195-
},
196-
)
193+
.map((line) => JSON.parse(line) as { type?: string; customType?: string; data?: unknown });
194+
};
195+
196+
const readSessionMessages = async (sessionFile: string) => {
197+
const entries = await readSessionEntries(sessionFile);
198+
return entries
197199
.filter((entry) => entry.type === "message")
198-
.map((entry) => entry.message as { role?: string; content?: unknown });
200+
.map(
201+
(entry) => (entry as { message?: { role?: string; content?: unknown } }).message,
202+
) as Array<{ role?: string; content?: unknown }>;
199203
};
200204

201205
const runDefaultEmbeddedTurn = async (sessionFile: string, prompt: string) => {
@@ -373,6 +377,35 @@ describe("runEmbeddedPiAgent", () => {
373377
expect(userIndex).toBeGreaterThanOrEqual(0);
374378
});
375379

380+
it("persists prompt transport errors as transcript entries", async () => {
381+
const sessionFile = nextSessionFile();
382+
const cfg = makeOpenAiConfig(["mock-throw"]);
383+
await ensureModels(cfg);
384+
385+
const result = await runEmbeddedPiAgent({
386+
sessionId: "session:test",
387+
sessionKey: testSessionKey,
388+
sessionFile,
389+
workspaceDir,
390+
config: cfg,
391+
prompt: "transport error",
392+
provider: "openai",
393+
model: "mock-throw",
394+
timeoutMs: 5_000,
395+
agentDir,
396+
enqueue: immediateEnqueue,
397+
});
398+
expect(result.payloads[0]?.isError).toBe(true);
399+
400+
const entries = await readSessionEntries(sessionFile);
401+
const promptErrorEntry = entries.find(
402+
(entry) => entry.type === "custom" && entry.customType === "openclaw:prompt-error",
403+
) as { data?: { error?: string } } | undefined;
404+
405+
expect(promptErrorEntry).toBeTruthy();
406+
expect(promptErrorEntry?.data?.error).toContain("transport failed");
407+
});
408+
376409
it(
377410
"appends new user + assistant after existing transcript entries",
378411
{ timeout: 90_000 },

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -848,6 +848,7 @@ export async function runEmbeddedAttempt(
848848
}).sessionAgentId;
849849

850850
let promptError: unknown = null;
851+
let promptErrorSource: "prompt" | "compaction" | null = null;
851852
try {
852853
const promptStartedAt = Date.now();
853854

@@ -1000,6 +1001,7 @@ export async function runEmbeddedAttempt(
10001001
}
10011002
} catch (err) {
10021003
promptError = err;
1004+
promptErrorSource = "prompt";
10031005
} finally {
10041006
log.debug(
10051007
`embedded run prompt end: runId=${params.runId} sessionId=${params.sessionId} durationMs=${Date.now() - promptStartedAt}`,
@@ -1022,6 +1024,7 @@ export async function runEmbeddedAttempt(
10221024
if (isRunnerAbortError(err)) {
10231025
if (!promptError) {
10241026
promptError = err;
1027+
promptErrorSource = "compaction";
10251028
}
10261029
if (!isProbeSession) {
10271030
log.debug(
@@ -1070,6 +1073,23 @@ export async function runEmbeddedAttempt(
10701073
}
10711074
messagesSnapshot = snapshotSelection.messagesSnapshot;
10721075
sessionIdUsed = snapshotSelection.sessionIdUsed;
1076+
1077+
if (promptError && promptErrorSource === "prompt") {
1078+
try {
1079+
sessionManager.appendCustomEntry("openclaw:prompt-error", {
1080+
timestamp: Date.now(),
1081+
runId: params.runId,
1082+
sessionId: params.sessionId,
1083+
provider: params.provider,
1084+
model: params.modelId,
1085+
api: params.model.api,
1086+
error: describeUnknownError(promptError),
1087+
});
1088+
} catch (entryErr) {
1089+
log.warn(`failed to persist prompt error entry: ${String(entryErr)}`);
1090+
}
1091+
}
1092+
10731093
cacheTrace?.recordStage("session:after", {
10741094
messages: messagesSnapshot,
10751095
note: timedOutDuringCompaction

src/agents/session-write-lock.e2e.test.ts

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import fs from "node:fs/promises";
22
import os from "node:os";
33
import path from "node:path";
4-
import { describe, expect, it } from "vitest";
5-
import { __testing, acquireSessionWriteLock } from "./session-write-lock.js";
4+
import { describe, expect, it, vi } from "vitest";
5+
import { __testing, acquireSessionWriteLock, cleanStaleLockFiles } from "./session-write-lock.js";
66

77
describe("acquireSessionWriteLock", () => {
88
it("reuses locks across symlinked session paths", async () => {
@@ -72,6 +72,95 @@ describe("acquireSessionWriteLock", () => {
7272
}
7373
});
7474

75+
it("watchdog releases stale in-process locks", async () => {
76+
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
77+
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
78+
try {
79+
const sessionFile = path.join(root, "session.jsonl");
80+
const lockPath = `${sessionFile}.lock`;
81+
const lockA = await acquireSessionWriteLock({
82+
sessionFile,
83+
timeoutMs: 500,
84+
maxHoldMs: 1,
85+
});
86+
87+
const released = await __testing.runLockWatchdogCheck(Date.now() + 1000);
88+
expect(released).toBeGreaterThanOrEqual(1);
89+
await expect(fs.access(lockPath)).rejects.toThrow();
90+
91+
const lockB = await acquireSessionWriteLock({ sessionFile, timeoutMs: 500 });
92+
await expect(fs.access(lockPath)).resolves.toBeUndefined();
93+
94+
// Old release handle must not affect the new lock.
95+
await lockA.release();
96+
await expect(fs.access(lockPath)).resolves.toBeUndefined();
97+
98+
await lockB.release();
99+
await expect(fs.access(lockPath)).rejects.toThrow();
100+
} finally {
101+
warnSpy.mockRestore();
102+
await fs.rm(root, { recursive: true, force: true });
103+
}
104+
});
105+
106+
it("cleans stale .jsonl lock files in sessions directories", async () => {
107+
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
108+
const sessionsDir = path.join(root, "sessions");
109+
await fs.mkdir(sessionsDir, { recursive: true });
110+
111+
const nowMs = Date.now();
112+
const staleDeadLock = path.join(sessionsDir, "dead.jsonl.lock");
113+
const staleAliveLock = path.join(sessionsDir, "old-live.jsonl.lock");
114+
const freshAliveLock = path.join(sessionsDir, "fresh-live.jsonl.lock");
115+
116+
try {
117+
await fs.writeFile(
118+
staleDeadLock,
119+
JSON.stringify({
120+
pid: 999_999,
121+
createdAt: new Date(nowMs - 120_000).toISOString(),
122+
}),
123+
"utf8",
124+
);
125+
await fs.writeFile(
126+
staleAliveLock,
127+
JSON.stringify({
128+
pid: process.pid,
129+
createdAt: new Date(nowMs - 120_000).toISOString(),
130+
}),
131+
"utf8",
132+
);
133+
await fs.writeFile(
134+
freshAliveLock,
135+
JSON.stringify({
136+
pid: process.pid,
137+
createdAt: new Date(nowMs - 1_000).toISOString(),
138+
}),
139+
"utf8",
140+
);
141+
142+
const result = await cleanStaleLockFiles({
143+
sessionsDir,
144+
staleMs: 30_000,
145+
nowMs,
146+
removeStale: true,
147+
});
148+
149+
expect(result.locks).toHaveLength(3);
150+
expect(result.cleaned).toHaveLength(2);
151+
expect(result.cleaned.map((entry) => path.basename(entry.lockPath)).toSorted()).toEqual([
152+
"dead.jsonl.lock",
153+
"old-live.jsonl.lock",
154+
]);
155+
156+
await expect(fs.access(staleDeadLock)).rejects.toThrow();
157+
await expect(fs.access(staleAliveLock)).rejects.toThrow();
158+
await expect(fs.access(freshAliveLock)).resolves.toBeUndefined();
159+
} finally {
160+
await fs.rm(root, { recursive: true, force: true });
161+
}
162+
});
163+
75164
it("removes held locks on termination signals", async () => {
76165
const signals = ["SIGINT", "SIGTERM", "SIGQUIT", "SIGABRT"] as const;
77166
for (const signal of signals) {

0 commit comments

Comments
 (0)