Skip to content

Commit b789e71

Browse files
authored
fix(agents): avoid session event queue self-wait (#86123)
Avoids a self-wait in embedded agent session event hooks by skipping the queue drain only for hooks running inside the current session event processing chain. Detached or external hook work still drains the queue before taking the session write lock. Verification: - node scripts/run-vitest.mjs run --config test/vitest/vitest.agents-embedded-agent.config.ts src/agents/embedded-agent-runner/run/attempt.session-lock.test.ts - node scripts/run-oxlint.mjs --tsconfig config/tsconfig/oxlint.core.json src/agents/embedded-agent-runner/run/attempt.session-lock.test.ts src/agents/embedded-agent-runner/run/attempt.session-lock.ts --threads=8 - .agents/skills/autoreview/scripts/autoreview --mode branch --base origin/main - GitHub CI: https://github.com/openclaw/openclaw/actions/runs/26533883763 Thanks @luoyanglang. Co-authored-by: luoyanglang <hanwanlonga@gmail.com>
1 parent e339586 commit b789e71

2 files changed

Lines changed: 188 additions & 8 deletions

File tree

src/agents/embedded-agent-runner/run/attempt.session-lock.test.ts

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,147 @@ describe("embedded attempt session lock lifecycle", () => {
11891189
]);
11901190
});
11911191

