Skip to content

Commit 6147e1b

Browse files
authored
fix(gateway): async session transcript IO (#75875)
* fix(gateway): async session transcript IO * fix(plugins): restore jiti loader cache helper * test(gateway): mock async artifact transcript reads * chore(plugins): drop obsolete jiti loader shim
1 parent 8d7f4d2 commit 6147e1b

37 files changed

Lines changed: 1890 additions & 338 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
1515

1616
### Fixes
1717

18+
- Gateway/sessions: move hot transcript reads and mirror appends onto async bounded IO with serialized parent-linked writes, keeping large session histories from stalling Gateway requests and channel replies. Fixes #75656. Thanks @DerFlash.
1819
- Doctor/WhatsApp: warn when Linux crontabs still run the legacy `ensure-whatsapp.sh` health check, which can misreport `Gateway inactive` when cron lacks the systemd user-bus environment. Fixes #60204. Thanks @mySebbe.
1920
- Slack/setup: print the generated app manifest as plain JSON instead of embedding it inside the framed setup note, so it can be copied into Slack without deleting border characters. Fixes #65751. Thanks @theDanielJLewis.
2021
- Channels/WhatsApp: route CLI logout through the live Gateway and stop runtime-backed listeners before channel removal, so removing a WhatsApp account does not leave the old socket replying until restart. Fixes #67746. Thanks @123Mismail.

extensions/codex/src/app-server/transcript-mirror.test.ts

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ async function createTempSessionFile() {
2929
return path.join(dir, "session.jsonl");
3030
}
3131

32+
async function makeRoot(prefix: string): Promise<string> {
33+
const root = await fs.mkdtemp(path.join(os.tmpdir(), prefix));
34+
tempDirs.push(root);
35+
return root;
36+
}
37+
3238
describe("mirrorCodexAppServerTranscript", () => {
3339
it("mirrors user and assistant messages into the Pi transcript", async () => {
3440
const sessionFile = await createTempSessionFile();
@@ -58,6 +64,27 @@ describe("mirrorCodexAppServerTranscript", () => {
5864
expect(raw).toContain('"idempotencyKey":"scope-1:assistant:1"');
5965
});
6066

67+
it("creates the transcript directory on first mirror", async () => {
68+
const root = await makeRoot("openclaw-codex-transcript-missing-dir-");
69+
const sessionFile = path.join(root, "nested", "sessions", "session.jsonl");
70+
71+
await mirrorCodexAppServerTranscript({
72+
sessionFile,
73+
sessionKey: "session-1",
74+
messages: [
75+
makeAgentAssistantMessage({
76+
content: [{ type: "text", text: "first mirror" }],
77+
timestamp: Date.now(),
78+
}),
79+
],
80+
idempotencyScope: "scope-1",
81+
});
82+
83+
const raw = await fs.readFile(sessionFile, "utf8");
84+
expect(raw).toContain('"role":"assistant"');
85+
expect(raw).toContain('"content":[{"type":"text","text":"first mirror"}]');
86+
});
87+
6188
it("deduplicates app-server turn mirrors by idempotency scope", async () => {
6289
const sessionFile = await createTempSessionFile();
6390
const messages = [
@@ -183,4 +210,56 @@ describe("mirrorCodexAppServerTranscript", () => {
183210

184211
await expect(fs.readFile(sessionFile, "utf8")).rejects.toMatchObject({ code: "ENOENT" });
185212
});
213+
214+
it("migrates small linear transcripts before mirroring", async () => {
215+
const sessionFile = await createTempSessionFile();
216+
await fs.writeFile(
217+
sessionFile,
218+
[
219+
JSON.stringify({
220+
type: "session",
221+
version: 3,
222+
id: "linear-codex-session",
223+
timestamp: new Date().toISOString(),
224+
cwd: process.cwd(),
225+
}),
226+
JSON.stringify({
227+
type: "message",
228+
id: "legacy-user",
229+
timestamp: new Date().toISOString(),
230+
message: { role: "user", content: "legacy user" },
231+
}),
232+
].join("\n") + "\n",
233+
"utf8",
234+
);
235+
236+
await mirrorCodexAppServerTranscript({
237+
sessionFile,
238+
sessionKey: "session-1",
239+
messages: [
240+
makeAgentAssistantMessage({
241+
content: [{ type: "text", text: "mirrored assistant" }],
242+
timestamp: Date.now(),
243+
}),
244+
],
245+
idempotencyScope: "scope-1",
246+
});
247+
248+
const records = (await fs.readFile(sessionFile, "utf8"))
249+
.trim()
250+
.split("\n")
251+
.map(
252+
(line) =>
253+
JSON.parse(line) as {
254+
type?: string;
255+
id?: string;
256+
parentId?: string | null;
257+
message?: { role?: string };
258+
},
259+
)
260+
.filter((record) => record.type === "message");
261+
262+
expect(records[0]).toMatchObject({ id: "legacy-user", parentId: null });
263+
expect(records[1]).toMatchObject({ parentId: "legacy-user" });
264+
});
186265
});

extensions/codex/src/app-server/transcript-mirror.ts

Lines changed: 212 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1+
import { randomUUID } from "node:crypto";
12
import fs from "node:fs/promises";
23
import path from "node:path";
3-
import { SessionManager } from "@mariozechner/pi-coding-agent";
4+
import { StringDecoder } from "node:string_decoder";
5+
import { CURRENT_SESSION_VERSION, type SessionManager } from "@mariozechner/pi-coding-agent";
46
import {
57
acquireSessionWriteLock,
68
emitSessionTranscriptUpdate,
79
runAgentHarnessBeforeMessageWriteHook,
810
type AgentMessage,
911
} from "openclaw/plugin-sdk/agent-harness-runtime";
1012

13+
const TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES = 64 * 1024;
14+
const SESSION_MANAGER_APPEND_MAX_BYTES = 8 * 1024 * 1024;
15+
16+
type TranscriptLeafInfo = {
17+
leafId?: string;
18+
hasParentLinkedEntries: boolean;
19+
nonSessionEntryCount: number;
20+
};
21+
1122
export async function mirrorCodexAppServerTranscript(params: {
1223
sessionFile: string;
1324
sessionKey?: string;
@@ -29,7 +40,6 @@ export async function mirrorCodexAppServerTranscript(params: {
2940
});
3041
try {
3142
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
32-
const sessionManager = SessionManager.open(params.sessionFile);
3343
for (const [index, message] of messages.entries()) {
3444
const idempotencyKey = params.idempotencyScope
3545
? `${params.idempotencyScope}:${message.role}:${index}`
@@ -55,7 +65,10 @@ export async function mirrorCodexAppServerTranscript(params: {
5565
idempotencyKey,
5666
}
5767
: nextMessage) as unknown as Parameters<SessionManager["appendMessage"]>[0];
58-
sessionManager.appendMessage(messageToAppend);
68+
await appendCodexAppServerTranscriptMessage({
69+
transcriptPath: params.sessionFile,
70+
message: messageToAppend,
71+
});
5972
if (idempotencyKey) {
6073
existingIdempotencyKeys.add(idempotencyKey);
6174
}
@@ -71,6 +84,202 @@ export async function mirrorCodexAppServerTranscript(params: {
7184
}
7285
}
7386

87+
async function appendCodexAppServerTranscriptMessage(params: {
88+
transcriptPath: string;
89+
message: unknown;
90+
}): Promise<void> {
91+
await ensureTranscriptHeader(params.transcriptPath);
92+
const stat = await fs.stat(params.transcriptPath).catch(() => null);
93+
let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch(
94+
() => ({
95+
hasParentLinkedEntries: false,
96+
nonSessionEntryCount: 0,
97+
}),
98+
);
99+
const hasLinearEntries = !leafInfo.hasParentLinkedEntries && leafInfo.nonSessionEntryCount > 0;
100+
const shouldRawAppend = hasLinearEntries && (stat?.size ?? 0) > SESSION_MANAGER_APPEND_MAX_BYTES;
101+
if (hasLinearEntries && !shouldRawAppend) {
102+
const migrated = await migrateLinearTranscriptToParentLinked(params.transcriptPath);
103+
leafInfo = {
104+
...(migrated.leafId ? { leafId: migrated.leafId } : {}),
105+
hasParentLinkedEntries: Boolean(migrated.leafId),
106+
nonSessionEntryCount: leafInfo.nonSessionEntryCount,
107+
};
108+
}
109+
const entry = {
110+
type: "message",
111+
id: randomUUID(),
112+
...(shouldRawAppend ? {} : { parentId: leafInfo.leafId ?? null }),
113+
timestamp: new Date().toISOString(),
114+
message: params.message,
115+
};
116+
await fs.appendFile(params.transcriptPath, `${JSON.stringify(entry)}\n`, "utf-8");
117+
}
118+
119+
async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
120+
const stat = await fs.stat(transcriptPath).catch(() => null);
121+
if (stat?.isFile() && stat.size > 0) {
122+
return;
123+
}
124+
await fs.mkdir(path.dirname(transcriptPath), { recursive: true });
125+
const header = {
126+
type: "session",
127+
version: CURRENT_SESSION_VERSION,
128+
id: randomUUID(),
129+
timestamp: new Date().toISOString(),
130+
cwd: process.cwd(),
131+
};
132+
await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, {
133+
encoding: "utf-8",
134+
mode: 0o600,
135+
flag: stat?.isFile() ? "w" : "wx",
136+
});
137+
}
138+
139+
async function readTranscriptLeafInfo(transcriptPath: string): Promise<TranscriptLeafInfo> {
140+
const handle = await fs.open(transcriptPath, "r");
141+
try {
142+
const decoder = new StringDecoder("utf8");
143+
const buffer = Buffer.allocUnsafe(TRANSCRIPT_APPEND_SCAN_CHUNK_BYTES);
144+
let carry = "";
145+
let leafId: string | undefined;
146+
let hasParentLinkedEntries = false;
147+
let nonSessionEntryCount = 0;
148+
while (true) {
149+
const { bytesRead } = await handle.read(buffer, 0, buffer.length, null);
150+
if (bytesRead <= 0) {
151+
break;
152+
}
153+
const text = carry + decoder.write(buffer.subarray(0, bytesRead));
154+
const lines = text.split(/\r?\n/);
155+
carry = lines.pop() ?? "";
156+
for (const line of lines) {
157+
if (lineHasNonSessionEntry(line)) {
158+
nonSessionEntryCount += 1;
159+
}
160+
const id = lineParentLinkedEntryId(line);
161+
if (id) {
162+
leafId = id;
163+
hasParentLinkedEntries = true;
164+
}
165+
}
166+
await yieldTranscriptAppendScan();
167+
}
168+
const tail = carry + decoder.end();
169+
if (lineHasNonSessionEntry(tail)) {
170+
nonSessionEntryCount += 1;
171+
}
172+
const id = lineParentLinkedEntryId(tail);
173+
if (id) {
174+
leafId = id;
175+
hasParentLinkedEntries = true;
176+
}
177+
return {
178+
...(leafId ? { leafId } : {}),
179+
hasParentLinkedEntries,
180+
nonSessionEntryCount,
181+
};
182+
} finally {
183+
await handle.close();
184+
}
185+
}
186+
187+
async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Promise<{
188+
leafId?: string;
189+
}> {
190+
const raw = await fs.readFile(transcriptPath, "utf-8");
191+
const existingIds = new Set<string>();
192+
const output: string[] = [];
193+
let previousId: string | null = null;
194+
let leafId: string | undefined;
195+
for (const line of raw.split(/\r?\n/)) {
196+
if (!line.trim()) {
197+
continue;
198+
}
199+
let parsed: unknown;
200+
try {
201+
parsed = JSON.parse(line);
202+
} catch {
203+
output.push(line);
204+
continue;
205+
}
206+
if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) {
207+
output.push(line);
208+
continue;
209+
}
210+
const record = parsed as Record<string, unknown>;
211+
if (record.type === "session") {
212+
output.push(JSON.stringify({ ...record, version: CURRENT_SESSION_VERSION }));
213+
continue;
214+
}
215+
const id = normalizeEntryId(record.id) ?? generateEntryId(existingIds);
216+
existingIds.add(id);
217+
record.id = id;
218+
if (!Object.hasOwn(record, "parentId")) {
219+
record.parentId = previousId;
220+
}
221+
previousId = id;
222+
leafId = id;
223+
output.push(JSON.stringify(record));
224+
}
225+
await fs.writeFile(transcriptPath, `${output.join("\n")}\n`, {
226+
encoding: "utf-8",
227+
mode: 0o600,
228+
});
229+
const result: { leafId?: string } = {};
230+
if (leafId) {
231+
result.leafId = leafId;
232+
}
233+
return result;
234+
}
235+
236+
function normalizeEntryId(value: unknown): string | undefined {
237+
return typeof value === "string" && value.trim().length > 0 ? value : undefined;
238+
}
239+
240+
function generateEntryId(existingIds: Set<string>): string {
241+
for (let attempt = 0; attempt < 100; attempt += 1) {
242+
const id = randomUUID().slice(0, 8);
243+
if (!existingIds.has(id)) {
244+
existingIds.add(id);
245+
return id;
246+
}
247+
}
248+
const id = randomUUID();
249+
existingIds.add(id);
250+
return id;
251+
}
252+
253+
function lineHasNonSessionEntry(line: string): boolean {
254+
if (!line.trim()) {
255+
return false;
256+
}
257+
try {
258+
const parsed = JSON.parse(line) as { type?: unknown };
259+
return parsed.type !== "session";
260+
} catch {
261+
return false;
262+
}
263+
}
264+
265+
function lineParentLinkedEntryId(line: string): string | undefined {
266+
if (!line.trim()) {
267+
return undefined;
268+
}
269+
try {
270+
const parsed = JSON.parse(line) as { type?: unknown; id?: unknown; parentId?: unknown };
271+
return parsed.type !== "session" && typeof parsed.id === "string" && "parentId" in parsed
272+
? parsed.id
273+
: undefined;
274+
} catch {
275+
return undefined;
276+
}
277+
}
278+
279+
async function yieldTranscriptAppendScan(): Promise<void> {
280+
await new Promise<void>((resolve) => setImmediate(resolve));
281+
}
282+
74283
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
75284
const keys = new Set<string>();
76285
let raw: string;

src/agents/main-session-restart-recovery.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import path from "node:path";
77
import { resolveStateDir } from "../config/paths.js";
88
import { type SessionEntry, loadSessionStore, updateSessionStore } from "../config/sessions.js";
99
import { callGateway } from "../gateway/call.js";
10-
import { readSessionMessages } from "../gateway/session-utils.fs.js";
10+
import { readSessionMessagesAsync } from "../gateway/session-utils.fs.js";
1111
import { createSubsystemLogger } from "../logging/subsystem.js";
1212
import { CommandLane } from "../process/lanes.js";
1313
import { isAcpSessionKey, isCronSessionKey, isSubagentSessionKey } from "../routing/session-key.js";
@@ -226,7 +226,11 @@ async function recoverStore(params: {
226226

227227
let messages: unknown[];
228228
try {
229-
messages = readSessionMessages(entry.sessionId, params.storePath, entry.sessionFile);
229+
messages = await readSessionMessagesAsync(
230+
entry.sessionId,
231+
params.storePath,
232+
entry.sessionFile,
233+
);
230234
} catch (err) {
231235
log.warn(`failed to read transcript for ${sessionKey}: ${String(err)}`);
232236
result.failed++;

0 commit comments

Comments
 (0)