Skip to content

Commit c4a0ca0

Browse files
committed
perf(agents): cache subagent registry reads
1 parent dfb4491 commit c4a0ca0

4 files changed

Lines changed: 138 additions & 1 deletion

File tree

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Subagent registry state tests cover hot read caching over the persisted SQLite snapshot.
2+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
3+
import {
4+
clearSubagentRunsReadCacheForTest,
5+
getSubagentRunsSnapshotForRead,
6+
persistSubagentRunsToDisk,
7+
} from "./subagent-registry-state.js";
8+
import type { SubagentRunRecord } from "./subagent-registry.types.js";
9+
10+
const mocks = vi.hoisted(() => ({
11+
loadSubagentRegistryFromSqlite: vi.fn<() => Map<string, SubagentRunRecord>>(),
12+
saveSubagentRegistryToSqlite: vi.fn<(runs: Map<string, SubagentRunRecord>) => void>(),
13+
}));
14+
15+
vi.mock("./subagent-registry.store.sqlite.js", () => ({
16+
loadSubagentRegistryFromSqlite: mocks.loadSubagentRegistryFromSqlite,
17+
saveSubagentRegistryToSqlite: mocks.saveSubagentRegistryToSqlite,
18+
}));
19+
20+
function createRun(runId: string): SubagentRunRecord {
21+
return {
22+
runId,
23+
childSessionKey: `agent:main:subagent:${runId}`,
24+
requesterSessionKey: "agent:main:main",
25+
requesterDisplayKey: "main",
26+
task: `task ${runId}`,
27+
cleanup: "keep",
28+
createdAt: 1,
29+
startedAt: 1,
30+
};
31+
}
32+
33+
describe("subagent registry state read cache", () => {
34+
const previousReadDiskFlag = process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK;
35+
36+
beforeEach(() => {
37+
vi.useFakeTimers();
38+
vi.setSystemTime(1_000);
39+
process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK = "1";
40+
clearSubagentRunsReadCacheForTest();
41+
mocks.loadSubagentRegistryFromSqlite.mockReset();
42+
mocks.saveSubagentRegistryToSqlite.mockReset();
43+
});
44+
45+
afterEach(() => {
46+
clearSubagentRunsReadCacheForTest();
47+
if (previousReadDiskFlag === undefined) {
48+
delete process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK;
49+
} else {
50+
process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK = previousReadDiskFlag;
51+
}
52+
vi.useRealTimers();
53+
});
54+
55+
it("reuses persisted snapshots for hot reads within the ttl", () => {
56+
const firstRun = createRun("run-first");
57+
const secondRun = createRun("run-second");
58+
mocks.loadSubagentRegistryFromSqlite
59+
.mockReturnValueOnce(new Map([[firstRun.runId, firstRun]]))
60+
.mockReturnValueOnce(new Map([[secondRun.runId, secondRun]]));
61+
62+
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]);
63+
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]);
64+
expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(1);
65+
66+
vi.advanceTimersByTime(500);
67+
68+
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-second"]);
69+
expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(2);
70+
});
71+
72+
it("refreshes the local read cache after successful writes", () => {
73+
const firstRun = createRun("run-first");
74+
const savedRun = createRun("run-saved");
75+
mocks.loadSubagentRegistryFromSqlite.mockReturnValue(new Map([[firstRun.runId, firstRun]]));
76+
77+
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]);
78+
79+
persistSubagentRunsToDisk(new Map([[savedRun.runId, savedRun]]));
80+
81+
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-saved"]);
82+
expect(mocks.saveSubagentRegistryToSqlite).toHaveBeenCalledOnce();
83+
expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(1);
84+
});
85+
});

src/agents/subagent-registry-state.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,62 @@ import {
99
} from "./subagent-registry.store.sqlite.js";
1010
import type { SubagentRunRecord } from "./subagent-registry.types.js";
1111

