Skip to content

Commit 2c48dd2

Browse files
fix(sessions): preserve corrupt-header transcripts
Fixes #89037. Co-authored-by: Charles <charles-openclaw@9bcfae.inboxapi.ai>
1 parent 4a285d5 commit 2c48dd2

4 files changed

Lines changed: 336 additions & 9 deletions

File tree

src/agents/embedded-agent-runner/session-manager-init.test.ts

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import fs from "node:fs/promises";
22
import os from "node:os";
33
import path from "node:path";
44
import { afterEach, describe, expect, it } from "vitest";
5+
import { SessionManager } from "../sessions/session-manager.js";
56
import { prepareSessionManagerForRun } from "./session-manager-init.js";
67

78
const tempPaths: string[] = [];
@@ -146,4 +147,119 @@ describe("prepareSessionManagerForRun", () => {
146147
);
147148
expect(JSON.parse(assistantLine ?? "{}")).toEqual(assistantEntry);
148149
});
150+
151+
it("does not truncate an existing transcript with a corrupted header", async () => {
152+
const sessionFile = await makeTempFile();
153+
const originalTranscript =
154+
[
155+
'{"type":"session","id":"broken"',
156+
JSON.stringify({
157+
type: "message",
158+
id: "user-1",
159+
parentId: null,
160+
timestamp: "2026-05-27T00:00:01.000Z",
161+
message: { role: "user", content: "persisted prompt" },
162+
}),
163+
].join("\n") + "\n";
164+
await fs.writeFile(sessionFile, originalTranscript, "utf-8");
165+
const sessionManager = {
166+
sessionId: "fresh-session",
167+
cwd: "/srv/openclaw/main",
168+
flushed: true,
169+
fileEntries: [
170+
{
171+
type: "session",
172+
id: "fresh-session",
173+
cwd: "/srv/openclaw/main",
174+
},
175+
{
176+
type: "message",
177+
message: { role: "user" },
178+
},
179+
],
180+
byId: new Map([["user-1", {}]]),
181+
labelsById: new Map(),
182+
leafId: "user-1",
183+
};
184+
185+
await expect(
186+
prepareSessionManagerForRun({
187+
sessionManager,
188+
sessionFile,
189+
hadSessionFile: true,
190+
sessionId: "new-session",
191+
cwd: "/tmp/task-repo",
192+
}),
193+
).rejects.toThrow("Refusing to reset session transcript with unreadable header");
194+
195+
expect(await fs.readFile(sessionFile, "utf-8")).toBe(originalTranscript);
196+
expect(sessionManager.fileEntries).toEqual([
197+
{
198+
type: "session",
199+
id: "fresh-session",
200+
cwd: "/srv/openclaw/main",
201+
},
202+
{
203+
type: "message",
204+
message: { role: "user" },
205+
},
206+
]);
207+
expect(sessionManager.flushed).toBe(true);
208+
});
209+
210+
it("keeps recovered user-only transcripts through open and run preparation", async () => {
211+
const sessionFile = await makeTempFile();
212+
const userEntry = {
213+
type: "message",
214+
id: "user-1",
215+
parentId: null,
216+
timestamp: "2026-05-27T00:00:01.000Z",
217+
message: { role: "user", content: "persisted prompt" },
218+
};
219+
await fs.writeFile(
220+
sessionFile,
221+
['{"type":"session","id":"broken"', JSON.stringify(userEntry)].join("\n") + "\n",
222+
"utf-8",
223+
);
224+
225+
const sessionManager = SessionManager.open(sessionFile, path.dirname(sessionFile), "/old/cwd");
226+
227+
await prepareSessionManagerForRun({
228+
sessionManager,
229+
sessionFile,
230+
hadSessionFile: true,
231+
sessionId: "new-session",
232+
cwd: "/tmp/task-repo",
233+
});
234+
sessionManager.appendMessage({
235+
role: "assistant",
236+
content: [{ type: "text", text: "response" }],
237+
api: "messages",
238+
provider: "anthropic",
239+
model: "sonnet-4.6",
240+
usage: {
241+
input: 0,
242+
output: 0,
243+
cacheRead: 0,
244+
cacheWrite: 0,
245+
totalTokens: 0,
246+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
247+
},
248+
stopReason: "stop",
249+
timestamp: Date.now(),
250+
});
251+
252+
const entries = (await fs.readFile(sessionFile, "utf-8"))
253+
.trim()
254+
.split("\n")
255+
.map(
256+
(line) => JSON.parse(line) as { type: string; id?: string; message?: { role?: string } },
257+
);
258+
expect(entries.map((entry) => entry.type)).toEqual(["session", "message", "message"]);
259+
expect(entries[0]).toEqual(
260+
expect.objectContaining({ type: "session", id: "new-session", cwd: "/tmp/task-repo" }),
261+
);
262+
expect(entries[1]).toEqual(userEntry);
263+
expect(entries[2]?.message?.role).toBe("assistant");
264+
});
149265
});