1192+
it("does not wait on the session event queue from a hook running during active event processing", async () => {
1193+
const events: string[] = [];
1194+
const session = {
1195+
_agentEventQueue: Promise.resolve(),
1196+
_extensionRunner: {
1197+
hasHandlers: vi.fn((eventType: string) => eventType === "tool_call"),
1198+
},
1199+
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
1200+
events.push(`process:${event.type}`);
1201+
await session.agent.beforeToolCall();
1202+
events.push("process:end");
1203+
}),
1204+
_handleAgentEvent(event: { type?: string }) {
1205+
events.push(`handle:${event.type}`);
1206+
session["_agentEventQueue"] = session["_agentEventQueue"].then(() =>
1207+
session["_processAgentEvent"](event),
1208+
);
1209+
session["_agentEventQueue"].catch(() => {});
1210+
},
1211+
agent: {
1212+
beforeToolCall: vi.fn(async () => {
1213+
events.push("hook");
1214+
}),
1215+
},
1216+
};
1217+
1218+
installSessionEventWriteLock({
1219+
session,
1220+
withSessionWriteLock: async (run) => {
1221+
events.push("event-lock");
1222+
return await run();
1223+
},
1224+
});
1225+
installSessionExternalHookWriteLock({
1226+
session,
1227+
withSessionWriteLock: async (run) => {
1228+
events.push("hook-lock");
1229+
return await run();
1230+
},
1231+
});
1232+
1233+
const result = session["_handleAgentEvent"]({
1234+
type: "tool_call",
1235+
}) as unknown as Promise<unknown>;
1236+
const completion = await Promise.race([
1237+
result.then(() => "done"),
1238+
new Promise<string>((resolve) => {
1239+
setTimeout(() => resolve("timeout"), 25);
1240+
}),
1241+
]);
1242+
1243+
expect(completion).toBe("done");
1244+
expect(events).toEqual([
1245+
"event-lock",
1246+
"handle:tool_call",
1247+
"process:tool_call",
1248+
"hook-lock",
1249+
"hook",
1250+
"process:end",
1251+
]);
1252+
});
1253+
1254+
it("drains queued session events for async hook work spawned after event processing returns", async () => {
1255+
const events: string[] = [];
1256+
let releaseQueue!: () => void;
1257+
let spawnedHook!: Promise<void>;
1258+
const session = {
1259+
_agentEventQueue: Promise.resolve(),
1260+
_extensionRunner: {
1261+
hasHandlers: vi.fn((eventType: string) => eventType === "tool_call"),
1262+
},
1263+
_processAgentEvent: vi.fn(async (event: { type?: string }) => {
1264+
events.push(`process:${event.type}`);
1265+
spawnedHook = new Promise<void>((resolve, reject) => {
1266+
setTimeout(() => {
1267+
session.agent.beforeToolCall().then(resolve, reject);
1268+
}, 0);
1269+
});
1270+
session["_agentEventQueue"] = new Promise<void>((resolve) => {
1271+
releaseQueue = resolve;
1272+
}).then(() => {
1273+
events.push("queue-drained");
1274+
});
1275+
events.push("process:end");
1276+
}),
1277+
_handleAgentEvent(event: { type?: string }) {
1278+
events.push(`handle:${event.type}`);
1279+
session["_agentEventQueue"] = session["_agentEventQueue"].then(() =>
1280+
session["_processAgentEvent"](event),
1281+
);
1282+
session["_agentEventQueue"].catch(() => {});
1283+
},
1284+
agent: {
1285+
beforeToolCall: vi.fn(async () => {
1286+
events.push("hook");
1287+
}),
1288+
},
1289+
};
1290+
1291+
installSessionEventWriteLock({
1292+
session,
1293+
withSessionWriteLock: async (run) => {
1294+
events.push("event-lock");
1295+
return await run();
1296+
},
1297+
});
1298+
installSessionExternalHookWriteLock({
1299+
session,
1300+
withSessionWriteLock: async (run) => {
1301+
events.push("hook-lock");
1302+
return await run();
1303+
},
1304+
});
1305+
1306+
session["_handleAgentEvent"]({ type: "tool_call" });
1307+
await new Promise<void>((resolve) => setTimeout(resolve, 5));
1308+
1309+
const beforeQueueRelease = await Promise.race([
1310+
spawnedHook.then(() => "done"),
1311+
new Promise<string>((resolve) => {
1312+
setTimeout(() => resolve("waiting"), 25);
1313+
}),
1314+
]);
1315+
1316+
expect(beforeQueueRelease).toBe("waiting");
1317+
expect(events).toEqual(["event-lock", "handle:tool_call", "process:tool_call", "process:end"]);
1318+
1319+
releaseQueue();
1320+
await spawnedHook;
1321+
1322+
expect(events).toEqual([
1323+
"event-lock",
1324+
"handle:tool_call",
1325+
"process:tool_call",
1326+
"process:end",
1327+
"queue-drained",
1328+
"hook-lock",
1329+
"hook",
1330+
]);
1331+
});
1332+
11921333
it("locks OpenClaw extension hooks that can mutate the session outside agent events", async () => {
11931334
const locked: string[] = [];
11941335
const called: string[] = [];

src/agents/embedded-agent-runner/run/attempt.session-lock.ts

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,32 @@ type LockableFunction = ((...args: unknown[]) => unknown) & {
7474
__openclawSessionWriteLockInstalled?: boolean;
7575
};
7676

77+
type SessionEventHookContext = {
78+
session: unknown;
79+
active: boolean;
80+
};
81+
82+
const sessionEventHookContext = new AsyncLocalStorage<SessionEventHookContext>();
83+
84+
function isProcessingAgentEventInCurrentChain(session: unknown): boolean {
85+
const context = sessionEventHookContext.getStore();
86+
return context?.active === true && context.session === session;
87+
}
88+
89+
async function withProcessingAgentEvent<T>(
90+
session: unknown,
91+
run: () => Promise<T> | T,
92+
): Promise<T> {
93+
const context: SessionEventHookContext = { session, active: true };
94+
return await sessionEventHookContext.run(context, async () => {
95+
try {
96+
return await run();
97+
} finally {
98+
context.active = false;
99+
}
100+
});
101+
}
102+
77103
function sessionHasExtensionHandlers(session: SessionEventProcessor, eventType: string): boolean {
78104
const extensionRunner = session["_extensionRunner"];
79105
const hasHandlers = extensionRunner?.hasHandlers;
@@ -662,6 +688,15 @@ async function waitForSessionEventQueue(session: unknown): Promise<void> {
662688
}
663689
}
664690

691+
async function waitForSessionEventQueueBeforeHook(session: unknown): Promise<void> {
692+
// Hook calls made by _handleAgentEvent are already inside the current queue
693+
// entry. Draining there waits on itself; detached/external hook work still drains.
694+
if (isProcessingAgentEventInCurrentChain(session)) {
695+
return;
696+
}
697+
await waitForSessionEventQueue(session);
698+
}
699+
665700
function installAwaitableSessionEventQueue(session: unknown): void {
666701
const owner = session as SessionEventQueueBridge;
667702
const original = owner["_handleAgentEvent"];
@@ -731,10 +766,14 @@ export function installSessionEventWriteLock(params: {
731766
event: unknown,
732767
signal?: unknown,
733768
) {
734-
if (!eventMayReachTranscriptWriters(session, event)) {
735-
return await original.call(this, event, signal);
736-
}
737-
return await params.withSessionWriteLock(async () => await original.call(this, event, signal));
769+
return await withProcessingAgentEvent(session, async () => {
770+
if (!eventMayReachTranscriptWriters(session, event)) {
771+
return await original.call(this, event, signal);
772+
}
773+
return await params.withSessionWriteLock(
774+
async () => await original.call(this, event, signal),
775+
);
776+
});
738777
};
739778
wrapped["__openclawSessionEventQueueAwaitInstalled"] =
740779
original["__openclawSessionEventQueueAwaitInstalled"];
@@ -756,28 +795,28 @@ export function installSessionExternalHookWriteLock(params: {
756795
owner: agent as Record<string, unknown>,
757796
key: "beforeToolCall",
758797
shouldLock: () => true,
759-
waitBeforeLock: () => waitForSessionEventQueue(session),
798+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
760799
withSessionWriteLock: params.withSessionWriteLock,
761800
});
762801
installLockableFunction({
763802
owner: agent as Record<string, unknown>,
764803
key: "afterToolCall",
765804
shouldLock: () => sessionHasExtensionHandlers(session, "tool_result"),
766-
waitBeforeLock: () => waitForSessionEventQueue(session),
805+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
767806
withSessionWriteLock: params.withSessionWriteLock,
768807
});
769808
installLockableFunction({
770809
owner: agent as Record<string, unknown>,
771810
key: "onPayload",
772811
shouldLock: () => sessionHasExtensionHandlers(session, "before_provider_request"),
773-
waitBeforeLock: () => waitForSessionEventQueue(session),
812+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
774813
withSessionWriteLock: params.withSessionWriteLock,
775814
});
776815
installLockableFunction({
777816
owner: agent as Record<string, unknown>,
778817
key: "onResponse",
779818
shouldLock: () => sessionHasExtensionHandlers(session, "after_provider_response"),
780-
waitBeforeLock: () => waitForSessionEventQueue(session),
819+
waitBeforeLock: () => waitForSessionEventQueueBeforeHook(session),
781820
withSessionWriteLock: params.withSessionWriteLock,
782821
});
783822
}

0 commit comments

Comments
 (0)