Skip to content

Commit 18e7d28

Browse files
committed
perf(gateway): reuse stable turn metadata
1 parent 02ca283 commit 18e7d28

10 files changed

Lines changed: 248 additions & 34 deletions

File tree

extensions/codex/src/app-server/startup-binding.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,31 @@ describe("Codex app-server startup binding", () => {
7777
expect(savedBinding?.threadId).toBe("thread-existing");
7878
});
7979

80+
it("reuses the session record cache while sessions.json is unchanged", async () => {
81+
const sessionFile = path.join(tempDir, "session.jsonl");
82+
const workspaceDir = path.join(tempDir, "workspace");
83+
const agentDir = path.join(tempDir, "agent");
84+
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
85+
await writeSessionRecord(sessionFile, { totalTokens: 12_000 });
86+
const sessionsJson = path.join(path.dirname(sessionFile), "sessions.json");
87+
const readFileSpy = vi.spyOn(fs, "readFile");
88+
89+
for (let i = 0; i < 2; i += 1) {
90+
const binding = await rotateOversizedCodexAppServerStartupBinding({
91+
binding: await readCodexAppServerBinding(sessionFile),
92+
sessionFile,
93+
agentDir,
94+
config: undefined,
95+
});
96+
expect(binding?.threadId).toBe("thread-existing");
97+
}
98+
99+
const sessionStoreReads = readFileSpy.mock.calls.filter(
100+
([file]) => typeof file === "string" && file === sessionsJson,
101+
);
102+
expect(sessionStoreReads).toHaveLength(1);
103+
});
104+
80105
it("checks native rollout token pressure under default compaction config", async () => {
81106
const sessionFile = path.join(tempDir, "session.jsonl");
82107
const workspaceDir = path.join(tempDir, "workspace");

extensions/codex/src/app-server/startup-binding.ts

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ const CODEX_APP_SERVER_BYTE_UNITS: Record<string, number> = {
3030
tb: 1024 * 1024 * 1024 * 1024,
3131
tib: 1024 * 1024 * 1024 * 1024,
3232
};
33+
type CodexSessionRecordCacheEntry = {
34+
sessionsFile: string;
35+
mtimeMs: number;
36+
size: number;
37+
record: (Record<string, unknown> & { sessionKey: string }) | undefined;
38+
};
39+
40+
const codexSessionRecordCache = new Map<string, CodexSessionRecordCacheEntry>();
3341

3442
function parseCodexAppServerByteLimit(value: unknown): number | undefined {
3543
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
@@ -112,26 +120,51 @@ async function readCodexSessionRecordForSessionFile(
112120
sessionFile: string,
113121
): Promise<(Record<string, unknown> & { sessionKey: string }) | undefined> {
114122
const sessionsFile = path.join(path.dirname(sessionFile), "sessions.json");
123+
const resolvedSessionFile = path.resolve(sessionFile);
124+
let stat: Awaited<ReturnType<typeof fs.stat>>;
125+
try {
126+
stat = await fs.stat(sessionsFile);
127+
} catch {
128+
codexSessionRecordCache.delete(resolvedSessionFile);
129+
return undefined;
130+
}
131+
const cached = codexSessionRecordCache.get(resolvedSessionFile);
132+
if (
133+
cached?.sessionsFile === sessionsFile &&
134+
cached.mtimeMs === stat.mtimeMs &&
135+
cached.size === stat.size
136+
) {
137+
return cached.record;
138+
}
115139
let store: JsonValue | undefined;
116140
try {
117141
store = JSON.parse(await fs.readFile(sessionsFile, "utf8")) as JsonValue;
118142
} catch {
143+
codexSessionRecordCache.delete(resolvedSessionFile);
119144
return undefined;
120145
}
121146
if (!isJsonObject(store)) {
147+
codexSessionRecordCache.delete(resolvedSessionFile);
122148
return undefined;
123149
}
124-
const resolvedSessionFile = path.resolve(sessionFile);
150+
let found: (Record<string, unknown> & { sessionKey: string }) | undefined;
125151
for (const [sessionKey, record] of Object.entries(store)) {
126152
if (!isJsonObject(record) || typeof record.sessionFile !== "string") {
127153
continue;
128154
}
129155
if (path.resolve(record.sessionFile) !== resolvedSessionFile) {
130156
continue;
131157
}
132-
return { sessionKey, ...record };
158+
found = { sessionKey, ...record };
159+
break;
133160
}
134-
return undefined;
161+
codexSessionRecordCache.set(resolvedSessionFile, {
162+
sessionsFile,
163+
mtimeMs: stat.mtimeMs,
164+
size: stat.size,
165+
record: found,
166+
});
167+
return found;
135168
}
136169

137170
type CodexAppServerRolloutTokenSnapshot = {

src/agents/command/attempt-execution.shared.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ export async function persistSessionEntry(
3838
store[params.sessionKey] = merged;
3939
return merged;
4040
},
41-
{ takeCacheOwnership: true },
41+
{
42+
resolveSingleEntryPersistence: (entry) =>
43+
entry ? { sessionKey: params.sessionKey, entry } : null,
44+
takeCacheOwnership: true,
45+
},
4246
);
4347
if (persisted) {
4448
params.sessionStore[params.sessionKey] = persisted;

src/auto-reply/reply/commands-session-store.ts

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@ export async function persistSessionEntry(params: CommandParams): Promise<boolea
1919
params.storePath,
2020
(store) => {
2121
store[params.sessionKey] = params.sessionEntry as SessionEntry;
22+
return params.sessionEntry as SessionEntry;
23+
},
24+
{
25+
resolveSingleEntryPersistence: (entry) =>
26+
entry ? { sessionKey: params.sessionKey, entry } : null,
27+
skipMaintenance: true,
2228
},
23-
{ skipMaintenance: true },
2429
);
2530
}
2631
return true;
@@ -44,16 +49,24 @@ export async function persistAbortTargetEntry(params: {
4449
sessionStore[key] = entry;
4550

4651
if (storePath) {
47-
await updateSessionStore(storePath, (store) => {
48-
const nextEntry = store[key] ?? entry;
49-
if (!nextEntry) {
50-
return;
51-
}
52-
nextEntry.abortedLastRun = true;
53-
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
54-
nextEntry.updatedAt = Date.now();
55-
store[key] = nextEntry;
56-
});
52+
await updateSessionStore(
53+
storePath,
54+
(store) => {
55+
const nextEntry = store[key] ?? entry;
56+
if (!nextEntry) {
57+
return undefined;
58+
}
59+
nextEntry.abortedLastRun = true;
60+
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
61+
nextEntry.updatedAt = Date.now();
62+
store[key] = nextEntry;
63+
return nextEntry;
64+
},
65+
{
66+
resolveSingleEntryPersistence: (updated) =>
67+
updated ? { sessionKey: key, entry: updated } : null,
68+
},
69+
);
5770
}
5871

5972
return true;

src/auto-reply/reply/session-updates.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,18 @@ async function persistSessionEntryUpdate(params: {
4242
if (!params.storePath) {
4343
return;
4444
}
45-
await updateSessionStore(params.storePath, (store) => {
46-
store[params.sessionKey!] = { ...store[params.sessionKey!], ...params.nextEntry };
47-
});
45+
await updateSessionStore(
46+
params.storePath,
47+
(store) => {
48+
const next = { ...store[params.sessionKey!], ...params.nextEntry };
49+
store[params.sessionKey!] = next;
50+
return next;
51+
},
52+
{
53+
resolveSingleEntryPersistence: (entry) =>
54+
entry && params.sessionKey ? { sessionKey: params.sessionKey, entry } : null,
55+
},
56+
);
4857
}
4958

5059
function emitCompactionSessionLifecycleHooks(params: {

src/config/sessions/sessions.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,49 @@ describe("session store writer queue", () => {
756756
writeSpy.mockRestore();
757757
});
758758

759+
it("can persist a known single entry without touching hydrated prompts from other sessions", async () => {
760+
const key = "agent:main:single-entry";
761+
const otherKey = "agent:main:other-entry";
762+
const otherPrompt = `<available_skills>\n${"other prompt\n".repeat(200)}</available_skills>`;
763+
const { dir, storePath } = await makeTmpStore({
764+
[key]: { sessionId: "s-single-entry", updatedAt: Date.now(), counter: 0 },
765+
[otherKey]: {
766+
sessionId: "s-other-entry",
767+
updatedAt: Date.now(),
768+
skillsSnapshot: {
769+
prompt: otherPrompt,
770+
skills: [{ name: "demo" }],
771+
version: 1,
772+
},
773+
},
774+
});
775+
loadSessionStore(storePath);
776+
await updateSessionStore(storePath, () => undefined, { skipMaintenance: true });
777+
const before = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<string, SessionEntry>;
778+
const beforeOtherEntry = before[otherKey];
779+
780+
await updateSessionStore(
781+
storePath,
782+
(store) => {
783+
const next = { ...store[key], counter: 1 } as SessionEntry;
784+
store[key] = next;
785+
return next;
786+
},
787+
{
788+
resolveSingleEntryPersistence: (entry) => ({ sessionKey: key, entry }),
789+
skipMaintenance: true,
790+
},
791+
);
792+
793+
const persisted = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<
794+
string,
795+
SessionEntry
796+
>;
797+
expect((persisted[key] as Record<string, unknown> | undefined)?.counter).toBe(1);
798+
expect(persisted[otherKey]).toStrictEqual(beforeOtherEntry);
799+
expect(fs.existsSync(path.join(dir, "skills-prompts"))).toBe(true);
800+
});
801+
759802
it("multiple consecutive errors do not permanently poison the queue", async () => {
760803
const key = "agent:main:multi-err";
761804
const { storePath } = await makeTmpStore({

src/config/sessions/store.ts

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ const writerStoreFileStats = new WeakMap<
8989
Record<string, SessionEntry>,
9090
ReturnType<typeof getFileStatSnapshot> | null
9191
>();
92+
let serializedPromptRefKeyCache:
93+
| {
94+
serialized: string;
95+
keys: Set<string>;
96+
}
97+
| undefined;
9298

9399
function loadSessionArchiveRuntime() {
94100
sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js");
@@ -365,12 +371,40 @@ function buildSingleEntrySerializedStore(params: {
365371
};
366372
}
367373

368-
function storeHasUntouchedHydratedSkillPrompts(
374+
function collectSerializedPromptRefKeys(serialized: string): Set<string> {
375+
if (serializedPromptRefKeyCache?.serialized === serialized) {
376+
return serializedPromptRefKeyCache.keys;
377+
}
378+
const keys = new Set<string>();
379+
try {
380+
const parsed = JSON.parse(serialized) as Record<string, SessionEntry>;
381+
for (const [key, entry] of Object.entries(parsed)) {
382+
if (entry?.skillsSnapshot?.promptRef) {
383+
keys.add(key);
384+
}
385+
}
386+
} catch {
387+
// Malformed serialized cache cannot prove prompt refs are already durable.
388+
}
389+
serializedPromptRefKeyCache = { serialized, keys };
390+
return keys;
391+
}
392+
393+
function storeHasUnsafeUntouchedHydratedSkillPrompts(
394+
storePath: string,
369395
store: Record<string, SessionEntry>,
370396
changedSessionKey: string,
371397
): boolean {
398+
const currentSerialized = getSerializedSessionStore(storePath);
399+
const serializedPromptRefKeys = currentSerialized
400+
? collectSerializedPromptRefKeys(currentSerialized)
401+
: undefined;
372402
for (const [key, entry] of Object.entries(store)) {
373-
if (key !== changedSessionKey && typeof entry.skillsSnapshot?.prompt === "string") {
403+
if (
404+
key !== changedSessionKey &&
405+
typeof entry.skillsSnapshot?.prompt === "string" &&
406+
!serializedPromptRefKeys?.has(key)
407+
) {
374408
return true;
375409
}
376410
}
@@ -620,7 +654,11 @@ async function saveSessionStoreUnlocked(
620654
if (
621655
opts?.singleEntryPersistence &&
622656
!maintenanceChangedStore &&
623-
!storeHasUntouchedHydratedSkillPrompts(store, opts.singleEntryPersistence.sessionKey)
657+
!storeHasUnsafeUntouchedHydratedSkillPrompts(
658+
storePath,
659+
store,
660+
opts.singleEntryPersistence.sessionKey,
661+
)
624662
) {
625663
const normalizedEntry = store[opts.singleEntryPersistence.sessionKey];
626664
const singleEntrySerialized = buildSingleEntrySerializedStore({

src/plugin-sdk/facade-loader.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import fs from "node:fs";
2-
import { createRequire } from "node:module";
32
import path from "node:path";
43
import { fileURLToPath, pathToFileURL } from "node:url";
54
import { openRootFileSync } from "../infra/boundary-file-read.js";
@@ -14,22 +13,12 @@ import { resolveBundledFacadeModuleLocation } from "./facade-resolution-shared.j
1413

1514
const CURRENT_MODULE_PATH = fileURLToPath(import.meta.url);
1615

17-
const nodeRequire = createRequire(import.meta.url);
1816
const moduleLoaders: PluginModuleLoaderCache = new Map();
1917
const loadedFacadeModules = new Map<string, unknown>();
2018
const loadedFacadePluginIds = new Set<string>();
2119
let facadeLoaderSourceTransformFactory: PluginModuleLoaderFactory | undefined;
2220
let cachedOpenClawPackageRoot: string | undefined;
2321

24-
function getSourceTransformFactory() {
25-
if (facadeLoaderSourceTransformFactory) {
26-
return facadeLoaderSourceTransformFactory;
27-
}
28-
const { createJiti } = nodeRequire("jiti") as typeof import("jiti");
29-
facadeLoaderSourceTransformFactory = createJiti;
30-
return facadeLoaderSourceTransformFactory;
31-
}
32-
3322
function getOpenClawPackageRoot() {
3423
if (cachedOpenClawPackageRoot) {
3524
return cachedOpenClawPackageRoot;
@@ -63,7 +52,9 @@ function getModuleLoader(modulePath: string) {
6352
importerUrl: import.meta.url,
6453
preferBuiltDist: true,
6554
loaderFilename: import.meta.url,
66-
createLoader: getSourceTransformFactory(),
55+
...(facadeLoaderSourceTransformFactory
56+
? { createLoader: facadeLoaderSourceTransformFactory }
57+
: {}),
6758
});
6859
}
6960

src/plugins/plugin-metadata-snapshot.memo.test.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,30 @@ describe("loadPluginMetadataSnapshot process memo", () => {
244244
expect(second.byPluginId.get("demo")).toBe(second.plugins[0]);
245245
});
246246

247+
it("skips persisted registry filesystem fingerprints after a process memo hit", () => {
248+
const stateDir = tempStateDir();
249+
touchPersistedIndex(stateDir);
250+
loadPluginRegistrySnapshotWithMetadata.mockReturnValue({
251+
source: "persisted",
252+
snapshot: makeIndex(),
253+
diagnostics: [],
254+
});
255+
256+
const first = loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
257+
const statSpy = vi.spyOn(fs, "statSync");
258+
const readdirSpy = vi.spyOn(fs, "readdirSync");
259+
try {
260+
const second = loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
261+
262+
expect(second).toBe(first);
263+
expect(statSpy).not.toHaveBeenCalled();
264+
expect(readdirSpy).not.toHaveBeenCalled();
265+
} finally {
266+
statSpy.mockRestore();
267+
readdirSpy.mockRestore();
268+
}
269+
});
270+
247271
it("clears the process memo at plugin metadata lifecycle boundaries", () => {
248272
const stateDir = tempStateDir();
249273
touchPersistedIndex(stateDir);
@@ -481,7 +505,7 @@ describe("loadPluginMetadataSnapshot process memo", () => {
481505
expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledOnce();
482506
});
483507

484-
it("refreshes when the persisted registry file changes", () => {
508+
it("keeps persisted registry snapshots process-stable until lifecycle clear", () => {
485509
const stateDir = tempStateDir();
486510
touchPersistedIndex(stateDir, 1);
487511
loadPluginRegistrySnapshotWithMetadata.mockReturnValue({
@@ -494,6 +518,11 @@ describe("loadPluginMetadataSnapshot process memo", () => {
494518
touchPersistedIndex(stateDir, 22);
495519
loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
496520

521+
expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledOnce();
522+
523+
clearPluginMetadataLifecycleCaches();
524+
loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
525+
497526
expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledTimes(2);
498527
});
499528

0 commit comments

Comments
 (0)