src/agents/embedded-agent-runner/session-manager-init.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,30 @@ import { serializeJsonlLine, writeJsonlLines } from "../../config/sessions/trans
44
type SessionHeaderEntry = { type: "session"; id?: string; cwd?: string };
55
type SessionMessageEntry = { type: "message"; message?: { role?: string } };
66

7+
function isRecord(value: unknown): value is Record<string, unknown> {
8+
return typeof value === "object" && value !== null && !Array.isArray(value);
9+
}
10+
11+
async function assertExistingHeaderIsReadable(sessionFile: string): Promise<void> {
12+
const content = await fs.readFile(sessionFile, "utf-8");
13+
const firstLine = content.split("\n").find((line) => line.trim());
14+
if (!firstLine) {
15+
return;
16+
}
17+
18+
let parsed: unknown;
19+
try {
20+
parsed = JSON.parse(firstLine);
21+
} catch (error) {
22+
throw new Error(`Refusing to reset session transcript with unreadable header: ${sessionFile}`, {
23+
cause: error,
24+
});
25+
}
26+
if (!isRecord(parsed) || parsed.type !== "session") {
27+
throw new Error(`Refusing to reset session transcript with invalid header: ${sessionFile}`);
28+
}
29+
}
30+
731
/**
832
* session runtime SessionManager persistence quirk:
933
* - If the file exists but has no assistant message, SessionManager marks itself `flushed=true`
@@ -29,6 +53,7 @@ export async function prepareSessionManagerForRun(params: {
2953
byId?: Map<string, unknown>;
3054
labelsById?: Map<string, unknown>;
3155
leafId?: string | null;
56+
wasRecoveredFromCorruptHeader?: () => boolean;
3257
};
3358

3459
const header = sm.fileEntries.find((e): e is SessionHeaderEntry => e.type === "session");
@@ -45,7 +70,20 @@ export async function prepareSessionManagerForRun(params: {
4570
}
4671

4772
if (params.hadSessionFile && header && !hasAssistant) {
73+
if (sm.wasRecoveredFromCorruptHeader?.()) {
74+
header.id = params.sessionId;
75+
header.cwd = params.cwd;
76+
sm.sessionId = params.sessionId;
77+
sm.cwd = params.cwd;
78+
await writeJsonlLines(params.sessionFile, sm.fileEntries.map(serializeJsonlLine), {
79+
mode: 0o600,
80+
});
81+
sm.flushed = true;
82+
return;
83+
}
84+
4885
// Reset file so the first assistant flush includes header+user+assistant in order.
86+
await assertExistingHeaderIsReadable(params.sessionFile);
4987
await fs.writeFile(params.sessionFile, "", "utf-8");
5088
header.id = params.sessionId;
5189
header.cwd = params.cwd;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import fs from "node:fs/promises";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import { afterEach, describe, expect, it } from "vitest";
5+
import { SessionManager } from "./session-manager.js";
6+
7+
const tempPaths: string[] = [];
8+
9+
async function makeTempDir(): Promise<string> {
10+
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-manager-"));
11+
tempPaths.push(dir);
12+
return dir;
13+
}
14+
15+
describe("SessionManager.open", () => {
16+
afterEach(async () => {
17+
await Promise.all(
18+
tempPaths.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })),
19+
);
20+
});
21+
22+
it("recovers a corrupted first-line header without truncating later messages", async () => {
23+
const dir = await makeTempDir();
24+
const sessionFile = path.join(dir, "session.jsonl");
25+
const originalHeader = {
26+
type: "session",
27+
version: 3,
28+
id: "original-session",
29+
timestamp: "2026-05-27T00:00:00.000Z",
30+
cwd: "/srv/openclaw/main",
31+
};
32+
const userEntry = {
33+
type: "message",
34+
id: "user-1",
35+
parentId: null,
36+
timestamp: "2026-05-27T00:00:01.000Z",
37+
message: { role: "user", content: "important question" },
38+
};
39+
const assistantEntry = {
40+
type: "message",
41+
id: "assistant-1",
42+
parentId: "user-1",
43+
timestamp: "2026-05-27T00:00:02.000Z",
44+
message: { role: "assistant", content: "important answer" },
45+
};
46+
const originalTranscript =
47+
[
48+
JSON.stringify(originalHeader).slice(0, 30),
49+
JSON.stringify(userEntry),
50+
JSON.stringify(assistantEntry),
51+
].join("\n") + "\n";
52+
await fs.writeFile(sessionFile, originalTranscript, "utf8");
53+
if (process.platform !== "win32") {
54+
await fs.chmod(sessionFile, 0o600);
55+
}
56+
57+
const sessionManager = SessionManager.open(sessionFile, dir, "/tmp/task-repo");
58+
59+
expect(sessionManager.getEntries()).toEqual([userEntry, assistantEntry]);
60+
expect(await fs.readFile(sessionFile, "utf8")).toContain("important question");
61+
expect(await fs.readFile(sessionFile, "utf8")).toContain("important answer");
62+
await expect(fs.readFile(sessionFile, "utf8")).resolves.not.toBe(originalTranscript);
63+
64+
const backupFiles = (await fs.readdir(dir)).filter((file) => file.includes(".corrupt-"));
65+
expect(backupFiles).toHaveLength(1);
66+
await expect(fs.readFile(path.join(dir, backupFiles[0] ?? ""), "utf8")).resolves.toBe(
67+
originalTranscript,
68+
);
69+
if (process.platform !== "win32") {
70+
const backupStat = await fs.stat(path.join(dir, backupFiles[0] ?? ""));
71+
expect(backupStat.mode & 0o777).toBe(0o600);
72+
}
73+
});
74+
75+
it("does not duplicate the header after recovering a header-only corrupt file", async () => {
76+
const dir = await makeTempDir();
77+
const sessionFile = path.join(dir, "session.jsonl");
78+
await fs.writeFile(sessionFile, '{"type":"session","version":3,"id":"sess', "utf8");
79+
80+
const sessionManager = SessionManager.open(sessionFile, dir, "/tmp/task-repo");
81+
sessionManager.appendMessage({ role: "user", content: "hello", timestamp: Date.now() });
82+
sessionManager.appendMessage({
83+
role: "assistant",
84+
content: [{ type: "text", text: "hi" }],
85+
api: "messages",
86+
provider: "anthropic",
87+
model: "sonnet-4.6",
88+
usage: {
89+
input: 0,
90+
output: 0,
91+
cacheRead: 0,
92+
cacheWrite: 0,
93+
totalTokens: 0,
94+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
95+
},
96+
stopReason: "stop",
97+
timestamp: Date.now(),
98+
});
99+
100+
const entries = (await fs.readFile(sessionFile, "utf8"))
101+
.trim()
102+
.split("\n")
103+
.map((line) => JSON.parse(line) as { type: string });
104+
105+
expect(entries.map((entry) => entry.type)).toEqual(["session", "message", "message"]);
106+
expect(entries.filter((entry) => entry.type === "session")).toHaveLength(1);
107+
});
108+
});

0 commit comments

Comments
 (0)