Skip to content

Commit 0e4b0de

Browse files
committed
fix: recover stale reply turns with valid send action
1 parent 4ef0d45 commit 0e4b0de

2 files changed

Lines changed: 249 additions & 24 deletions

File tree

src/logging/diagnostic-stuck-session-recovery.runtime.test.ts

Lines changed: 106 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ const mocks = vi.hoisted(() => ({
99
isEmbeddedPiRunActive: vi.fn(),
1010
isEmbeddedPiRunHandleActive: vi.fn(),
1111
getCommandLaneSnapshot: vi.fn(),
12+
enqueueCommandInLane: vi.fn(async (_lane: string, task: () => Promise<unknown>) => task()),
1213
resetCommandLane: vi.fn(),
14+
callGateway: vi.fn(),
1315
resolveActiveEmbeddedRunSessionId: vi.fn(),
1416
resolveActiveEmbeddedRunHandleSessionId: vi.fn(),
1517
resolveEmbeddedSessionLane: vi.fn((key: string) => `session:${key}`),
@@ -52,10 +54,15 @@ vi.mock("../agents/pi-embedded-runner/lanes.js", () => ({
5254
}));
5355

5456
vi.mock("../process/command-queue.js", () => ({
57+
enqueueCommandInLane: mocks.enqueueCommandInLane,
5558
getCommandLaneSnapshot: mocks.getCommandLaneSnapshot,
5659
resetCommandLane: mocks.resetCommandLane,
5760
}));
5861

62+
vi.mock("../gateway/call.js", () => ({
63+
callGateway: mocks.callGateway,
64+
}));
65+
5966
vi.mock("./diagnostic-runtime.js", () => ({
6067
diagnosticLogger: mocks.diag,
6168
}));
@@ -72,6 +79,12 @@ function resetMocks() {
7279
mocks.isEmbeddedPiRunActive.mockReset();
7380
mocks.isEmbeddedPiRunHandleActive.mockReset();
7481
mocks.getCommandLaneSnapshot.mockReset();
82+
mocks.enqueueCommandInLane.mockReset();
83+
mocks.enqueueCommandInLane.mockImplementation(
84+
async (_lane: string, task: () => Promise<unknown>) => task(),
85+
);
86+
mocks.callGateway.mockReset();
87+
mocks.callGateway.mockResolvedValue({ ok: true });
7588
mocks.getCommandLaneSnapshot.mockReturnValue({
7689
lane: "session:agent:main:main",
7790
queuedCount: 1,
@@ -89,13 +102,6 @@ function resetMocks() {
89102
mocks.diag.warn.mockReset();
90103
}
91104

92-
function warnLogMessages(): string[] {
93-
return mocks.diag.warn.mock.calls.map(([message]) => {
94-
expect(typeof message).toBe("string");
95-
return message as string;
96-
});
97-
}
98-
99105
describe("stuck session recovery", () => {
100106
beforeEach(() => {
101107
resetMocks();
@@ -115,10 +121,10 @@ describe("stuck session recovery", () => {
115121
expect(mocks.waitForEmbeddedPiRunEnd).not.toHaveBeenCalled();
116122
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
117123
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
118-
expect(warnLogMessages()).toEqual([
119-
"stuck session recovery skipped: sessionId=session-1 sessionKey=agent:main:main age=180s queueDepth=1 activeSessionId=session-1",
120-
"stuck session recovery outcome: status=skipped action=observe_only sessionId=session-1 sessionKey=agent:main:main activeSessionId=session-1 activeWorkKind=embedded_run reason=active_embedded_run",
121-
]);
124+
expect(mocks.diag.warn).toHaveBeenCalledWith(
125+
expect.stringContaining("reason=active_embedded_run"),
126+
);
127+
expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("action=observe_only"));
122128
});
123129

124130
it("aborts an active embedded run when active abort recovery is enabled", async () => {
@@ -179,10 +185,15 @@ describe("stuck session recovery", () => {
179185
fs.rmSync(tempDir, { recursive: true, force: true });
180186
}
181187

182-
expect(warnLogMessages()).toEqual([
183-
'stuck session recovery: sessionId=run-456 sessionKey=agent:clawblocker:cron:job-123:run:run-456 age=629s action=abort_embedded_run aborted=true drained=true released=0 stopped="Twitter Mention Moderation Agent" cronJobId=job-123 cronRunId=run-456 lastAssistant="There are 40 cached mentions."',
184-
"stuck session recovery outcome: status=aborted action=abort_embedded_run sessionId=run-456 sessionKey=agent:clawblocker:cron:job-123:run:run-456 activeSessionId=run-456 activeWorkKind=embedded_run lane=session:agent:clawblocker:cron:job-123:run:run-456 aborted=true drained=true forceCleared=false released=0",
185-
]);
188+
expect(mocks.diag.warn).toHaveBeenCalledWith(
189+
expect.stringContaining("action=abort_embedded_run"),
190+
);
191+
expect(mocks.diag.warn).toHaveBeenCalledWith(
192+
expect.stringContaining('stopped="Twitter Mention Moderation Agent"'),
193+
);
194+
expect(mocks.diag.warn).toHaveBeenCalledWith(
195+
expect.stringContaining('lastAssistant="There are 40 cached mentions."'),
196+
);
186197
});
187198

188199
it("force-clears and releases the session lane when abort cleanup does not drain", async () => {
@@ -257,9 +268,12 @@ describe("stuck session recovery", () => {
257268
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
258269
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
259270
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
260-
expect(warnLogMessages()).toEqual([
261-
"stuck session recovery outcome: status=skipped action=keep_lane sessionId=queued-reply-session sessionKey=agent:main:main activeSessionId=queued-reply-session activeWorkKind=embedded_run reason=active_reply_work",
262-
]);
271+
expect(mocks.diag.warn).toHaveBeenCalledWith(
272+
expect.stringContaining("reason=active_reply_work"),
273+
);
274+
expect(mocks.diag.warn).toHaveBeenCalledWith(
275+
expect.stringContaining("activeSessionId=queued-reply-session"),
276+
);
263277
});
264278

265279
it("does not release the session lane while unregistered lane work is active", async () => {
@@ -286,9 +300,10 @@ describe("stuck session recovery", () => {
286300
expect(mocks.abortEmbeddedPiRun).not.toHaveBeenCalled();
287301
expect(mocks.forceClearEmbeddedPiRun).not.toHaveBeenCalled();
288302
expect(mocks.resetCommandLane).not.toHaveBeenCalled();
289-
expect(warnLogMessages()).toEqual([
290-
"stuck session recovery outcome: status=skipped action=keep_lane sessionId=unregistered-work-session sessionKey=agent:main:main lane=session:agent:main:main reason=active_lane_task laneActive=1 laneQueued=1",
291-
]);
303+
expect(mocks.diag.warn).toHaveBeenCalledWith(
304+
expect.stringContaining("reason=active_lane_task"),
305+
);
306+
expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("laneActive=1"));
292307
});
293308

294309
it("reports when recovery finds no active work to release", async () => {
@@ -304,9 +319,76 @@ describe("stuck session recovery", () => {
304319
});
305320

306321
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
307-
expect(warnLogMessages()).toEqual([
308-
"stuck session recovery outcome: status=noop action=none sessionId=stale-session sessionKey=agent:main:main lane=session:agent:main:main reason=no_active_work",
309-
]);
322+
expect(mocks.diag.warn).toHaveBeenCalledWith(expect.stringContaining("reason=no_active_work"));
323+
});
324+
325+
it("closes a stale unfinished topic turn even when task runs are empty", async () => {
326+
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
327+
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-stale-reply-turn-"));
328+
const sessionKey = "agent:openclaw:telegram:group:-100:topic:2";
329+
try {
330+
process.env.OPENCLAW_STATE_DIR = tempDir;
331+
const { resolveStateDir } = await import("../config/paths.js");
332+
const stateDir = resolveStateDir();
333+
const storeDir = path.join(tempDir, "agents", "openclaw", "sessions");
334+
const resolvedStoreDir = path.join(stateDir, "agents", "openclaw", "sessions");
335+
fs.mkdirSync(storeDir, { recursive: true });
336+
fs.mkdirSync(resolvedStoreDir, { recursive: true });
337+
const storeJson = JSON.stringify({
338+
[sessionKey]: {
339+
sessionId: "session-1",
340+
replyTurnState: "running",
341+
replyTurnStartedAt: Date.now() - 180_000,
342+
replyTurnUpdatedAt: Date.now() - 180_000,
343+
deliveryContext: {
344+
channel: "telegram",
345+
to: "-100",
346+
threadId: "2",
347+
},
348+
},
349+
});
350+
fs.writeFileSync(path.join(storeDir, "sessions.json"), storeJson);
351+
fs.writeFileSync(path.join(resolvedStoreDir, "sessions.json"), storeJson);
352+
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
353+
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);
354+
mocks.isEmbeddedPiRunActive.mockReturnValue(false);
355+
mocks.resetCommandLane.mockReturnValue(0);
356+
357+
const outcome = await recoverStuckDiagnosticSession({
358+
sessionId: "session-1",
359+
sessionKey,
360+
ageMs: 180_000,
361+
});
362+
363+
expect(outcome.status).toBe("released");
364+
expect("reason" in outcome ? outcome.reason : undefined).toBe("stale_reply_turn_closed");
365+
expect(mocks.enqueueCommandInLane).toHaveBeenCalledWith("message", expect.any(Function));
366+
expect(mocks.callGateway).toHaveBeenCalledWith(
367+
expect.objectContaining({
368+
method: "message.action",
369+
params: expect.objectContaining({
370+
action: "send",
371+
channel: "telegram",
372+
idempotencyKey: `stale-reply-turn-recovery:${sessionKey}`,
373+
params: expect.objectContaining({
374+
to: "-100",
375+
threadId: "2",
376+
message: expect.stringContaining("interrupted"),
377+
}),
378+
}),
379+
}),
380+
);
381+
const store = JSON.parse(fs.readFileSync(path.join(storeDir, "sessions.json"), "utf8"));
382+
expect(store[sessionKey].replyTurnState).toBe("failed");
383+
expect(store[sessionKey].replyTurnLastError).toBe("recovered_after_restart");
384+
} finally {
385+
if (previousStateDir === undefined) {
386+
delete process.env.OPENCLAW_STATE_DIR;
387+
} else {
388+
process.env.OPENCLAW_STATE_DIR = previousStateDir;
389+
}
390+
fs.rmSync(tempDir, { recursive: true, force: true });
391+
}
310392
});
311393

312394
it("releases a stale session-id lane when no session key is available", async () => {

src/logging/diagnostic-stuck-session-recovery.runtime.ts

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import fs from "node:fs";
2+
import path from "node:path";
13
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js";
24
import {
35
abortAndDrainEmbeddedPiRun,
@@ -6,7 +8,16 @@ import {
68
resolveActiveEmbeddedRunSessionId,
79
resolveActiveEmbeddedRunHandleSessionId,
810
} from "../agents/pi-embedded-runner/runs.js";
11+
import { resolveStateDir } from "../config/paths.js";
12+
import {
13+
type SessionEntry,
14+
loadSessionStore,
15+
updateSessionStore,
16+
resolveDefaultSessionStorePath,
17+
} from "../config/sessions.js";
18+
import { callGateway } from "../gateway/call.js";
919
import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js";
20+
import { enqueueCommandInLane } from "../process/command-queue.js";
1021
import { diagnosticLogger as diag } from "./diagnostic-runtime.js";
1122
import {
1223
formatStoppedCronSessionDiagnosticFields,
@@ -20,6 +31,7 @@ import {
2031
import { isDiagnosticSessionStateCurrent } from "./diagnostic-session-state.js";
2132

2233
const STUCK_SESSION_ABORT_SETTLE_MS = 15_000;
34+
const STALE_REPLY_TURN_MS = 2 * 60 * 1000;
2335
const recoveriesInFlight = new Set<string>();
2436

2537
export type StuckSessionRecoveryParams = StuckSessionRecoveryRequest;
@@ -53,6 +65,119 @@ function formatRecoveryContext(
5365
return fields.join(" ");
5466
}
5567

68+
function resolveSessionStorePathsForRecovery(sessionKey?: string): string[] {
69+
const stateDir = resolveStateDir();
70+
const paths = new Set<string>();
71+
const agentId = sessionKey?.startsWith("agent:") ? sessionKey.split(":")[1]?.trim() : undefined;
72+
if (agentId) {
73+
paths.add(resolveDefaultSessionStorePath(agentId));
74+
}
75+
const agentsDir = path.join(stateDir, "agents");
76+
try {
77+
for (const entry of fs.readdirSync(agentsDir, { withFileTypes: true })) {
78+
if (entry.isDirectory()) {
79+
paths.add(path.join(agentsDir, entry.name, "sessions.json"));
80+
}
81+
}
82+
} catch {
83+
// Best-effort only; explicit path recovery is still handled below.
84+
}
85+
return [...paths];
86+
}
87+
88+
function findReplyTurnSessionEntry(params: {
89+
sessionKey?: string;
90+
}): { storePath: string; entry: SessionEntry } | undefined {
91+
const sessionKey = params.sessionKey?.trim();
92+
if (!sessionKey) {
93+
return undefined;
94+
}
95+
for (const storePath of resolveSessionStorePathsForRecovery(sessionKey)) {
96+
try {
97+
const store = loadSessionStore(storePath);
98+
const entry = store[sessionKey];
99+
if (entry) {
100+
return { storePath, entry };
101+
}
102+
} catch {
103+
// Continue scanning other agent stores.
104+
}
105+
}
106+
return undefined;
107+
}
108+
109+
function shouldCloseStaleReplyTurn(entry: SessionEntry, now = Date.now()): boolean {
110+
if (entry.replyTurnState !== "running") {
111+
return false;
112+
}
113+
const updatedAt = entry.replyTurnUpdatedAt ?? entry.replyTurnStartedAt ?? entry.updatedAt;
114+
return typeof updatedAt === "number" && now - updatedAt >= STALE_REPLY_TURN_MS;
115+
}
116+
117+
async function sendRecoveryFallback(params: {
118+
sessionKey: string;
119+
entry: SessionEntry;
120+
}): Promise<boolean> {
121+
const deliveryContext = params.entry.deliveryContext;
122+
const origin = params.entry.origin;
123+
const channel =
124+
deliveryContext?.channel ?? params.entry.channel ?? origin?.surface ?? origin?.provider;
125+
const to = deliveryContext?.to ?? params.entry.lastTo ?? origin?.to;
126+
if (!channel || !to) {
127+
return false;
128+
}
129+
try {
130+
await enqueueCommandInLane("message", () =>
131+
callGateway({
132+
method: "message.action",
133+
params: {
134+
action: "send",
135+
channel,
136+
accountId: deliveryContext?.accountId ?? params.entry.lastAccountId ?? origin?.accountId,
137+
idempotencyKey: `stale-reply-turn-recovery:${params.sessionKey}`,
138+
params: {
139+
to,
140+
threadId: deliveryContext?.threadId ?? params.entry.lastThreadId ?? origin?.threadId,
141+
message: "That run was interrupted before it could finish. Please send it again.",
142+
},
143+
},
144+
}),
145+
);
146+
return true;
147+
} catch (err) {
148+
diag.warn(
149+
`stale reply turn recovery fallback failed: sessionKey=${params.sessionKey} err=${String(err)}`,
150+
);
151+
return false;
152+
}
153+
}
154+
155+
async function closeStaleReplyTurnIfNeeded(params: { sessionKey?: string }): Promise<boolean> {
156+
const found = findReplyTurnSessionEntry({ sessionKey: params.sessionKey });
157+
if (!found || !params.sessionKey || !shouldCloseStaleReplyTurn(found.entry)) {
158+
return false;
159+
}
160+
const sent = await sendRecoveryFallback({ sessionKey: params.sessionKey, entry: found.entry });
161+
await updateSessionStore(found.storePath, async (store) => {
162+
const entry = store[params.sessionKey!];
163+
if (!entry || entry.replyTurnState !== "running") {
164+
return store;
165+
}
166+
store[params.sessionKey!] = {
167+
...entry,
168+
replyTurnState: "failed",
169+
replyTurnUpdatedAt: Date.now(),
170+
replyTurnLastError: sent ? "recovered_after_restart" : "recovery_fallback_delivery_failed",
171+
updatedAt: Date.now(),
172+
};
173+
return store;
174+
});
175+
diag.warn(
176+
`stale reply turn recovery closed: sessionKey=${params.sessionKey} fallbackSent=${sent}`,
177+
);
178+
return true;
179+
}
180+
56181
export async function recoverStuckDiagnosticSession(
57182
params: StuckSessionRecoveryParams,
58183
): Promise<StuckSessionRecoveryOutcome> {
@@ -144,6 +269,23 @@ export async function recoverStuckDiagnosticSession(
144269
return outcome;
145270
}
146271

272+
const closedStaleReplyTurn = await closeStaleReplyTurnIfNeeded({
273+
sessionKey: params.sessionKey,
274+
});
275+
if (closedStaleReplyTurn) {
276+
const outcome: StuckSessionRecoveryOutcome = {
277+
status: "released",
278+
action: "release_lane",
279+
reason: "stale_reply_turn_closed",
280+
sessionId: params.sessionId,
281+
sessionKey: params.sessionKey,
282+
released: 0,
283+
lane: sessionLane ?? undefined,
284+
};
285+
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
286+
return outcome;
287+
}
288+
147289
if (!activeSessionId && sessionLane) {
148290
const laneSnapshot = getCommandLaneSnapshot(sessionLane);
149291
if (laneSnapshot.activeCount > 0) {
@@ -232,6 +374,7 @@ export async function recoverStuckDiagnosticSession(
232374
}
233375
}
234376

377+
// eslint-disable-next-line no-underscore-dangle -- established test hook for this runtime module.
235378
export const __testing = {
236379
resetRecoveriesInFlight(): void {
237380
recoveriesInFlight.clear();

0 commit comments

Comments
 (0)