Skip to content

Commit bb8aa0c

Browse files
samzongjalehman
andauthored
[Fix] Throttle agent event fanout (#80335)
Merged via squash. Prepared head SHA: 5dddb40 Co-authored-by: samzong <13782141+samzong@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman
1 parent 1fc92dd commit bb8aa0c

23 files changed

Lines changed: 845 additions & 45 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ Docs: https://docs.openclaw.ai
150150
- Plugins doctor: report stale plugin config warnings and avoid claiming full plugin health when config warnings remain. (#81515) Thanks @BKF-Gitty.
151151
- Sessions: display `model: "<agentId>-acp"` / `modelProvider: "acpx"` (ACP-runtime sentinel) for ACP control-plane sessions in `openclaw sessions` output, instead of the agent's configured model which was misleading. Catalog finding 20. (#79543)
152152
- Slack: normalize message read `before` and `after` timestamp bounds before calling Slack history or thread reply APIs. Fixes #80835. (#81338) Thanks @honor2030.
153+
- Gateway: throttle assistant/thinking agent event fanout during streaming bursts without dropping buffered deltas. (#80335) Thanks @samzong.
153154

154155
### Changes
155156

src/config/sessions/transcript.test.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fs from "node:fs";
22
import { describe, expect, it, vi } from "vitest";
3+
import { repairToolUseResultPairing } from "../../agents/session-transcript-repair.js";
34
import * as transcriptEvents from "../../sessions/transcript-events.js";
45
import type { SessionTranscriptUpdate } from "../../sessions/transcript-events.js";
56
import { resolveSessionTranscriptPathInDir } from "./paths.js";
@@ -18,6 +19,7 @@ describe("appendAssistantMessageToSessionTranscript", () => {
1819
type ExactAssistantMessage = Parameters<
1920
typeof appendExactAssistantMessageToSessionTranscript
2021
>[0]["message"];
22+
type TranscriptRepairMessage = Parameters<typeof repairToolUseResultPairing>[0][number];
2123
type TranscriptUpdateEmitterSpy = {
2224
mock: {
2325
calls: [string | SessionTranscriptUpdate][];
@@ -283,6 +285,78 @@ describe("appendAssistantMessageToSessionTranscript", () => {
283285
}
284286
});
285287

288+
it("keeps delivery mirrors in transcripts while repair preserves real tool results", async () => {
289+
writeTranscriptStore();
290+
const sessionFile = resolveSessionTranscriptPathInDir(sessionId, fixture.sessionsDir());
291+
const toolCallId = "call_maniple_list";
292+
293+
const toolCallResult = await appendSessionTranscriptMessage({
294+
transcriptPath: sessionFile,
295+
message: {
296+
role: "assistant",
297+
content: [
298+
{
299+
type: "toolCall",
300+
id: toolCallId,
301+
name: "maniple__list_workers",
302+
arguments: {},
303+
},
304+
],
305+
stopReason: "toolUse",
306+
},
307+
});
308+
309+
const mirrorResult = await appendAssistantMessageToSessionTranscript({
310+
sessionKey,
311+
text: "Maniple List Workers",
312+
storePath: fixture.storePath(),
313+
});
314+
315+
expect(mirrorResult.ok).toBe(true);
316+
if (!mirrorResult.ok) {
317+
return;
318+
}
319+
expect(mirrorResult.messageId).not.toBe(toolCallResult.messageId);
320+
const linesAfterMirror = fs.readFileSync(sessionFile, "utf-8").trim().split("\n");
321+
expect(linesAfterMirror).toHaveLength(3);
322+
const mirrorLine = JSON.parse(linesAfterMirror[2]);
323+
expect(mirrorLine.message.model).toBe("delivery-mirror");
324+
325+
await appendSessionTranscriptMessage({
326+
transcriptPath: sessionFile,
327+
message: {
328+
role: "toolResult",
329+
toolCallId,
330+
toolName: "maniple__list_workers",
331+
content: [{ type: "text", text: "workers listed" }],
332+
isError: false,
333+
},
334+
});
335+
336+
const messages = fs
337+
.readFileSync(sessionFile, "utf-8")
338+
.trim()
339+
.split("\n")
340+
.map((line) => JSON.parse(line) as { message?: TranscriptRepairMessage })
341+
.flatMap((entry) => (entry.message ? [entry.message] : []));
342+
expect(messages.map((message) => message.role)).toEqual([
343+
"assistant",
344+
"assistant",
345+
"toolResult",
346+
]);
347+
const repair = repairToolUseResultPairing(messages, {
348+
missingToolResultText: "aborted",
349+
});
350+
351+
expect(repair.added).toHaveLength(0);
352+
expect(repair.messages.map((message) => message.role)).toEqual([
353+
"assistant",
354+
"toolResult",
355+
"assistant",
356+
]);
357+
expect((repair.messages[2] as { model?: string }).model).toBe("delivery-mirror");
358+
});
359+
286360
it("finds session entry using normalized (lowercased) key", async () => {
287361
const storeKey = "agent:main:imessage:direct:+15551234567";
288362
const store = {

src/config/sessions/transcript.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,6 @@ export async function appendExactAssistantMessageToSessionTranscript(params: {
302302
if (latestEquivalentAssistantId) {
303303
return { ok: true, sessionFile, messageId: latestEquivalentAssistantId };
304304
}
305-
306305
const message = {
307306
...params.message,
308307
...(explicitIdempotencyKey ? { idempotencyKey: explicitIdempotencyKey } : {}),

src/gateway/chat-abort.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,21 @@ function createOps(params: {
5454
chatDeltaSentAt: new Map([[runId, Date.now()]]),
5555
chatDeltaLastBroadcastLen: new Map([[runId, buffer?.length ?? 0]]),
5656
chatDeltaLastBroadcastText: new Map(buffer !== undefined ? [[runId, buffer]] : []),
57+
agentDeltaSentAt: new Map([[`${runId}:assistant`, Date.now()]]),
58+
bufferedAgentEvents: new Map([
59+
[
60+
`${runId}:assistant`,
61+
{
62+
payload: {
63+
runId,
64+
seq: 1,
65+
stream: "assistant",
66+
ts: Date.now(),
67+
data: { text: "buffer", delta: "buffer" },
68+
},
69+
},
70+
],
71+
]),
5772
chatAbortedRuns: new Map(),
5873
removeChatRun,
5974
agentRunSeq: new Map(),
@@ -110,6 +125,8 @@ describe("abortChatRunById", () => {
110125
expect(ops.chatDeltaSentAt.has(runId)).toBe(false);
111126
expect(ops.chatDeltaLastBroadcastLen.has(runId)).toBe(false);
112127
expect(ops.chatDeltaLastBroadcastText.has(runId)).toBe(false);
128+
expect(ops.agentDeltaSentAt?.has(`${runId}:assistant`)).toBe(false);
129+
expect(ops.bufferedAgentEvents?.has(`${runId}:assistant`)).toBe(false);
113130
expect(ops.removeChatRun).toHaveBeenCalledWith(runId, runId, sessionKey);
114131
expect(ops.agentRunSeq.has(runId)).toBe(false);
115132
expect(ops.agentRunSeq.has("client-run-1")).toBe(false);

src/gateway/chat-abort.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { isAbortRequestText } from "../auto-reply/reply/abort-primitives.js";
22
import { emitAgentEvent } from "../infra/agent-events.js";
3+
import type { BufferedAgentEvent } from "./server-chat-state.js";
34

45
const DEFAULT_CHAT_RUN_ABORT_GRACE_MS = 60_000;
56

@@ -113,6 +114,8 @@ export type ChatAbortOps = {
113114
chatDeltaSentAt: Map<string, number>;
114115
chatDeltaLastBroadcastLen: Map<string, number>;
115116
chatDeltaLastBroadcastText: Map<string, string>;
117+
agentDeltaSentAt: Map<string, number>;
118+
bufferedAgentEvents: Map<string, BufferedAgentEvent>;
116119
chatAbortedRuns: Map<string, number>;
117120
removeChatRun: (
118121
sessionId: string,
@@ -178,6 +181,12 @@ export function abortChatRunById(
178181
ops.chatDeltaSentAt.delete(runId);
179182
ops.chatDeltaLastBroadcastLen.delete(runId);
180183
ops.chatDeltaLastBroadcastText.delete(runId);
184+
ops.agentDeltaSentAt.delete(runId);
185+
ops.agentDeltaSentAt.delete(`${runId}:assistant`);
186+
ops.agentDeltaSentAt.delete(`${runId}:thinking`);
187+
ops.bufferedAgentEvents.delete(runId);
188+
ops.bufferedAgentEvents.delete(`${runId}:assistant`);
189+
ops.bufferedAgentEvents.delete(`${runId}:thinking`);
181190
const removed = ops.removeChatRun(runId, runId, sessionKey);
182191
broadcastChatAborted(ops, { runId, sessionKey, stopReason, partialText });
183192
emitAgentEvent({

src/gateway/server-chat-state.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
import type { AgentEventPayload } from "../infra/agent-events.js";
2+
13
export type ChatRunEntry = {
24
sessionKey: string;
35
clientRunId: string;
46
};
57

8+
export type BufferedAgentEvent = {
9+
sessionKey?: string;
10+
payload: AgentEventPayload & { spawnedBy?: string };
11+
};
12+
613
export type ChatRunRegistry = {
714
add: (sessionId: string, entry: ChatRunEntry) => void;
815
peek: (sessionId: string) => ChatRunEntry | undefined;
@@ -71,6 +78,8 @@ export type ChatRunState = {
7178
/** Length of text at the time of the last broadcast, used to avoid duplicate flushes. */
7279
deltaLastBroadcastLen: Map<string, number>;
7380
deltaLastBroadcastText: Map<string, string>;
81+
agentDeltaSentAt: Map<string, number>;
82+
bufferedAgentEvents: Map<string, BufferedAgentEvent>;
7483
abortedRuns: Map<string, number>;
7584
clear: () => void;
7685
};
@@ -82,6 +91,8 @@ export function createChatRunState(): ChatRunState {
8291
const deltaSentAt = new Map<string, number>();
8392
const deltaLastBroadcastLen = new Map<string, number>();
8493
const deltaLastBroadcastText = new Map<string, string>();
94+
const agentDeltaSentAt = new Map<string, number>();
95+
const bufferedAgentEvents = new Map<string, BufferedAgentEvent>();
8596
const abortedRuns = new Map<string, number>();
8697

8798
const clear = () => {
@@ -91,6 +102,8 @@ export function createChatRunState(): ChatRunState {
91102
deltaSentAt.clear();
92103
deltaLastBroadcastLen.clear();
93104
deltaLastBroadcastText.clear();
105+
agentDeltaSentAt.clear();
106+
bufferedAgentEvents.clear();
94107
abortedRuns.clear();
95108
};
96109

@@ -101,6 +114,8 @@ export function createChatRunState(): ChatRunState {
101114
deltaSentAt,
102115
deltaLastBroadcastLen,
103116
deltaLastBroadcastText,
117+
agentDeltaSentAt,
118+
bufferedAgentEvents,
104119
abortedRuns,
105120
clear,
106121
};

0 commit comments

Comments
 (0)