Skip to content

Commit 552c794

Browse files
luoyanglangsteipete
authored andcommitted
fix(agents): avoid session event queue self-wait
1 parent 90f3007 commit 552c794

2 files changed

Lines changed: 188 additions & 8 deletions

File tree

src/agents/embedded-agent-runner/run/attempt.session-lock.test.ts

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,147 @@ describe("embedded attempt session lock lifecycle", () => {
11891189
]);
11901190
});
11911191

1192+
it("does not wait on the session event queue from a hook running during active event processing", async () => {
1193+
const events: string[] = [];
1194+
const session = {
1195+
_agentEventQueue: Promise.resolve(),
1196+
_extensionRunner: {
1197+
hasHandlers: vi.fn((eventType: string) => eventType === "tool_call"),
1198+
},
1199+
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
1200+
events.push(`process:${event.type}`);
1201+
await session.agent.beforeToolCall();
1202+
events.push("process:end");
1203+
}),
1204+
_handleAgentEvent(event: { type?: string }) {
1205+
events.push(`handle:${event.type}`);
1206+
session["_agentEventQueue"] = session["_agentEventQueue"].then(() =>
1207+
session["_processAgentEvent"](event),
1208+
);
1209+
session["_agentEventQueue"].catch(() => {});
1210+
},
1211+
agent: {
1212+
beforeToolCall: vi.fn(async () => {
1213+
events.push("hook");
1214+
}),
1215+
},
1216+
};
1217+
1218+
installSessionEventWriteLock({
1219+
session,
1220+
withSessionWriteLock: async (run) => {
1221+
events.push("event-lock");
1222+
return await run();
1223+
},
1224+
});
1225+
installSessionExternalHookWriteLock({
1226+
session,
1227+
withSessionWriteLock: async (run) => {
1228+
events.push("hook-lock");
1229+
return await run();
1230+
},
1231+
});
1232+
1233+
const result = session["_handleAgentEvent"]({
1234+
type: "tool_call",
1235+
}) as unknown as Promise<unknown>;
1236+
const completion = await Promise.race([
1237+
result.then(() => "done"),
1238+
new Promise<string>((resolve) => {
1239+
setTimeout(() => resolve("timeout"), 25);
1240+
}),
1241+
]);
1242+
1243+
expect(completion).toBe("done");
1244+
expect(events).toEqual([
1245+
"event-lock",
1246+
"handle:tool_call",
1247+
"process:tool_call",
1248+
"hook-lock",
1249+
"hook",
1250+
"process:end",
1251+
]);
1252+
});
1253+
1254+
it("drains queued session events for async hook work spawned after event processing returns", async () => {
1255+
const events: string[] = [];
1256+
let releaseQueue!: () => void;
1257+
let spawnedHook!: Promise<void>;
1258+
const session = {
1259+
_agentEventQueue: Promise.resolve(),
1260+
_extensionRunner: {
1261+
hasHandlers: vi.fn((eventType: string) => eventType === "tool_call"),
1262+
},
1263+
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
1264+
events.push(`process:${event.type}`);
1265+
spawnedHook = new Promise<void>((resolve, reject) => {
1266+
setTimeout(() => {
1267+
session.agent.beforeToolCall().then(resolve, reject);
1268+
}, 0);
1269+
});
1270+
session["_agentEventQueue"] = new Promise<void>((resolve) => {
1271+
releaseQueue = resolve;
1272+
}).then(() => {
1273+
events.push("queue-drained");
1274+
});
1275+
events.push("process:end");
1276+
}),
1277+
_handleAgentEvent(event: { type?: string }) {
1278+
events.push(`handle:${event.type}`);
1279+
session["_agentEventQueue"] = session["_agentEventQueue"].then(() =>
1280+
session["_processAgentEvent"](event),
1281+
);
1282+
session["_agentEventQueue"].catch(() => {});
1283+
},
1284+
agent: {
1285+
beforeToolCall: vi.fn(async () => {
1286+
events.push("hook");
1287+
}),
1288+
},
1289+
};
1290+
1291+
installSessionEventWriteLock({
1292+
session,
1293+
withSessionWriteLock: async (run) => {
1294+
events.push("event-lock");
1295+
return await run();
1296+
},
1297+
});
1298+
installSessionExternalHookWriteLock({
1299+
session,
1300+
withSessionWriteLock: async (run) => {
1301+
events.push("hook-lock");
1302+
return await run();
1303+
},
1304+
});
1305+
1306+
session["_handleAgentEvent"]({ type: "tool_call" });
1307+
await new Promise<void>((resolve) => setTimeout(resolve, 5));
1308+
1309+
const beforeQueueRelease = await Promise.race([
1310+
spawnedHook.then(() => "done"),
1311+
new Promise<string>((resolve) => {
1312+
setTimeout(() => resolve("waiting"), 25);
1313+
}),
1314+
]);
1315+
1316+
expect(beforeQueueRelease).toBe("waiting");
1317+
expect(events).toEqual(["event-lock", "handle:tool_call", "process:tool_call", "process:end"]);
1318+
1319+
releaseQueue();
1320+
await spawnedHook;
1321+
1322+
expect(events).toEqual([
1323+
"event-lock",
1324+
"handle:tool_call",
1325+
"process:tool_call",
1326+
"process:end",
1327+
"queue-drained",
1328+
"hook-lock",
1329+
"hook",
1330+
]);
1331+
});
1332+
11921333
it("locks OpenClaw extension hooks that can mutate the session outside agent events", async () => {
11931334
const locked: string[] = [];
11941335
const called: string[] = [];

src/agents/embedded-agent-runner/run/attempt.session-lock.ts

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,32 @@ type LockableFunction = ((...args: unknown[]) => unknown) & {
7474
__openclawSessionWriteLockInstalled?: boolean;
7575
};
7676

77+
type SessionEventHookContext = {
78+
session: unknown;
79+
active: boolean;
80+
};
81+
82+
const sessionEventHookContext = new AsyncLocalStorage<SessionEventHookContext>();
83+
84+
function isProcessingAgentEventInCurrentChain(session: unknown): boolean {
85+
const context = sessionEventHookContext.getStore();
86+
return context?.active === true && context.session === session;
87+
}
88+
89+
async function withProcessingAgentEvent<T>(
90+
session: unknown,
91+
run: () => Promise<T> | T,
92+
): Promise<T> {
93+
const context: SessionEventHookContext = { session, active: true };
94+
return await sessionEventHookContext.run(context, async () => {
95+
try {
96+
return await run();
97+
} finally {
98+
context.active = false;
99+
}
100+
});
101+
}
102+
77103
function sessionHasExtensionHandlers(session: SessionEventProcessor, eventType: string): boolean {
78104
const extensionRunner = session["_extensionRunner"];
79105
const hasHandlers = extensionRunner?.hasHandlers;
@@ -662,6 +688,15 @@ async function waitForSessionEventQueue(session: unknown): Promise<void> {
662688
}
663689
}
664690

691+
async function waitForSessionEventQueueBeforeHook(session: unknown): Promise<void> {
692+
// Hook calls made by _handleAgentEvent are already inside the current queue
693+
// entry. Draining there waits on itself; detached/external hook work still drains.
694+
if (isProcessingAgentEventInCurrentChain(session)) {
695+
return;
696+
}
697+
await waitForSessionEventQueue(session);
698+
}
699+
665700
function installAwaitableSessionEventQueue(session: unknown): void {
666701
const owner = session as SessionEventQueueBridge;
667702
const original = owner["_handleAgentEvent"];
@@ -731,10 +766,14 @@ export function installSessionEventWriteLock(params: {
731766
event: unknown,
732767
signal?: unknown,
733768
) {
734-
if (!eventMayReachTranscriptWriters(session, event)) {
735-
return await original.call(this, event, signal);
736-
}
737-
return await params.withSessionWriteLock(async () => await original.call(this, event, signal));
769+
return await withProcessingAgentEvent(session, async () => {
770+
if (!eventMayReachTranscriptWriters(session, event)) {
771+
return await original.call(this, event, signal);
772+
}
773+
return await params.withSessionWriteLock(
774+
async () => await original.call(this, event, signal),
775+
);
776+
});
738777
};
739778
wrapped["__openclawSessionEventQueueAwaitInstalled"] =
740779
original["__openclawSessionEventQueueAwaitInstalled"];
@@ -756,28 +795,28 @@ export function installSessionExternalHookWriteLock(params: {
756795
owner: agent as Record<string, unknown>,
757796
key: "beforeToolCall",
758797
shouldLock: () => true,
759-
waitBeforeLock: () => waitForSessionEventQueue(session),
798+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
760799
withSessionWriteLock: params.withSessionWriteLock,
761800
});
762801
installLockableFunction({
763802
owner: agent as Record<string, unknown>,
764803
key: "afterToolCall",
765804
shouldLock: () => sessionHasExtensionHandlers(session, "tool_result"),
766-
waitBeforeLock: () => waitForSessionEventQueue(session),
805+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
767806
withSessionWriteLock: params.withSessionWriteLock,
768807
});
769808
installLockableFunction({
770809
owner: agent as Record<string, unknown>,
771810
key: "onPayload",
772811
shouldLock: () => sessionHasExtensionHandlers(session, "before_provider_request"),
773-
waitBeforeLock: () => waitForSessionEventQueue(session),
812+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
774813
withSessionWriteLock: params.withSessionWriteLock,
775814
});
776815
installLockableFunction({
777816
owner: agent as Record<string, unknown>,
778817
key: "onResponse",
779818
shouldLock: () => sessionHasExtensionHandlers(session, "after_provider_response"),
780-
waitBeforeLock: () => waitForSessionEventQueue(session),
819+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
781820
withSessionWriteLock: params.withSessionWriteLock,
782821
});
783822
}

0 commit comments

Comments
 (0)