Skip to content

Commit 135fdf3

Browse files
Ivan SomovIvan Somov
authored andcommitted
fix: add mutex to session-tool-result-guard to prevent async race condition
1 parent 7b344b8 commit 135fdf3

2 files changed

Lines changed: 153 additions & 5 deletions

File tree

src/agents/session-tool-result-guard.test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,3 +519,117 @@ describe("installSessionToolResultGuard", () => {
519519
expect(syntheticForError).toHaveLength(0);
520520
});
521521
});
522+
523+
describe("concurrent append race condition", () => {
524+
it("does not synthesize when toolResult arrives concurrently with next assistant message", async () => {
525+
// Simulate the exact race: tool_calls appended, then toolResult and a
526+
// completion message are dispatched concurrently (both microtasks queued).
527+
const sm = SessionManager.inMemory();
528+
installSessionToolResultGuard(sm);
529+
530+
sm.appendMessage(
531+
asAppendMessage({
532+
role: "assistant",
533+
content: [{ type: "toolCall", id: "race_1", name: "read", arguments: {} }],
534+
}),
535+
);
536+
537+
// Fire both concurrently — do NOT await between them
538+
const p1 = Promise.resolve().then(() =>
539+
sm.appendMessage(
540+
asAppendMessage({
541+
role: "toolResult",
542+
toolCallId: "race_1",
543+
content: [{ type: "text", text: "real result" }],
544+
isError: false,
545+
}),
546+
),
547+
);
548+
const p2 = Promise.resolve().then(() =>
549+
sm.appendMessage(
550+
asAppendMessage({
551+
role: "assistant",
552+
content: [{ type: "text", text: "completion" }],
553+
stopReason: "endTurn",
554+
}),
555+
),
556+
);
557+
558+
await Promise.all([p1, p2]);
559+
560+
const messages = expectPersistedRoles(sm, ["assistant", "toolResult", "assistant"]);
561+
// The toolResult must be the real one, not synthetic
562+
const tr = messages[1] as { isError?: boolean; content?: Array<{ text?: string }> };
563+
expect(tr.isError).toBe(false);
564+
expect(tr.content?.[0]?.text).toBe("real result");
565+
});
566+
567+
it("serializes multiple concurrent appends without duplicating messages", async () => {
568+
const sm = SessionManager.inMemory();
569+
installSessionToolResultGuard(sm);
570+
571+
sm.appendMessage(
572+
asAppendMessage({
573+
role: "assistant",
574+
content: [
575+
{ type: "toolCall", id: "c1", name: "a", arguments: {} },
576+
{ type: "toolCall", id: "c2", name: "b", arguments: {} },
577+
],
578+
}),
579+
);
580+
581+
// Fire all results concurrently
582+
await Promise.all([
583+
Promise.resolve().then(() =>
584+
sm.appendMessage(
585+
asAppendMessage({
586+
role: "toolResult",
587+
toolCallId: "c1",
588+
content: [{ type: "text", text: "r1" }],
589+
isError: false,
590+
}),
591+
),
592+
),
593+
Promise.resolve().then(() =>
594+
sm.appendMessage(
595+
asAppendMessage({
596+
role: "toolResult",
597+
toolCallId: "c2",
598+
content: [{ type: "text", text: "r2" }],
599+
isError: false,
600+
}),
601+
),
602+
),
603+
]);
604+
605+
const messages = expectPersistedRoles(sm, ["assistant", "toolResult", "toolResult"]);
606+
// Both results must be the real ones, not synthetic
607+
const results = messages.slice(1) as Array<{ isError?: boolean }>;
608+
expect(results.every((r) => r.isError === false)).toBe(true);
609+
});
610+
611+
it("getPendingIds() returns empty after concurrent resolution", async () => {
612+
const sm = SessionManager.inMemory();
613+
const guard = installSessionToolResultGuard(sm);
614+
615+
sm.appendMessage(
616+
asAppendMessage({
617+
role: "assistant",
618+
content: [{ type: "toolCall", id: "drain_1", name: "x", arguments: {} }],
619+
}),
620+
);
621+
622+
await Promise.resolve().then(() =>
623+
sm.appendMessage(
624+
asAppendMessage({
625+
role: "toolResult",
626+
toolCallId: "drain_1",
627+
content: [{ type: "text", text: "ok" }],
628+
isError: false,
629+
}),
630+
),
631+
);
632+
633+
expect(guard.getPendingIds()).toEqual([]);
634+
});
635+
});

