Skip to content

Commit 054ea49

Browse files
committed
fix: refresh prompt fence after compaction writes
1 parent ac9a219 commit 054ea49

5 files changed

Lines changed: 368 additions & 1 deletion

File tree

src/agents/embedded-agent-runner/run/attempt.session-lock.test.ts

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Coverage for embedded attempt session-file ownership and write locks.
2+
import { appendFileSync } from "node:fs";
23
import fs from "node:fs/promises";
34
import os from "node:os";
45
import path from "node:path";
@@ -9,6 +10,7 @@ import {
910
runWithOwnedSessionTranscriptWritePublication,
1011
withOwnedSessionTranscriptWrites,
1112
} from "../../../config/sessions/transcript-write-context.js";
13+
import { guardSessionManager } from "../../session-tool-result-guard-wrapper.js";
1214
import {
1315
SessionWriteLockStaleError,
1416
SessionWriteLockTimeoutError,
@@ -17,6 +19,7 @@ import {
1719
acquireSessionWriteLock,
1820
resetSessionWriteLockStateForTest,
1921
} from "../../session-write-lock.js";
22+
import { SessionManager } from "../../sessions/session-manager.js";
2023
import {
2124
acquireEmbeddedAttemptSessionFileOwner,
2225
createEmbeddedAttemptSessionLockController,
@@ -904,6 +907,273 @@ describe("embedded attempt session lock lifecycle", () => {
904907
expect(controller.hasSessionTakeover()).toBe(false);
905908
});
906909

910+
it("refreshes the prompt fence after an owned session manager compaction append", async () => {
911+
const sessionFile = await createTempSessionFile();
912+
const release = vi.fn(async () => {});
913+
const acquireSessionWriteLockLocal2 = vi.fn(async () => ({ release }));
914+
const controller = await createEmbeddedAttemptSessionLockController({
915+
acquireSessionWriteLock: acquireSessionWriteLockLocal2,
916+
lockOptions: { ...lockOptions, sessionFile },
917+
});
918+
const sessionManager = guardSessionManager(SessionManager.open(sessionFile), {
919+
withCompactionPersistence: (append, validateAppend) =>
920+
controller.withOwnedSessionFileWrite(append, validateAppend),
921+
});
922+
const firstKeptEntryId = sessionManager.appendMessage({
923+
role: "user",
924+
content: "old question",
925+
timestamp: 1,
926+
});
927+
sessionManager.appendMessage({
928+
role: "assistant",
929+
content: [{ type: "text", text: "old answer" }],
930+
api: "messages",
931+
provider: "openclaw",
932+
model: "session-lock-test",
933+
usage: {
934+
input: 0,
935+
output: 0,
936+
cacheRead: 0,
937+
cacheWrite: 0,
938+
totalTokens: 0,
939+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
940+
},
941+
stopReason: "stop",
942+
timestamp: 2,
943+
});
944+
945+
await controller.releaseForPrompt();
946+
sessionManager.appendCompaction("threshold summary", firstKeptEntryId, 160_001);
947+
948+
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
949+
expect(controller.hasSessionTakeover()).toBe(false);
950+
});
951+
952+
it("still rejects unowned external compaction appends before the prompt stream lock is reacquired", async () => {
953+
const sessionFile = await createTempSessionFile();
954+
const release = vi.fn(async () => {});
955+
const acquireSessionWriteLockLocal1 = vi.fn(async () => ({ release }));
956+
const controller = await createEmbeddedAttemptSessionLockController({
957+
acquireSessionWriteLock: acquireSessionWriteLockLocal1,
958+
lockOptions: { ...lockOptions, sessionFile },
959+
});
960+
961+
await controller.releaseForPrompt();
962+
await fs.appendFile(
963+
sessionFile,
964+
JSON.stringify({
965+
type: "compaction",
966+
id: "external-compaction",
967+
parentId: "session",
968+
timestamp: new Date().toISOString(),
969+
summary: "external summary",
970+
firstKeptEntryId: "session",
971+
tokensBefore: 160_001,
972+
}) + "\n",
973+
"utf8",
974+
);
975+
976+
await expect(controller.withSessionWriteLock(() => "finalize")).rejects.toBeInstanceOf(
977+
EmbeddedAttemptSessionTakeoverError,
978+
);
979+
expect(controller.hasSessionTakeover()).toBe(true);
980+
});
981+
982+
it("still rejects an external edit that happens before an owned session manager compaction append", async () => {
983+
const sessionFile = await createTempSessionFile();
984+
const release = vi.fn(async () => {});
985+
const acquireSessionWriteLockLocal0 = vi.fn(async () => ({ release }));
986+
const controller = await createEmbeddedAttemptSessionLockController({
987+
acquireSessionWriteLock: acquireSessionWriteLockLocal0,
988+
lockOptions: { ...lockOptions, sessionFile },
989+
});
990+
const sessionManager = guardSessionManager(SessionManager.open(sessionFile), {
991+
withCompactionPersistence: (append, validateAppend) =>
992+
controller.withOwnedSessionFileWrite(append, validateAppend),
993+
});
994+
const firstKeptEntryId = sessionManager.appendMessage({
995+
role: "user",
996+
content: "old question",
997+
timestamp: 1,
998+
});
999+
sessionManager.appendMessage({
1000+
role: "assistant",
1001+
content: [{ type: "text", text: "old answer" }],
1002+
api: "messages",
1003+
provider: "openclaw",
1004+
model: "session-lock-test",
1005+
usage: {
1006+
input: 0,
1007+
output: 0,
1008+
cacheRead: 0,
1009+
cacheWrite: 0,
1010+
totalTokens: 0,
1011+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
1012+
},
1013+
stopReason: "stop",
1014+
timestamp: 2,
1015+
});
1016+
1017+
await controller.releaseForPrompt();
1018+
await fs.appendFile(sessionFile, '{"type":"message","id":"external-edit"}\n', "utf8");
1019+
sessionManager.appendCompaction("threshold summary", firstKeptEntryId, 160_001);
1020+
1021+
await expect(controller.withSessionWriteLock(() => "finalize")).rejects.toBeInstanceOf(
1022+
EmbeddedAttemptSessionTakeoverError,
1023+
);
1024+
expect(controller.hasSessionTakeover()).toBe(true);
1025+
});
1026+
1027+
it("still rejects an external edit interleaved inside an owned session manager compaction append", async () => {
1028+
const sessionFile = await createTempSessionFile();
1029+
const release = vi.fn(async () => {});
1030+
const acquireSessionWriteLockLocal30 = vi.fn(async () => ({ release }));
1031+
const controller = await createEmbeddedAttemptSessionLockController({
1032+
acquireSessionWriteLock: acquireSessionWriteLockLocal30,
1033+
lockOptions: { ...lockOptions, sessionFile },
1034+
});
1035+
const sessionManager = guardSessionManager(SessionManager.open(sessionFile), {
1036+
withCompactionPersistence: (append, validateAppend) =>
1037+
controller.withOwnedSessionFileWrite(() => {
1038+
appendFileSync(sessionFile, '{"type":"message","id":"external-edit"}\n', "utf8");
1039+
return append();
1040+
}, validateAppend),
1041+
});
1042+
const firstKeptEntryId = sessionManager.appendMessage({
1043+
role: "user",
1044+
content: "old question",
1045+
timestamp: 1,
1046+
});
1047+
sessionManager.appendMessage({
1048+
role: "assistant",
1049+
content: [{ type: "text", text: "old answer" }],
1050+
api: "messages",
1051+
provider: "openclaw",
1052+
model: "session-lock-test",
1053+
usage: {
1054+
input: 0,
1055+
output: 0,
1056+
cacheRead: 0,
1057+
cacheWrite: 0,
1058+
totalTokens: 0,
1059+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
1060+
},
1061+
stopReason: "stop",
1062+
timestamp: 2,
1063+
});
1064+
1065+
await controller.releaseForPrompt();
1066+
sessionManager.appendCompaction("threshold summary", firstKeptEntryId, 160_001);
1067+
1068+
await expect(controller.withSessionWriteLock(() => "finalize")).rejects.toBeInstanceOf(
1069+
EmbeddedAttemptSessionTakeoverError,
1070+
);
1071+
expect(controller.hasSessionTakeover()).toBe(true);
1072+
});
1073+
1074+
it("allows owned session manager compaction after a later controller advances the prompt fence", async () => {
1075+
const sessionFile = await createTempSessionFile();
1076+
const firstController = await createEmbeddedAttemptSessionLockController({
1077+
acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })),
1078+
lockOptions: { ...lockOptions, sessionFile },
1079+
});
1080+
await firstController.releaseForPrompt();
1081+
await firstController.dispose();
1082+
1083+
const release = vi.fn(async () => {});
1084+
const acquireSessionWriteLockLocal29 = vi.fn(async () => ({ release }));
1085+
const secondController = await createEmbeddedAttemptSessionLockController({
1086+
acquireSessionWriteLock: acquireSessionWriteLockLocal29,
1087+
lockOptions: { ...lockOptions, sessionFile },
1088+
});
1089+
const sessionManager = guardSessionManager(SessionManager.open(sessionFile), {
1090+
withCompactionPersistence: (append, validateAppend) =>
1091+
secondController.withOwnedSessionFileWrite(append, validateAppend),
1092+
});
1093+
const firstKeptEntryId = await secondController.withSessionWriteLock(() => {
1094+
const entryId = sessionManager.appendMessage({
1095+
role: "user",
1096+
content: "new question",
1097+
timestamp: 1,
1098+
});
1099+
sessionManager.appendMessage({
1100+
role: "assistant",
1101+
content: [{ type: "text", text: "new answer" }],
1102+
api: "messages",
1103+
provider: "openclaw",
1104+
model: "session-lock-test",
1105+
usage: {
1106+
input: 0,
1107+
output: 0,
1108+
cacheRead: 0,
1109+
cacheWrite: 0,
1110+
totalTokens: 0,
1111+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
1112+
},
1113+
stopReason: "stop",
1114+
timestamp: 2,
1115+
});
1116+
return entryId;
1117+
});
1118+
1119+
await secondController.releaseForPrompt();
1120+
sessionManager.appendCompaction("threshold summary", firstKeptEntryId, 160_001);
1121+
1122+
await expect(secondController.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
1123+
expect(secondController.hasSessionTakeover()).toBe(false);
1124+
});
1125+
1126+
it("allows owned session manager compaction after another controller publishes an owned write", async () => {
1127+
const sessionFile = await createTempSessionFile();
1128+
const firstController = await createEmbeddedAttemptSessionLockController({
1129+
acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })),
1130+
lockOptions: { ...lockOptions, sessionFile },
1131+
});
1132+
const sessionManager = guardSessionManager(SessionManager.open(sessionFile), {
1133+
withCompactionPersistence: (append, validateAppend) =>
1134+
firstController.withOwnedSessionFileWrite(append, validateAppend),
1135+
});
1136+
const firstKeptEntryId = sessionManager.appendMessage({
1137+
role: "user",
1138+
content: "old question",
1139+
timestamp: 1,
1140+
});
1141+
sessionManager.appendMessage({
1142+
role: "assistant",
1143+
content: [{ type: "text", text: "old answer" }],
1144+
api: "messages",
1145+
provider: "openclaw",
1146+
model: "session-lock-test",
1147+
usage: {
1148+
input: 0,
1149+
output: 0,
1150+
cacheRead: 0,
1151+
cacheWrite: 0,
1152+
totalTokens: 0,
1153+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
1154+
},
1155+
stopReason: "stop",
1156+
timestamp: 2,
1157+
});
1158+
await firstController.releaseForPrompt();
1159+
1160+
const secondController = await createEmbeddedAttemptSessionLockController({
1161+
acquireSessionWriteLock: vi.fn(async () => ({ release: vi.fn(async () => {}) })),
1162+
lockOptions: { ...lockOptions, sessionFile },
1163+
});
1164+
await secondController.releaseForPrompt();
1165+
await secondController.withSessionWriteLock(
1166+
async () => {
1167+
await fs.appendFile(sessionFile, '{"type":"message","id":"owned-other"}\n', "utf8");
1168+
},
1169+
{ publishOwnedWrite: true },
1170+
);
1171+
sessionManager.appendCompaction("threshold summary", firstKeptEntryId, 160_001);
1172+
1173+
await expect(firstController.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
1174+
expect(firstController.hasSessionTakeover()).toBe(false);
1175+
});
1176+
9071177
it("allows post-prompt writes after the prompt context publishes an owned transcript write", async () => {
9081178
const sessionFile = await createTempSessionFile();
9091179
const releases: string[] = [];

src/agents/embedded-agent-runner/run/attempt.session-lock.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* Coordinates embedded-attempt session ownership, takeover, and prompt locks.
33
*/
44
import { AsyncLocalStorage } from "node:async_hooks";
5-
import { statSync } from "node:fs";
5+
import { readFileSync, statSync } from "node:fs";
66
import fs from "node:fs/promises";
77
import { isDeepStrictEqual } from "node:util";
88
import { normalizeStringEntries } from "@openclaw/normalization-core/string-normalization";
@@ -29,6 +29,8 @@ type SessionWriteLockRunOptions = {
2929
publishOwnedWrite?: boolean;
3030
};
3131

32+
type SessionFileWriteAppendValidator<T> = (result: T, appendedText: string) => boolean;
33+
3234
type SessionWithAgentPrompt = {
3335
agent?: {
3436
streamFn?: PromptReleaseStreamFn;
@@ -575,6 +577,10 @@ export type EmbeddedAttemptSessionLockController = {
575577
releaseForPrompt(): Promise<void>;
576578
releaseHeldLockForAbort(): Promise<void>;
577579
refreshAfterOwnedSessionWrite(): void;
580+
withOwnedSessionFileWrite<T>(
581+
run: () => T,
582+
validateAppend?: SessionFileWriteAppendValidator<T>,
583+
): T;
578584
reacquireAfterPrompt(): Promise<void>;
579585
waitForSessionEvents(session: unknown): Promise<void>;
580586
withSessionWriteLock<T>(
@@ -787,6 +793,42 @@ export async function createEmbeddedAttemptSessionLockController(params: {
787793
}
788794
}
789795

796+
// Synchronous append paths cannot await withSessionWriteLock. Only publish
797+
// their post-write fingerprint when the pre-write state was already trusted.
798+
function publishOwnedSessionFileFenceSync<T>(write: {
799+
beforeWrite: SessionFileFingerprint;
800+
result: T;
801+
beforeText?: string;
802+
validateAppend?: SessionFileWriteAppendValidator<T>;
803+
}): void {
804+
if (takeoverDetected) {
805+
return;
806+
}
807+
const fingerprint = readSessionFileFingerprintSync(params.lockOptions.sessionFile);
808+
const beforeWriteIsTrusted =
809+
(fenceActive && sameSessionFileFingerprint(fenceFingerprint, write.beforeWrite)) ||
810+
isTrustedSessionFileState(sessionFileFenceKey, write.beforeWrite);
811+
if (sameSessionFileFingerprint(write.beforeWrite, fingerprint) || !beforeWriteIsTrusted) {
812+
return;
813+
}
814+
if (write.validateAppend) {
815+
const afterText = readFileSync(params.lockOptions.sessionFile, "utf8");
816+
if (
817+
write.beforeText === undefined ||
818+
!afterText.startsWith(write.beforeText) ||
819+
!write.validateAppend(write.result, afterText.slice(write.beforeText.length))
820+
) {
821+
return;
822+
}
823+
}
824+
const generation = recordOwnedSessionFileWrite(sessionFileFenceKey, fingerprint);
825+
if (fenceActive) {
826+
fenceFingerprint = fingerprint;
827+
fenceSnapshot = { fingerprint };
828+
fenceGeneration = generation;
829+
}
830+
}
831+
790832
const noopLock: SessionLock = { release: async () => {} };
791833

792834
async function releaseHeldLockWithFence(): Promise<void> {
@@ -913,6 +955,23 @@ export async function createEmbeddedAttemptSessionLockController(params: {
913955
fenceSnapshot = { fingerprint: fenceFingerprint };
914956
}
915957
},
958+
withOwnedSessionFileWrite<T>(
959+
run: () => T,
960+
validateAppend?: SessionFileWriteAppendValidator<T>,
961+
): T {
962+
const beforeWrite = readSessionFileFingerprintSync(params.lockOptions.sessionFile);
963+
const beforeText = validateAppend
964+
? readFileSync(params.lockOptions.sessionFile, "utf8")
965+
: undefined;
966+
const result = run();
967+
publishOwnedSessionFileFenceSync({
968+
beforeWrite,
969+
result,
970+
...(beforeText !== undefined ? { beforeText } : {}),
971+
...(validateAppend ? { validateAppend } : {}),
972+
});
973+
return result;
974+
},
916975
async reacquireAfterPrompt(): Promise<void> {
917976
await waitForHeldLockDrain();
918977
if (takeoverDetected || heldLock) {

0 commit comments

Comments
 (0)