12+
const SUBAGENT_RUNS_READ_CACHE_TTL_MS = 500;
13+
14+
let persistedSubagentRunsReadCache:
15+
| {
16+
loadedAtMs: number;
17+
runs: Map<string, SubagentRunRecord>;
18+
}
19+
| undefined;
20+
21+
function cloneSubagentRunsSnapshot(
22+
runs: Map<string, SubagentRunRecord>,
23+
): Map<string, SubagentRunRecord> {
24+
return new Map([...runs.entries()].map(([runId, entry]) => [runId, structuredClone(entry)]));
25+
}
26+
27+
function rememberPersistedSubagentRunsSnapshot(runs: Map<string, SubagentRunRecord>): void {
28+
persistedSubagentRunsReadCache = {
29+
loadedAtMs: Date.now(),
30+
runs: cloneSubagentRunsSnapshot(runs),
31+
};
32+
}
33+
34+
function loadPersistedSubagentRunsForRead(): Map<string, SubagentRunRecord> {
35+
const nowMs = Date.now();
36+
if (
37+
persistedSubagentRunsReadCache &&
38+
nowMs >= persistedSubagentRunsReadCache.loadedAtMs &&
39+
nowMs - persistedSubagentRunsReadCache.loadedAtMs < SUBAGENT_RUNS_READ_CACHE_TTL_MS
40+
) {
41+
return persistedSubagentRunsReadCache.runs;
42+
}
43+
44+
const runs = loadSubagentRegistryFromSqlite();
45+
persistedSubagentRunsReadCache = {
46+
loadedAtMs: nowMs,
47+
runs,
48+
};
49+
return runs;
50+
}
51+
52+
export function clearSubagentRunsReadCacheForTest(): void {
53+
persistedSubagentRunsReadCache = undefined;
54+
}
55+
1256
export function persistSubagentRunsToDisk(runs: Map<string, SubagentRunRecord>) {
1357
try {
1458
saveSubagentRegistryToSqlite(runs);
59+
rememberPersistedSubagentRunsSnapshot(runs);
1560
} catch {
1661
// ignore persistence failures
1762
}
1863
}
1964

2065
export function persistSubagentRunsToDiskOrThrow(runs: Map<string, SubagentRunRecord>) {
2166
saveSubagentRegistryToSqlite(runs);
67+
rememberPersistedSubagentRunsSnapshot(runs);
2268
}
2369

2470
export function restoreSubagentRunsFromDisk(params: {
@@ -53,7 +99,9 @@ export function getSubagentRunsSnapshotForRead(
5399
if (shouldReadDisk) {
54100
try {
55101
// Persisted state lets other worker processes observe active runs.
56-
for (const [runId, entry] of loadSubagentRegistryFromSqlite().entries()) {
102+
// Cache this hot cross-process snapshot briefly; writes refresh the local
103+
// cache and the TTL bounds visibility of changes from other processes.
104+
for (const [runId, entry] of loadPersistedSubagentRunsForRead().entries()) {
57105
merged.set(runId, entry);
58106
}
59107
} catch {

src/agents/subagent-registry.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ const mocks = vi.hoisted(() => ({
8787
resolveStorePath: vi.fn(() => "/tmp/test-session-store.json"),
8888
updateSessionStore: vi.fn(),
8989
emitSessionLifecycleEvent: vi.fn(),
90+
clearSubagentRunsReadCacheForTest: vi.fn(),
9091
persistSubagentRunsToDisk: vi.fn(),
9192
persistSubagentRunsToDiskOrThrow: vi.fn(),
9293
restoreSubagentRunsFromDisk: vi.fn(() => 0),
@@ -133,6 +134,7 @@ vi.mock("../sessions/session-lifecycle-events.js", () => ({
133134
}));
134135

135136
vi.mock("./subagent-registry-state.js", () => ({
137+
clearSubagentRunsReadCacheForTest: mocks.clearSubagentRunsReadCacheForTest,
136138
getSubagentRunsSnapshotForRead: mocks.getSubagentRunsSnapshotForRead,
137139
persistSubagentRunsToDisk: mocks.persistSubagentRunsToDisk,
138140
persistSubagentRunsToDiskOrThrow: mocks.persistSubagentRunsToDiskOrThrow,

src/agents/subagent-registry.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ import {
7474
type RegisterSubagentRunParams,
7575
} from "./subagent-registry-run-manager.js";
7676
import {
77+
clearSubagentRunsReadCacheForTest,
7778
getSubagentRunsSnapshotForRead,
7879
persistSubagentRunsToDisk,
7980
persistSubagentRunsToDiskOrThrow,
@@ -1255,6 +1256,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
12551256
runtimePluginsLoader.clear();
12561257
subagentAnnounceLoader.clear();
12571258
browserCleanupLoader.clear();
1259+
clearSubagentRunsReadCacheForTest();
12581260
stopSweeper();
12591261
sweepInProgress = false;
12601262
restoreAttempted = false;

0 commit comments

Comments
 (0)