Skip to content

Commit b443704

Browse files
committed
perf: route session store writes through writer queue
1 parent ffc7953 commit b443704

32 files changed

Lines changed: 497 additions & 395 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
3838
- Models CLI: restore `openclaw models list --provider <id>` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji.
3939
- Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2.
4040
- Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn.
41+
- Sessions: route Gateway session-store writes through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel.
4142
- Control UI/chat: show inline feedback when local slash-command dispatch is unavailable or fails unexpectedly instead of clearing the composer silently. Fixes #52105. Thanks @MooreQiao.
4243
- Memory/markdown: replace CRLF managed blocks in place and collapse duplicate marker blocks without rewriting unmanaged markdown, so Dreaming and Memory Wiki files self-heal from repeated generated sections. Fixes #75491; supersedes #75495, #75810, and #76008. Thanks @asaenokkostya-coder, @ottodeng, @everettjf, and @lrg913427-dot.
4344
- Agents/tools: return critical tool-loop circuit-breaker stops as blocked tool results instead of thrown tool failures, so models see the guardrail and stop retrying the same call. Thanks @rayraiser.

docs/plugins/sdk-runtime.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,16 @@ Provider and channel execution paths must use the active runtime config snapshot
114114

115115
```typescript
116116
const storePath = api.runtime.agent.session.resolveStorePath(cfg);
117-
const store = api.runtime.agent.session.loadSessionStore(cfg);
118-
await api.runtime.agent.session.saveSessionStore(cfg, store);
117+
const store = api.runtime.agent.session.loadSessionStore(storePath);
118+
await api.runtime.agent.session.updateSessionStore(storePath, (nextStore) => {
119+
// Patch one entry without replacing the whole file from stale state.
120+
nextStore[sessionKey] = { ...nextStore[sessionKey], thinkingLevel: "high" };
121+
});
119122
const filePath = api.runtime.agent.session.resolveSessionFilePath(cfg, sessionId);
120123
```
121124

