Skip to content

Commit b1eedb2

Browse files
authored
Add ACP session load event ledger (#79093)
* Add ACP session load event ledger * Record ACP prompts after send acceptance * Support ACP ledger replay by session key * Harden ACP ledger replay completeness * Harden ACP ledger review gaps * Fix ACP canonical session key handling --------- Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com>
1 parent 252456e commit b1eedb2

10 files changed

Lines changed: 1643 additions & 49 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai
4242
- ACPX/Codex: preserve trusted Codex project declarations when launching isolated Codex ACP sessions, avoiding interactive trust prompts in headless runs. Thanks @Stedyclaw.
4343
- ACPX/Codex: reap stale OpenClaw-owned ACPX/Codex ACP process trees on startup and after ACP session close, preventing orphaned harness processes from slowing the Gateway. Thanks @91wan.
4444
- ACP bridge: implement stable session list, resume, and close handlers so ACP clients can page Gateway sessions, rebind existing sessions without replay, and close bridge sessions cleanly. Thanks @amknight.
45+
- ACP bridge: replay complete ledger-backed ACP sessions on load, including user prompts, tool updates, session metadata, and usage snapshots, while keeping older sessions on the existing transcript fallback. Thanks @amknight.
4546
- ACP sessions: allow parent agents to inspect and message their own spawned cross-agent ACP sessions without enabling broad agent-to-agent visibility. Thanks @barronlroth.
4647
- Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface.
4748
- Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids.

docs/cli/acp.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Quick rule:
4444
| `initialize`, `newSession`, `prompt`, `cancel` | Implemented | Core bridge flow over stdio to Gateway chat/send + abort. |
4545
| `listSessions`, slash commands | Implemented | Session list works against Gateway session state with bounded cursor pagination and `cwd` filtering where Gateway session rows carry workspace metadata; commands are advertised via `available_commands_update`. |
4646
| `resumeSession`, `closeSession` | Implemented | Resume rebinds an ACP session to an existing Gateway session without replaying history. Close cancels active bridge work, resolves pending prompts as cancelled, and releases bridge session state. |
47-
| `loadSession` | Partial | Rebinds the ACP session to a Gateway session key and replays stored user/assistant text history. Tool/system history is not reconstructed yet. |
47+
| `loadSession` | Partial | Rebinds the ACP session to a Gateway session key and replays ACP event-ledger history for bridge-created sessions. Older/no-ledger sessions fall back to stored user/assistant text. |
4848
| Prompt content (`text`, embedded `resource`, images) | Partial | Text/resources are flattened into chat input; images become Gateway attachments. |
4949
| Session modes | Partial | `session/set_mode` is supported and the bridge exposes initial Gateway-backed session controls for thought level, tool verbosity, reasoning, usage detail, and elevated actions. Broader ACP-native mode/config surfaces are still out of scope. |
5050
| Session info and usage updates | Partial | The bridge emits `session_info_update` and best-effort `usage_update` notifications from cached Gateway session snapshots. Usage is approximate and only sent when Gateway token totals are marked fresh. |
@@ -56,9 +56,9 @@ Quick rule:
5656

5757
## Known Limitations
5858

59-
- `loadSession` replays stored user and assistant text history, but it does not
60-
reconstruct historic tool calls, system notices, or richer ACP-native event
61-
types.
59+
- `loadSession` can replay complete ACP event-ledger history only for
60+
bridge-created sessions. Older/no-ledger sessions still use transcript
61+
fallback and do not reconstruct historic tool calls or system notices.
6262
- If multiple ACP clients share the same Gateway session key, event and cancel
6363
routing are best-effort rather than strictly isolated per client. Prefer the
6464
default isolated `acp:<uuid>` sessions when you need clean editor-local

src/acp/event-ledger.test.ts

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
import fs from "node:fs/promises";
2+
import path from "node:path";
3+
import { describe, expect, it } from "vitest";
4+
import { withTempDir } from "../test-helpers/temp-dir.js";
5+
import { createFileAcpEventLedger, createInMemoryAcpEventLedger } from "./event-ledger.js";
6+
7+
describe("ACP event ledger", () => {
8+
it("records complete in-memory session updates in sequence", async () => {
9+
const ledger = createInMemoryAcpEventLedger({ now: () => 123 });
10+
await ledger.startSession({
11+
sessionId: "session-1",
12+
sessionKey: "agent:main:work",
13+
cwd: "/work",
14+
complete: true,
15+
});
16+
await ledger.recordUserPrompt({
17+
sessionId: "session-1",
18+
sessionKey: "agent:main:work",
19+
runId: "run-1",
20+
prompt: [{ type: "text", text: "Question" }],
21+
});
22+
await ledger.recordUpdate({
23+
sessionId: "session-1",
24+
sessionKey: "agent:main:work",
25+
runId: "run-1",
26+
update: {
27+
sessionUpdate: "agent_message_chunk",
28+
content: { type: "text", text: "Answer" },
29+
},
30+
});
31+
32+
const replay = await ledger.readReplay({
33+
sessionId: "session-1",
34+
sessionKey: "agent:main:work",
35+
});
36+
37+
expect(replay.complete).toBe(true);
38+
expect(replay.events.map((event) => event.seq)).toEqual([1, 2]);
39+
expect(replay.events.map((event) => event.runId)).toEqual(["run-1", "run-1"]);
40+
expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([
41+
"user_message_chunk",
42+
"agent_message_chunk",
43+
]);
44+
});
45+
46+
it("marks a session incomplete when event retention truncates history", async () => {
47+
const ledger = createInMemoryAcpEventLedger({ maxEventsPerSession: 1 });
48+
await ledger.startSession({
49+
sessionId: "session-1",
50+
sessionKey: "agent:main:work",
51+
cwd: "/work",
52+
complete: true,
53+
});
54+
await ledger.recordUpdate({
55+
sessionId: "session-1",
56+
sessionKey: "agent:main:work",
57+
update: {
58+
sessionUpdate: "agent_message_chunk",
59+
content: { type: "text", text: "First" },
60+
},
61+
});
62+
await ledger.recordUpdate({
63+
sessionId: "session-1",
64+
sessionKey: "agent:main:work",
65+
update: {
66+
sessionUpdate: "agent_message_chunk",
67+
content: { type: "text", text: "Second" },
68+
},
69+
});
70+
71+
await expect(
72+
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
73+
).resolves.toEqual({ complete: false, events: [] });
74+
});
75+
76+
it("persists file-backed replay state across ledger instances", async () => {
77+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
78+
const filePath = path.join(dir, "acp", "event-ledger.json");
79+
const first = createFileAcpEventLedger({ filePath, now: () => 1000 });
80+
await first.startSession({
81+
sessionId: "session-1",
82+
sessionKey: "agent:main:work",
83+
cwd: "/work",
84+
complete: true,
85+
});
86+
await first.recordUpdate({
87+
sessionId: "session-1",
88+
sessionKey: "agent:main:work",
89+
runId: "run-1",
90+
update: {
91+
sessionUpdate: "agent_thought_chunk",
92+
content: { type: "text", text: "Thinking" },
93+
},
94+
});
95+
96+
const second = createFileAcpEventLedger({ filePath });
97+
const replay = await second.readReplay({
98+
sessionId: "session-1",
99+
sessionKey: "agent:main:work",
100+
});
101+
102+
expect(replay.complete).toBe(true);
103+
expect(replay.events).toHaveLength(1);
104+
expect(replay.events[0]?.update).toEqual({
105+
sessionUpdate: "agent_thought_chunk",
106+
content: { type: "text", text: "Thinking" },
107+
});
108+
await expect(fs.readFile(filePath, "utf8")).resolves.toContain('"version":1');
109+
});
110+
});
111+
112+
it("can replay a complete session by Gateway session key", async () => {
113+
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
114+
await ledger.startSession({
115+
sessionId: "acp-session-1",
116+
sessionKey: "acp:gateway-session-1",
117+
cwd: "/work",
118+
complete: true,
119+
});
120+
await ledger.recordUpdate({
121+
sessionId: "acp-session-1",
122+
sessionKey: "acp:gateway-session-1",
123+
update: {
124+
sessionUpdate: "agent_message_chunk",
125+
content: { type: "text", text: "Answer" },
126+
},
127+
});
128+
129+
const replay = await ledger.readReplayBySessionKey({
130+
sessionKey: "acp:gateway-session-1",
131+
});
132+
133+
expect(replay.complete).toBe(true);
134+
expect(replay.sessionId).toBe("acp-session-1");
135+
expect(replay.sessionKey).toBe("acp:gateway-session-1");
136+
expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([
137+
"agent_message_chunk",
138+
]);
139+
});
140+
141+
it("preserves prompt history when a provisional ACP key becomes a canonical Gateway key", async () => {
142+
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
143+
await ledger.startSession({
144+
sessionId: "acp-session-1",
145+
sessionKey: "acp:gateway-session-1",
146+
cwd: "/work",
147+
complete: true,
148+
});
149+
await ledger.recordUserPrompt({
150+
sessionId: "acp-session-1",
151+
sessionKey: "acp:gateway-session-1",
152+
runId: "run-1",
153+
prompt: [{ type: "text", text: "Question" }],
154+
});
155+
await ledger.recordUpdate({
156+
sessionId: "acp-session-1",
157+
sessionKey: "agent:main:acp:gateway-session-1",
158+
runId: "run-1",
159+
update: {
160+
sessionUpdate: "agent_message_chunk",
161+
content: { type: "text", text: "Answer" },
162+
},
163+
});
164+
165+
const replay = await ledger.readReplayBySessionKey({
166+
sessionKey: "agent:main:acp:gateway-session-1",
167+
});
168+
169+
expect(replay.complete).toBe(true);
170+
expect(replay.sessionId).toBe("acp-session-1");
171+
expect(replay.sessionKey).toBe("agent:main:acp:gateway-session-1");
172+
expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([
173+
"user_message_chunk",
174+
"agent_message_chunk",
175+
]);
176+
});
177+
178+
it("can replay multi-block prompt history by ACP session id", async () => {
179+
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
180+
await ledger.startSession({
181+
sessionId: "acp-session-1",
182+
sessionKey: "acp:gateway-session-1",
183+
cwd: "/work",
184+
complete: true,
185+
});
186+
await ledger.recordUserPrompt({
187+
sessionId: "acp-session-1",
188+
sessionKey: "acp:gateway-session-1",
189+
runId: "run-1",
190+
prompt: [
191+
{ type: "text", text: "First" },
192+
{ type: "text", text: "Second" },
193+
],
194+
});
195+
196+
const replay = await ledger.readReplayBySessionId({ sessionId: "acp-session-1" });
197+
198+
expect(replay.complete).toBe(true);
199+
expect(replay.sessionKey).toBe("acp:gateway-session-1");
200+
expect(
201+
replay.events.map((event) =>
202+
event.update.sessionUpdate === "user_message_chunk" ? event.update.content : undefined,
203+
),
204+
).toEqual([
205+
{ type: "text", text: "First" },
206+
{ type: "text", text: "Second" },
207+
]);
208+
});
209+
210+
it("evicts the oldest complete session when session retention is exceeded", async () => {
211+
let now = 1000;
212+
const ledger = createInMemoryAcpEventLedger({ maxSessions: 1, now: () => now++ });
213+
await ledger.startSession({
214+
sessionId: "old-session",
215+
sessionKey: "acp:old-gateway-session",
216+
cwd: "/work",
217+
complete: true,
218+
});
219+
await ledger.startSession({
220+
sessionId: "new-session",
221+
sessionKey: "acp:new-gateway-session",
222+
cwd: "/work",
223+
complete: true,
224+
});
225+
226+
await expect(
227+
ledger.readReplay({ sessionId: "old-session", sessionKey: "acp:old-gateway-session" }),
228+
).resolves.toEqual({ complete: false, events: [] });
229+
const replay = await ledger.readReplayBySessionId({ sessionId: "new-session" });
230+
expect(replay.complete).toBe(true);
231+
expect(replay.sessionKey).toBe("acp:new-gateway-session");
232+
});
233+
234+
it("resets stale events when a session is restarted with reset", async () => {
235+
const ledger = createInMemoryAcpEventLedger();
236+
await ledger.startSession({
237+
sessionId: "session-1",
238+
sessionKey: "acp:old-session",
239+
cwd: "/work",
240+
complete: true,
241+
});
242+
await ledger.recordUpdate({
243+
sessionId: "session-1",
244+
sessionKey: "acp:old-session",
245+
update: {
246+
sessionUpdate: "agent_message_chunk",
247+
content: { type: "text", text: "Old answer" },
248+
},
249+
});
250+
await ledger.startSession({
251+
sessionId: "session-1",
252+
sessionKey: "acp:new-session",
253+
cwd: "/work",
254+
complete: true,
255+
reset: true,
256+
});
257+
258+
await expect(
259+
ledger.readReplay({ sessionId: "session-1", sessionKey: "acp:old-session" }),
260+
).resolves.toEqual({ complete: false, events: [] });
261+
await expect(ledger.readReplayBySessionId({ sessionId: "session-1" })).resolves.toMatchObject({
262+
complete: true,
263+
sessionKey: "acp:new-session",
264+
events: [],
265+
});
266+
});
267+
268+
it("marks replay incomplete when serialized byte retention trims payloads", async () => {
269+
const ledger = createInMemoryAcpEventLedger({ maxSerializedBytes: 900 });
270+
await ledger.startSession({
271+
sessionId: "session-1",
272+
sessionKey: "agent:main:work",
273+
cwd: "/work",
274+
complete: true,
275+
});
276+
await ledger.recordUpdate({
277+
sessionId: "session-1",
278+
sessionKey: "agent:main:work",
279+
update: {
280+
sessionUpdate: "tool_call_update",
281+
toolCallId: "tool-1",
282+
status: "completed",
283+
rawOutput: { content: "x".repeat(5_000) },
284+
},
285+
});
286+
287+
await expect(
288+
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
289+
).resolves.toEqual({ complete: false, events: [] });
290+
});
291+
292+
it("keeps the persisted ledger file under the serialized byte budget", async () => {
293+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
294+
const filePath = path.join(dir, "acp", "event-ledger.json");
295+
const ledger = createFileAcpEventLedger({ filePath, maxSerializedBytes: 1024 });
296+
await ledger.startSession({
297+
sessionId: "session-1",
298+
sessionKey: "agent:main:work",
299+
cwd: "/work",
300+
complete: true,
301+
});
302+
await ledger.recordUpdate({
303+
sessionId: "session-1",
304+
sessionKey: "agent:main:work",
305+
update: {
306+
sessionUpdate: "tool_call_update",
307+
toolCallId: "tool-1",
308+
status: "completed",
309+
rawOutput: { content: "x".repeat(5_000) },
310+
},
311+
});
312+
313+
const bytes = Buffer.byteLength(await fs.readFile(filePath, "utf8"), "utf8");
314+
expect(bytes).toBeLessThanOrEqual(1024);
315+
await expect(
316+
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
317+
).resolves.toEqual({ complete: false, events: [] });
318+
});
319+
});
320+
321+
it("ignores corrupt ledger files instead of replaying unknown state", async () => {
322+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
323+
const filePath = path.join(dir, "event-ledger.json");
324+
await fs.writeFile(filePath, "{bad json", "utf8");
325+
const ledger = createFileAcpEventLedger({ filePath });
326+
327+
await expect(
328+
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
329+
).resolves.toEqual({ complete: false, events: [] });
330+
});
331+
});
332+
333+
it("reloads file-backed state under lock before writing", async () => {
334+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
335+
const filePath = path.join(dir, "acp", "event-ledger.json");
336+
const first = createFileAcpEventLedger({ filePath });
337+
const second = createFileAcpEventLedger({ filePath });
338+
339+
await first.startSession({
340+
sessionId: "session-1",
341+
sessionKey: "acp:gateway-session-1",
342+
cwd: "/work",
343+
complete: true,
344+
});
345+
await second.startSession({
346+
sessionId: "session-2",
347+
sessionKey: "acp:gateway-session-2",
348+
cwd: "/work",
349+
complete: true,
350+
});
351+
await first.recordUpdate({
352+
sessionId: "session-1",
353+
sessionKey: "acp:gateway-session-1",
354+
update: {
355+
sessionUpdate: "agent_message_chunk",
356+
content: { type: "text", text: "Answer" },
357+
},
358+
});
359+
360+
const reader = createFileAcpEventLedger({ filePath });
361+
const replay = await reader.readReplay({
362+
sessionId: "session-2",
363+
sessionKey: "acp:gateway-session-2",
364+
});
365+
expect(replay.complete).toBe(true);
366+
expect(replay.sessionKey).toBe("acp:gateway-session-2");
367+
});
368+
});
369+
});

0 commit comments

Comments
 (0)