Skip to content

Commit 2bb00f6

Browse files
committed
fix(agents): fence embedded session writes
1 parent 95eac52 commit 2bb00f6

9 files changed

Lines changed: 281 additions & 20 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
1515

1616
- Agents: cap heartbeat model bleed context hints by the stored session window when runtime model metadata is unavailable, so overflow recovery advice does not suggest a larger window than the active session actually has.
1717
- Control UI/Web Push: use `https://openclaw.ai` as the generated default VAPID subject instead of the old localhost mailbox so iOS PWA push setup uses an Apple-acceptable subject when `OPENCLAW_VAPID_SUBJECT` is unset. Fixes #83134. (#83317) Thanks @IWhatsskill.
18+
- Agents/Pi: keep embedded session transcript writes from tripping false takeover detection after packaged npm onboarding agent turns.
1819
- Memory/search: stop recall tracking from writing dreaming side-effect artifacts when `dreaming.enabled=false`, while preserving normal search results. Fixes #84436. (#84444) Thanks @NianJiuZst.
1920
- Diffs: render viewer toolbar icons from a closed icon-name map instead of HTML strings, removing the toolbar icon XSS sink. (#83955) Thanks @tanshanshan.
2021
- QA: keep `pnpm qa:e2e` self-check runs inside the private QA runtime envelope even when inherited shell env disables bundled plugins.

src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts

Lines changed: 116 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -183,6 +183,62 @@ describe("embedded attempt session lock lifecycle", () => {
183183
expect(release).toHaveBeenCalledTimes(2);
184184
});
185185

186+
it("refreshes the prompt fence after an owned write throws", async () => {
187+
const sessionFile = await createTempSessionFile();
188+
const release = vi.fn(async () => {});
189+
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
190+
const controller = await createEmbeddedAttemptSessionLockController({
191+
acquireSessionWriteLock,
192+
lockOptions: { ...lockOptions, sessionFile },
193+
});
194+
195+
await controller.releaseForPrompt();
196+
await expect(
197+
controller.withSessionWriteLock(async () => {
198+
await fs.appendFile(sessionFile, '{"type":"message","id":"owned-before-error"}\n', "utf8");
199+
throw new Error("downstream event handler failed");
200+
}),
201+
).rejects.toThrow("downstream event handler failed");
202+
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
203+
204+
expect(controller.hasSessionTakeover()).toBe(false);
205+
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3);
206+
expect(release).toHaveBeenCalledTimes(3);
207+
});
208+
209+
it("does not reuse a released lock from inherited async context", async () => {
210+
const sessionFile = await createTempSessionFile();
211+
let resumeDetached!: () => void;
212+
const detachedGate = new Promise<void>((resolve) => {
213+
resumeDetached = resolve;
214+
});
215+
const release = vi.fn(async () => {});
216+
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
217+
const controller = await createEmbeddedAttemptSessionLockController({
218+
acquireSessionWriteLock,
219+
lockOptions: { ...lockOptions, sessionFile },
220+
});
221+
222+
await controller.releaseForPrompt();
223+
let detachedWrite!: Promise<void>;
224+
await controller.withSessionWriteLock(async () => {
225+
detachedWrite = (async () => {
226+
await detachedGate;
227+
await controller.withSessionWriteLock(async () => {
228+
await fs.appendFile(sessionFile, '{"type":"message","id":"detached-owned"}\n', "utf8");
229+
});
230+
})();
231+
});
232+
233+
resumeDetached();
234+
await detachedWrite;
235+
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
236+
237+
expect(controller.hasSessionTakeover()).toBe(false);
238+
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(4);
239+
expect(release).toHaveBeenCalledTimes(4);
240+
});
241+
186242
it("refreshes the prompt fence after an owned transcript mirror append", async () => {
187243
const sessionFile = await createTempSessionFile();
188244
const release = vi.fn(async () => {});
@@ -214,6 +270,23 @@ describe("embedded attempt session lock lifecycle", () => {
214270
expect(release).toHaveBeenCalledTimes(3);
215271
});
216272

273+
it("refreshes the prompt fence after an owned session manager append", async () => {
274+
const sessionFile = await createTempSessionFile();
275+
const release = vi.fn(async () => {});
276+
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
277+
const controller = await createEmbeddedAttemptSessionLockController({
278+
acquireSessionWriteLock,
279+
lockOptions: { ...lockOptions, sessionFile },
280+
});
281+
282+
await controller.releaseForPrompt();
283+
await fs.appendFile(sessionFile, '{"type":"message","id":"owned-session-manager"}\n', "utf8");
284+
controller.refreshAfterOwnedSessionWrite();
285+
286+
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
287+
expect(controller.hasSessionTakeover()).toBe(false);
288+
});
289+
217290
it("returns a no-op cleanup lock after prompt lock reacquisition times out", async () => {
218291
const releases: string[] = [];
219292
const acquireSessionWriteLock = vi
@@ -379,6 +452,49 @@ describe("embedded attempt session lock lifecycle", () => {
379452
expect(releases).toEqual(["released", "released", "released"]);
380453
});
381454

455+
it("makes the Pi event listener await locked session event processing", async () => {
456+
const events: string[] = [];
457+
const session = {
458+
_agentEventQueue: Promise.resolve(),
459+
_disconnectFromAgent: vi.fn(() => events.push("disconnect")),
460+
_reconnectToAgent: vi.fn(() => events.push("reconnect")),
461+
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
462+
events.push(`process:${event.type}`);
463+
}),
464+
_handleAgentEvent(event: { type?: string }) {
465+
events.push(`handle:${event.type}`);
466+
session["_agentEventQueue"] = session["_agentEventQueue"].then(() =>
467+
session["_processAgentEvent"](event),
468+
);
469+
session["_agentEventQueue"].catch(() => {});
470+
},
471+
};
472+
473+
installSessionEventWriteLock({
474+
session,
475+
withSessionWriteLock: async (run) => {
476+
events.push("lock");
477+
return await run();
478+
},
479+
});
480+
481+
const handleAgentEvent = session["_handleAgentEvent"];
482+
const result = handleAgentEvent({ type: "message_end" }) as unknown as Promise<unknown>;
483+
484+
expect(result).toHaveProperty("then");
485+
expect(events).toEqual(["disconnect", "reconnect", "handle:message_end"]);
486+
487+
await result;
488+
489+
expect(events).toEqual([
490+
"disconnect",
491+
"reconnect",
492+
"handle:message_end",
493+
"lock",
494+
"process:message_end",
495+
]);
496+
});
497+
382498
it("locks Pi extension hooks that can mutate the session outside agent events", async () => {
383499
const locked: string[] = [];
384500
const called: string[] = [];

src/agents/pi-embedded-runner/run/attempt.session-lock.ts

Lines changed: 88 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,14 @@
11
import { AsyncLocalStorage } from "node:async_hooks";
2+
import { statSync } from "node:fs";
23
import fs from "node:fs/promises";
34
import { isSessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
45
import type { acquireSessionWriteLock } from "../../session-write-lock.js";
56

67
type SessionLock = Awaited<ReturnType<typeof acquireSessionWriteLock>>;
78
type AcquireSessionWriteLock = typeof acquireSessionWriteLock;
9+
type ActiveWriteLockState = {
10+
active: boolean;
11+
};
812

913
type LockOptions = {
1014
sessionFile: string;
@@ -25,6 +29,16 @@ type SessionEventQueueOwner = {
2529
_agentEventQueue?: PromiseLike<unknown>;
2630
};
2731

32+
type SessionEventQueueBridge = SessionEventQueueOwner & {
33+
_handleAgentEvent?: AwaitableSessionEventHandler;
34+
_disconnectFromAgent?: () => void;
35+
_reconnectToAgent?: () => void;
36+
};
37+
38+
type AwaitableSessionEventHandler = ((event: unknown, signal?: unknown) => unknown) & {
39+
__openclawSessionEventQueueAwaitInstalled?: boolean;
40+
};
41+
2842
type SessionWithAgentPrompt = {
2943
agent?: {
3044
streamFn?: PromptReleaseStreamFn;
@@ -147,6 +161,25 @@ async function readSessionFileFingerprint(sessionFile: string): Promise<SessionF
147161
}
148162
}
149163

164+
function readSessionFileFingerprintSync(sessionFile: string): SessionFileFingerprint {
165+
try {
166+
const stat = statSync(sessionFile, { bigint: true });
167+
return {
168+
exists: true,
169+
dev: stat.dev,
170+
ino: stat.ino,
171+
size: stat.size,
172+
mtimeNs: stat.mtimeNs,
173+
ctimeNs: stat.ctimeNs,
174+
};
175+
} catch (err) {
176+
if ((err as NodeJS.ErrnoException).code === "ENOENT") {
177+
return { exists: false };
178+
}
179+
throw err;
180+
}
181+
}
182+
150183
async function waitForSessionEventQueue(session: unknown): Promise<void> {
151184
const owner = session as SessionEventQueueOwner;
152185
for (let attempts = 0; attempts < 5; attempts += 1) {
@@ -165,6 +198,41 @@ async function waitForSessionEventQueue(session: unknown): Promise<void> {
165198
}
166199
}
167200

201+
function installAwaitableSessionEventQueue(session: unknown): void {
202+
const owner = session as SessionEventQueueBridge;
203+
const original = owner["_handleAgentEvent"];
204+
if (
205+
typeof original !== "function" ||
206+
original["__openclawSessionEventQueueAwaitInstalled"] === true
207+
) {
208+
return;
209+
}
210+
211+
const canReconnect =
212+
typeof owner["_disconnectFromAgent"] === "function" &&
213+
typeof owner["_reconnectToAgent"] === "function";
214+
if (canReconnect) {
215+
owner["_disconnectFromAgent"]?.();
216+
}
217+
218+
const wrapped: AwaitableSessionEventHandler = function awaitableSessionEventQueue(
219+
...args: [event: unknown, signal?: unknown]
220+
) {
221+
const result = original(...args);
222+
const queue = owner["_agentEventQueue"];
223+
if (queue && typeof queue.then === "function") {
224+
return Promise.resolve(queue);
225+
}
226+
return result;
227+
};
228+
wrapped["__openclawSessionEventQueueAwaitInstalled"] = true;
229+
owner["_handleAgentEvent"] = wrapped;
230+
231+
if (canReconnect) {
232+
owner["_reconnectToAgent"]?.();
233+
}
234+
}
235+
168236
export class EmbeddedAttemptSessionTakeoverError extends Error {
169237
constructor(sessionFile: string) {
170238
super(`session file changed while embedded prompt lock was released: ${sessionFile}`);
@@ -176,6 +244,7 @@ export function installSessionEventWriteLock(params: {
176244
session: unknown;
177245
withSessionWriteLock: <T>(run: () => Promise<T> | T) => Promise<T>;
178246
}): void {
247+
installAwaitableSessionEventQueue(params.session);
179248
const session = params.session as SessionEventProcessor;
180249
const original = session["_processAgentEvent"];
181250
if (
@@ -243,6 +312,7 @@ export function installSessionExternalHookWriteLock(params: {
243312

244313
export type EmbeddedAttemptSessionLockController = {
245314
releaseForPrompt(): Promise<void>;
315+
refreshAfterOwnedSessionWrite(): void;
246316
waitForSessionEvents(session: unknown): Promise<void>;
247317
withSessionWriteLock<T>(run: () => Promise<T> | T): Promise<T>;
248318
acquireForCleanup(params?: { session?: unknown }): Promise<SessionLock>;
@@ -262,7 +332,7 @@ export async function createEmbeddedAttemptSessionLockController(params: {
262332
});
263333

264334
let heldLock: SessionLock | undefined = await acquireLock();
265-
const activeWriteLock = new AsyncLocalStorage<SessionLock>();
335+
const activeWriteLock = new AsyncLocalStorage<ActiveWriteLockState>();
266336
let fenceFingerprint: SessionFileFingerprint | undefined;
267337
let fenceActive = false;
268338
let takeoverDetected = false;
@@ -311,24 +381,36 @@ export async function createEmbeddedAttemptSessionLockController(params: {
311381
fenceActive = true;
312382
await lock.release();
313383
},
384+
refreshAfterOwnedSessionWrite(): void {
385+
if (fenceActive && !takeoverDetected) {
386+
fenceFingerprint = readSessionFileFingerprintSync(params.lockOptions.sessionFile);
387+
}
388+
},
314389
waitForSessionEvents: waitForSessionEventQueue,
315390
async withSessionWriteLock<T>(run: () => Promise<T> | T): Promise<T> {
316391
if (takeoverDetected) {
317392
throw new EmbeddedAttemptSessionTakeoverError(params.lockOptions.sessionFile);
318393
}
319-
if (activeWriteLock.getStore()) {
394+
if (activeWriteLock.getStore()?.active === true) {
320395
return await run();
321396
}
322397
const { lock, owned } = await acquireWriteLock();
323398
try {
324399
await assertSessionFileFence();
325400
const runWithLock = async () => {
326-
const result = await run();
327-
await refreshSessionFileFence();
328-
return result;
401+
try {
402+
return await run();
403+
} finally {
404+
await refreshSessionFileFence();
405+
}
329406
};
330407
if (owned) {
331-
return await activeWriteLock.run(lock, runWithLock);
408+
const activeLockState: ActiveWriteLockState = { active: true };
409+
try {
410+
return await activeWriteLock.run(activeLockState, runWithLock);
411+
} finally {
412+
activeLockState.active = false;
413+
}
332414
}
333415
return await runWithLock();
334416
} finally {

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 24 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,10 @@ import {
1515
runQuotaSuspensionMaintenance,
1616
updateSessionStoreEntry,
1717
} from "../../../config/sessions/store.js";
18-
import { withOwnedSessionTranscriptWrites } from "../../../config/sessions/transcript-write-context.js";
18+
import {
19+
bindOwnedSessionTranscriptWrites,
20+
withOwnedSessionTranscriptWrites,
21+
} from "../../../config/sessions/transcript-write-context.js";
1922
import { resolveContextEngineOwnerPluginId } from "../../../context-engine/registry.js";
2023
import type { AssembleResult } from "../../../context-engine/types.js";
2124
import { emitTrustedDiagnosticEvent } from "../../../infra/diagnostic-events.js";
@@ -2116,6 +2119,9 @@ export async function runEmbeddedAttempt(
21162119
suppressTranscriptOnlyAssistantPersistence:
21172120
params.suppressTranscriptOnlyAssistantPersistence,
21182121
suppressAssistantErrorPersistence: params.suppressAssistantErrorPersistence,
2122+
onMessagePersisted: () => {
2123+
sessionLockController.refreshAfterOwnedSessionWrite();
2124+
},
21192125
onUserMessagePersisted: (message) => {
21202126
params.onUserMessagePersisted?.(message);
21212127
},
@@ -3173,19 +3179,26 @@ export async function runEmbeddedAttempt(
31733179
};
31743180
const abortable = <T>(promise: Promise<T>): Promise<T> =>
31753181
abortableWithSignal(runAbortController.signal, promise);
3182+
const ownedTranscriptWriteContext = {
3183+
sessionFile: params.sessionFile,
3184+
sessionKey: params.sessionKey,
3185+
withSessionWriteLock: <T>(operation: () => Promise<T> | T) =>
3186+
sessionLockController.withSessionWriteLock(operation),
3187+
};
31763188
const promptActiveSession = (
31773189
prompt: string,
31783190
options?: Parameters<typeof activeSession.prompt>[1],
31793191
): Promise<void> =>
31803192
withOwnedSessionTranscriptWrites(
3181-
{
3182-
sessionFile: params.sessionFile,
3183-
sessionKey: params.sessionKey,
3184-
withSessionWriteLock: (operation) =>
3185-
sessionLockController.withSessionWriteLock(operation),
3186-
},
3193+
ownedTranscriptWriteContext,
31873194
async () => abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))),
31883195
);
3196+
const onBlockReply = params.onBlockReply
3197+
? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReply)
3198+
: undefined;
3199+
const onBlockReplyFlush = params.onBlockReplyFlush
3200+
? bindOwnedSessionTranscriptWrites(ownedTranscriptWriteContext, params.onBlockReplyFlush)
3201+
: undefined;
31893202

31903203
const subscription = subscribeEmbeddedPiSession(
31913204
buildEmbeddedSubscriptionParams({
@@ -3203,8 +3216,8 @@ export async function runEmbeddedAttempt(
32033216
onToolResult: params.onToolResult,
32043217
onReasoningStream: params.onReasoningStream,
32053218
onReasoningEnd: params.onReasoningEnd,
3206-
onBlockReply: params.onBlockReply,
3207-
onBlockReplyFlush: params.onBlockReplyFlush,
3219+
onBlockReply,
3220+
onBlockReplyFlush,
32083221
blockReplyBreak: params.blockReplyBreak,
32093222
blockReplyChunking: params.blockReplyChunking,
32103223
onPartialReply: params.onPartialReply,
@@ -4204,8 +4217,8 @@ export async function runEmbeddedAttempt(
42044217
// user receives the assistant response immediately. Without this,
42054218
// coalesced/buffered blocks stay in the pipeline until compaction
42064219
// finishes — which can take minutes on large contexts (#35074).
4207-
if (params.onBlockReplyFlush) {
4208-
await params.onBlockReplyFlush();
4220+
if (onBlockReplyFlush) {
4221+
await onBlockReplyFlush();
42094222
}
42104223

42114224
// Skip compaction wait when yield aborted the run — the signal is

0 commit comments

Comments
 (0)