Skip to content

Commit db40fde

Browse files
authored
fix: persist ACP metadata in SQLite (#88724)
* fix: persist acp metadata in sqlite * test: align session store acp expectations
1 parent cdff174 commit db40fde

38 files changed

Lines changed: 2226 additions & 707 deletions

docs/refactor/database-first.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,8 @@ The branch already has a real shared SQLite base:
311311
`delivery_queue_entries`, `model_capability_cache`,
312312
`workspace_setup_state`, `native_hook_relay_bridges`,
313313
`current_conversation_bindings`, `plugin_binding_approvals`,
314-
`tui_last_sessions`, `task_runs`, `task_delivery_state`, `flow_runs`,
314+
`tui_last_sessions`, `acp_sessions`, `acp_replay_sessions`,
315+
`acp_replay_events`, `task_runs`, `task_delivery_state`, `flow_runs`,
315316
`subagent_runs`, `migration_runs`, and `backup_runs`.
316317
- Arbitrary plugin-owned state does not get host-owned typed tables. Installed
317318
plugins use `plugin_state_entries` for versioned JSON payloads and
@@ -1669,6 +1670,8 @@ Move these into agent databases:
16691670
- ACP replay ledger sessions. Done for runtime writes via
16701671
`acp_replay_sessions` and `acp_replay_events`; legacy `acp/event-ledger.json`
16711672
remains only as doctor input.
1673+
- ACP session metadata. Done for runtime writes via `acp_sessions`; legacy
1674+
`entry.acp` blocks in `sessions.json` are doctor migration input only.
16721675
- Trajectory sidecars when they are not explicit export files. Done for runtime
16731676
writes: trajectory capture writes agent-database `trajectory_runtime_events`
16741677
rows and mirrors run-scoped artifacts into SQLite. Legacy sidecars are doctor

extensions/feishu/src/doctor.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,6 @@ async function repairFeishuDoctorState(params: {
752752
},
753753
{
754754
skipMaintenance: true,
755-
allowDropAcpMetaSessionKeys: [...keys],
756755
},
757756
);
758757
const removed = removedEntries.length;

extensions/telegram/src/thread-bindings.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ export function createTelegramThreadBindingManager(params: {
560560
sessionEntry.entry.status === "failed" ||
561561
sessionEntry.entry.status === "killed" ||
562562
sessionEntry.entry.status === "timeout" ||
563-
sessionEntry.entry.acp?.state === "error";
563+
sessionEntry.acp?.state === "error";
564564
if (isStale) {
565565
staleSessionKeys.add(targetSessionKey);
566566
}

src/acp/control-plane/manager.core.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1473,7 +1473,7 @@ export class AcpSessionManager {
14731473
if (!entry) {
14741474
return null;
14751475
}
1476-
const base = current ?? entry.acp;
1476+
const base = current;
14771477
if (!base) {
14781478
return null;
14791479
}
@@ -1619,7 +1619,7 @@ export class AcpSessionManager {
16191619
if (!entry) {
16201620
return null;
16211621
}
1622-
const base = current ?? entry.acp;
1622+
const base = current;
16231623
if (!base) {
16241624
return null;
16251625
}
@@ -1668,7 +1668,7 @@ export class AcpSessionManager {
16681668
if (!entry) {
16691669
return null;
16701670
}
1671-
const base = current ?? entry.acp;
1671+
const base = current;
16721672
if (!base) {
16731673
return null;
16741674
}
@@ -1779,7 +1779,7 @@ export class AcpSessionManager {
17791779
if (!entry) {
17801780
return null;
17811781
}
1782-
const base = current ?? entry.acp;
1782+
const base = current;
17831783
if (!base) {
17841784
return null;
17851785
}

src/acp/control-plane/manager.identity-reconcile.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ export async function reconcileManagerRuntimeSessionIdentifiers(params: {
148148
if (!entry) {
149149
return null;
150150
}
151-
const base = current ?? entry.acp;
151+
const base = current;
152152
if (!base) {
153153
return null;
154154
}

src/acp/event-ledger.test.ts

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
import fs from "node:fs/promises";
22
import path from "node:path";
3-
import { describe, expect, it } from "vitest";
3+
import { afterEach, describe, expect, it } from "vitest";
4+
import { closeOpenClawStateDatabaseForTest } from "../state/openclaw-state-db.js";
45
import { withTempDir } from "../test-helpers/temp-dir.js";
5-
import { createFileAcpEventLedger, createInMemoryAcpEventLedger } from "./event-ledger.js";
6+
import {
7+
createFileAcpEventLedger,
8+
createInMemoryAcpEventLedger,
9+
createSqliteAcpEventLedger,
10+
migrateFileAcpEventLedgerToSqlite,
11+
} from "./event-ledger.js";
612

713
describe("ACP event ledger", () => {
14+
afterEach(() => {
15+
closeOpenClawStateDatabaseForTest();
16+
});
17+
818
it("records complete in-memory session updates in sequence", async () => {
919
const ledger = createInMemoryAcpEventLedger({ now: () => 123 });
1020
await ledger.startSession({
@@ -142,6 +152,123 @@ describe("ACP event ledger", () => {
142152
});
143153
});
144154

155+
it("persists SQLite-backed replay state across ledger instances", async () => {
156+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
157+
const databasePath = path.join(dir, "openclaw.sqlite");
158+
const first = createSqliteAcpEventLedger({ path: databasePath, now: () => 1000 });
159+
await first.startSession({
160+
sessionId: "session-1",
161+
sessionKey: "agent:main:work",
162+
cwd: "/work",
163+
complete: true,
164+
});
165+
await first.recordUpdate({
166+
sessionId: "session-1",
167+
sessionKey: "agent:main:work",
168+
runId: "run-1",
169+
update: {
170+
sessionUpdate: "agent_thought_chunk",
171+
content: { type: "text", text: "Thinking" },
172+
},
173+
});
174+
175+
closeOpenClawStateDatabaseForTest();
176+
const second = createSqliteAcpEventLedger({ path: databasePath });
177+
const replay = await second.readReplay({
178+
sessionId: "session-1",
179+
sessionKey: "agent:main:work",
180+
});
181+
182+
expect(replay.complete).toBe(true);
183+
expect(replay.events).toHaveLength(1);
184+
expect(replay.events[0]?.update).toEqual({
185+
sessionUpdate: "agent_thought_chunk",
186+
content: { type: "text", text: "Thinking" },
187+
});
188+
});
189+
});
190+
191+
it("imports legacy file-backed replay state into SQLite", async () => {
192+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
193+
const filePath = path.join(dir, "acp", "event-ledger.json");
194+
const databasePath = path.join(dir, "openclaw.sqlite");
195+
const legacy = createFileAcpEventLedger({ filePath, now: () => 1000 });
196+
await legacy.startSession({
197+
sessionId: "session-1",
198+
sessionKey: "agent:main:work",
199+
cwd: "/work",
200+
complete: true,
201+
});
202+
await legacy.recordUpdate({
203+
sessionId: "session-1",
204+
sessionKey: "agent:main:work",
205+
runId: "run-1",
206+
update: {
207+
sessionUpdate: "agent_message_chunk",
208+
content: { type: "text", text: "Answer" },
209+
},
210+
});
211+
212+
const migrated = await migrateFileAcpEventLedgerToSqlite({
213+
filePath,
214+
path: databasePath,
215+
archiveSource: true,
216+
});
217+
const sqlite = createSqliteAcpEventLedger({ path: databasePath });
218+
const replay = await sqlite.readReplay({
219+
sessionId: "session-1",
220+
sessionKey: "agent:main:work",
221+
});
222+
223+
expect(migrated).toEqual({
224+
importedSessions: 1,
225+
importedEvents: 1,
226+
archived: true,
227+
});
228+
expect(replay.complete).toBe(true);
229+
expect(replay.events[0]?.update).toEqual({
230+
sessionUpdate: "agent_message_chunk",
231+
content: { type: "text", text: "Answer" },
232+
});
233+
await expect(fs.stat(`${filePath}.migrated`)).resolves.toBeTruthy();
234+
});
235+
});
236+
237+
it("marks SQLite-backed replay incomplete when event retention truncates history", async () => {
238+
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
239+
const ledger = createSqliteAcpEventLedger({
240+
path: path.join(dir, "openclaw.sqlite"),
241+
maxEventsPerSession: 1,
242+
});
243+
await ledger.startSession({
244+
sessionId: "session-1",
245+
sessionKey: "agent:main:work",
246+
cwd: "/work",
247+
complete: true,
248+
});
249+
await ledger.recordUpdate({
250+
sessionId: "session-1",
251+
sessionKey: "agent:main:work",
252+
update: {
253+
sessionUpdate: "agent_message_chunk",
254+
content: { type: "text", text: "First" },
255+
},
256+
});
257+
await ledger.recordUpdate({
258+
sessionId: "session-1",
259+
sessionKey: "agent:main:work",
260+
update: {
261+
sessionUpdate: "agent_message_chunk",
262+
content: { type: "text", text: "Second" },
263+
},
264+
});
265+
266+
await expect(
267+
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
268+
).resolves.toEqual({ complete: false, events: [] });
269+
});
270+
});
271+
145272
it("can replay a complete session by Gateway session key", async () => {
146273
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
147274
await ledger.startSession({

0 commit comments

Comments
 (0)