src/agents/session-tool-result-guard.ts

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,30 @@ type SessionManagerWithRawAppend = SessionManager & {
2222
[RAW_APPEND_MESSAGE]?: SessionManager["appendMessage"];
2323
};
2424

25+
/**
26+
* Creates a simple serializing async mutex.
27+
* Each call to `run` queues `fn` behind all previously-queued work so that
28+
* concurrent callers execute sequentially in arrival order.
29+
*
30+
* This is intentionally minimal — no timeout, no fairness guarantees beyond
31+
* FIFO promise resolution. Sufficient for low-throughput session appends.
32+
*/
33+
function createMutex() {
34+
let queue: Promise<void> = Promise.resolve();
35+
return {
36+
run<T>(fn: () => T | Promise<T>): Promise<T> {
37+
const result = queue.then(() => fn());
38+
// Advance the queue regardless of whether fn() throws, so a failed
39+
// append does not permanently block subsequent calls.
40+
queue = result.then(
41+
() => undefined,
42+
() => undefined,
43+
);
44+
return result;
45+
},
46+
};
47+
}
48+
2549
/**
2650
* Truncate oversized text content blocks in a tool result message.
2751
* Returns the original message if under the limit, or a new message with
@@ -127,6 +151,8 @@ export function installSessionToolResultGuard(
127151
const originalAppend = getRawSessionAppendMessage(sessionManager);
128152
(sessionManager as SessionManagerWithRawAppend)[RAW_APPEND_MESSAGE] = originalAppend;
129153
const pendingState = createPendingToolCallState();
154+
const appendMutex = createMutex();
155+
130156
const persistMessage = (message: AgentMessage) => {
131157
const transformer = opts?.transformMessageForPersistence;
132158
return transformer ? transformer(message) : message;
@@ -187,7 +213,7 @@ export function installSessionToolResultGuard(
187213
pendingState.clear();
188214
};
189215

190-
const guardedAppend = (message: AgentMessage) => {
216+
const guardedAppendInner = (message: AgentMessage) => {
191217
let nextMessage = message;
192218
const role = (message as { role?: unknown }).role;
193219
if (role === "assistant") {
@@ -253,6 +279,12 @@ export function installSessionToolResultGuard(
253279
flushPendingToolResults();
254280
}
255281

282+
// Register tool calls BEFORE appending so that a racing toolResult handler
283+
// (which may arrive during an awaited originalAppend) finds the IDs in pending.
284+
if (toolCalls.length > 0) {
285+
pendingState.trackToolCalls(toolCalls);
286+
}
287+
256288
const finalMessage = applyBeforeWriteHook(persistMessage(nextMessage));
257289
if (!finalMessage) {
258290
return undefined;
@@ -271,13 +303,15 @@ export function installSessionToolResultGuard(
271303
});
272304
}
273305

274-
if (toolCalls.length > 0) {
275-
pendingState.trackToolCalls(toolCalls);
276-
}
277-
278306
return result;
279307
};
280308

309+
const guardedAppend = (message: AgentMessage): ReturnType<typeof originalAppend> => {
310+
return appendMutex.run(() => guardedAppendInner(message)) as unknown as ReturnType<
311+
typeof originalAppend
312+
>;
313+
};
314+
281315
// Monkey-patch appendMessage with our guarded version.
282316
sessionManager.appendMessage = guardedAppend as SessionManager["appendMessage"];
283317

0 commit comments

Comments
 (0)