125+
Prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)` for runtime writes. They route through the Gateway-owned session-store writer, preserve concurrent updates, and reuse the hot cache. `saveSessionStore(...)` remains available for compatibility and offline maintenance-style rewrites.
126+
122127
</Accordion>
123128
<Accordion title="api.runtime.agent.defaults">
124129
Default model and provider constants:

docs/reference/session-management-compaction.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
8585
- `maxDiskBytes`: optional sessions-directory budget
8686
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
8787

88-
Normal Gateway writes batch `maxEntries` cleanup for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
88+
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
8989

9090
Maintenance keeps durable external conversation pointers such as group sessions
9191
and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks,

extensions/voice-call/src/response-generator.test.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,14 @@ import { generateVoiceResponse } from "./response-generator.js";
66
function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
77
const sessionStore: Record<string, { sessionId: string; updatedAt: number }> = {};
88
const saveSessionStore = vi.fn(async () => {});
9+
const updateSessionStore = vi.fn(
10+
async (
11+
_storePath: string,
12+
mutator: (store: Record<string, { sessionId: string; updatedAt: number }>) => unknown,
13+
) => {
14+
return await mutator(sessionStore);
15+
},
16+
);
917
const runEmbeddedPiAgent = vi.fn(async () => ({
1018
payloads,
1119
meta: { durationMs: 12, aborted: false },
@@ -44,6 +52,7 @@ function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
4452
resolveStorePath,
4553
loadSessionStore: () => sessionStore,
4654
saveSessionStore,
55+
updateSessionStore,
4756
resolveSessionFilePath,
4857
},
4958
} as unknown as CoreAgentDeps;
@@ -52,6 +61,7 @@ function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
5261
runtime,
5362
runEmbeddedPiAgent,
5463
saveSessionStore,
64+
updateSessionStore,
5565
sessionStore,
5666
resolveAgentDir,
5767
resolveAgentWorkspaceDir,
@@ -157,7 +167,7 @@ describe("generateVoiceResponse", () => {
157167
});
158168

159169
it("pins the voice session to responseModel before running the embedded agent", async () => {
160-
const { runtime, runEmbeddedPiAgent, saveSessionStore, sessionStore } = createAgentRuntime([
170+
const { runtime, runEmbeddedPiAgent, updateSessionStore, sessionStore } = createAgentRuntime([
161171
{ text: '{"spoken":"Pinned model works."}' },
162172
]);
163173
const voiceConfig = VoiceCallConfigSchema.parse({
@@ -181,7 +191,10 @@ describe("generateVoiceResponse", () => {
181191
modelOverride: "gpt-4.1-nano",
182192
modelOverrideSource: "auto",
183193
});
184-
expect(saveSessionStore).toHaveBeenCalledWith("/tmp/openclaw/main/sessions.json", sessionStore);
194+
expect(updateSessionStore).toHaveBeenCalledWith(
195+
"/tmp/openclaw/main/sessions.json",
196+
expect.any(Function),
197+
);
185198
expect(runEmbeddedPiAgent).toHaveBeenCalledWith(
186199
expect.objectContaining({
187200
provider: "openai",

extensions/voice-call/src/response-generator.ts

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -224,34 +224,34 @@ export async function generateVoiceResponse(
224224
// Load or create session entry
225225
const sessionStore = agentRuntime.session.loadSessionStore(storePath);
226226
const now = Date.now();
227-
let sessionEntry = sessionStore[resolvedSessionKey] as SessionEntry | undefined;
228-
let sessionEntryUpdated = false;
229-
230-
if (!sessionEntry) {
231-
sessionEntry = {
232-
sessionId: crypto.randomUUID(),
233-
updatedAt: now,
234-
};
235-
sessionStore[resolvedSessionKey] = sessionEntry;
236-
sessionEntryUpdated = true;
237-
}
238-
239-
const sessionId = sessionEntry.sessionId;
227+
const existingSessionEntry = sessionStore[resolvedSessionKey] as SessionEntry | undefined;
240228

241229
// Resolve model from config
242230
const { provider, model } = resolveVoiceResponseModel({ voiceConfig, agentRuntime });
243-
if (voiceConfig.responseModel) {
244-
sessionEntryUpdated =
245-
applyModelOverrideToSessionEntry({
246-
entry: sessionEntry,
247-
selection: { provider, model },
248-
selectionSource: "auto",
249-
}).updated || sessionEntryUpdated;
250-
}
251231

252-
if (sessionEntryUpdated) {
253-
await agentRuntime.session.saveSessionStore(storePath, sessionStore);
232+
let sessionEntry = existingSessionEntry;
233+
if (!sessionEntry?.sessionId || voiceConfig.responseModel) {
234+
sessionEntry = await agentRuntime.session.updateSessionStore(storePath, (store) => {
235+
let entry = store[resolvedSessionKey] as SessionEntry | undefined;
236+
if (!entry?.sessionId) {
237+
entry = {
238+
...entry,
239+
sessionId: crypto.randomUUID(),
240+
updatedAt: now,
241+
};
242+
store[resolvedSessionKey] = entry;
243+
}
244+
if (voiceConfig.responseModel) {
245+
applyModelOverrideToSessionEntry({
246+
entry,
247+
selection: { provider, model },
248+
selectionSource: "auto",
249+
});
250+
}
251+
return entry;
252+
});
254253
}
254+
const sessionId = sessionEntry.sessionId;
255255

256256
const sessionFile = agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, {
257257
agentId,

extensions/voice-call/src/runtime.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
337337
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
338338
loadSessionStore: vi.fn(() => sessionStore),
339339
saveSessionStore: vi.fn(async () => {}),
340+
updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)),
340341
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
341342
},
342343
runEmbeddedPiAgent,
@@ -421,6 +422,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
421422
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
422423
loadSessionStore: vi.fn(() => sessionStore),
423424
saveSessionStore: vi.fn(async () => {}),
425+
updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)),
424426
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
425427
},
426428
runEmbeddedPiAgent,
@@ -483,6 +485,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
483485
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
484486
loadSessionStore: vi.fn(() => sessionStore),
485487
saveSessionStore: vi.fn(async () => {}),
488+
updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)),
486489
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
487490
},
488491
runEmbeddedPiAgent,

src/agents/pi-embedded-subscribe.handlers.compaction.test.ts

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
import fs from "node:fs/promises";
22
import os from "node:os";
33
import path from "node:path";
4-
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
5-
import {
6-
drainSessionStoreLockQueuesForTest,
7-
resetSessionStoreLockRuntimeForTests,
8-
setSessionWriteLockAcquirerForTests,
9-
} from "../config/sessions.js";
4+
import { afterEach, describe, expect, it, vi } from "vitest";
5+
import { drainSessionStoreWriterQueuesForTest } from "../config/sessions.js";
106
import {
117
readCompactionCount,
128
seedSessionStore,
@@ -57,15 +53,8 @@ function createCompactionContext(params: {
5753
} as unknown as EmbeddedPiSubscribeContext;
5854
}
5955

60-
beforeEach(() => {
61-
setSessionWriteLockAcquirerForTests(async () => ({
62-
release: async () => {},
63-
}));
64-
});
65-
6656
afterEach(async () => {
67-
resetSessionStoreLockRuntimeForTests();
68-
await drainSessionStoreLockQueuesForTest();
57+
await drainSessionStoreWriterQueuesForTest();
6958
});
7059

7160
describe("reconcileSessionStoreCompactionCountAfterSuccess", () => {

src/agents/subagent-registry.persistence.resume.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite
55
import "./subagent-registry.mocks.shared.js";
66
import {
77
clearSessionStoreCacheForTest,
8-
drainSessionStoreLockQueuesForTest,
8+
drainSessionStoreWriterQueuesForTest,
99
} from "../config/sessions/store.js";
1010
import { captureEnv } from "../test-utils/env.js";
1111
import {
@@ -131,7 +131,7 @@ describe("subagent registry persistence resume", () => {
131131
announceSpy.mockClear();
132132
mod.__testing.setDepsForTest();
133133
mod.resetSubagentRegistryForTests({ persist: false });
134-
await drainSessionStoreLockQueuesForTest();
134+
await drainSessionStoreWriterQueuesForTest();
135135
clearSessionStoreCacheForTest();
136136
if (tempStateDir) {
137137
await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });

src/agents/subagent-registry.persistence.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
66
import "./subagent-registry.mocks.shared.js";
77
import {
88
clearSessionStoreCacheForTest,
9-
drainSessionStoreLockQueuesForTest,
9+
drainSessionStoreWriterQueuesForTest,
1010
} from "../config/sessions/store.js";
1111
import { callGateway } from "../gateway/call.js";
1212
import { onAgentEvent } from "../infra/agent-events.js";
@@ -207,7 +207,7 @@ describe("subagent registry persistence", () => {
207207
announceSpy.mockClear();
208208
__testing.setDepsForTest();
209209
resetSubagentRegistryForTests({ persist: false });
210-
await drainSessionStoreLockQueuesForTest();
210+
await drainSessionStoreWriterQueuesForTest();
211211
clearSessionStoreCacheForTest();
212212
if (tempStateDir) {
213213
await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });

src/config/sessions.test.ts

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import fsSync from "node:fs";
12
import fs from "node:fs/promises";
23
import os from "node:os";
34
import path from "node:path";
4-
import { afterAll, beforeAll, describe, expect, it } from "vitest";
5+
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
56
import { withEnv } from "../test-utils/env.js";
67
import {
78
buildGroupDisplayName,
@@ -798,7 +799,7 @@ describe("sessions", () => {
798799
await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow();
799800
});
800801

801-
it("updateSessionStoreEntry re-reads disk inside lock instead of using stale cache", async () => {
802+
it("updateSessionStoreEntry re-reads disk inside the writer slot instead of using stale cache", async () => {
802803
const mainSessionKey = "agent:main:main";
803804
const { storePath } = await createSessionStoreFixture({
804805
prefix: "updateSessionStoreEntry-cache-bypass",
@@ -838,4 +839,91 @@ describe("sessions", () => {
838839
expect(store[mainSessionKey]?.providerOverride).toBe("anthropic");
839840
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
840841
});
842+
843+
it("updateSessionStore uses the writer-owned mutable cache without disk read or parse", async () => {
844+
const mainSessionKey = "agent:main:main";
845+
const { storePath } = await createSessionStoreFixture({
846+
prefix: "updateSessionStore-mutable-cache",
847+
entries: {
848+
[mainSessionKey]: {
849+
sessionId: "sess-1",
850+
updatedAt: 123,
851+
thinkingLevel: "low",
852+
},
853+
},
854+
});
855+
856+
expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low");
857+
858+
const readSpy = vi.spyOn(fsSync, "readFileSync");
859+
const parseSpy = vi.spyOn(JSON, "parse");
860+
try {
861+
await updateSessionStore(
862+
storePath,
863+
(store) => {
864+
const existing = store[mainSessionKey];
865+
if (!existing) {
866+
throw new Error("missing session entry");
867+
}
868+
store[mainSessionKey] = {
869+
...existing,
870+
thinkingLevel: "high",
871+
};
872+
},
873+
{ skipMaintenance: true },
874+
);
875+
876+
expect(readSpy).not.toHaveBeenCalled();
877+
expect(parseSpy).not.toHaveBeenCalled();
878+
} finally {
879+
readSpy.mockRestore();
880+
parseSpy.mockRestore();
881+
}
882+
883+
const store = loadSessionStore(storePath, { skipCache: true });
884+
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
885+
});
886+
887+
it("updateSessionStore drops a borrowed cache entry when a mutator throws", async () => {
888+
const mainSessionKey = "agent:main:main";
889+
const { storePath } = await createSessionStoreFixture({
890+
prefix: "updateSessionStore-mutable-cache-throw",
891+
entries: {
892+
[mainSessionKey]: {
893+
sessionId: "sess-1",
894+
updatedAt: 123,
895+
thinkingLevel: "low",
896+
},
897+
},
898+
});
899+
900+
expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low");
901+
902+
await expect(
903+
updateSessionStore(
904+
storePath,
905+
(store) => {
906+
const existing = store[mainSessionKey];
907+
if (!existing) {
908+
throw new Error("missing session entry");
909+
}
910+
store[mainSessionKey] = {
911+
...existing,
912+
thinkingLevel: "mutated-before-throw",
913+
};
914+
throw new Error("boom");
915+
},
916+
{ skipMaintenance: true },
917+
),
918+
).rejects.toThrow("boom");
919+
920+
const readSpy = vi.spyOn(fsSync, "readFileSync");
921+
try {
922+
const store = loadSessionStore(storePath);
923+
expect(readSpy).toHaveBeenCalled();
924+
expect(store[mainSessionKey]?.thinkingLevel).toBe("low");
925+
} finally {
926+
readSpy.mockRestore();
927+
}
928+
});
841929
});

0 commit comments

Comments
 (0)