Skip to content

Commit 5d799c2

Browse files
fix: yield diagnostic event drains (#82937)
Summary:
- The branch caps async diagnostic drains at 100 events per turn, adds pending/full-drain diagnostic helpers, ... terminal diagnostics to inspect pending events, and adds regression coverage plus changelog/baseline updates.
- Reproducibility: yes, from source inspection. Current main drains the entire async diagnostic queue in one s ... ck, and the PR body supplies a focused 250-event after-fix probe showing 100/200/250 delivery across turns.

Automerge notes:
- PR branch already contained follow-up commit before automerge: fix: yield diagnostic event drains

Validation:
- ClawSweeper review passed for head 9561093.
- Required merge gates passed before the squash merge.

Prepared head SHA: 9561093
Review: #82937 (comment)

Co-authored-by: Galin Iliev <galini@microsoft.com>
Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com>
Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com>
1 parent 125f0c3 commit 5d799c2

7 files changed

Lines changed: 332 additions & 13 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
1313

1414
- Agents/code mode: spell out the `exec` tool's JavaScript/TypeScript, no Node module, and catalog-bridge constraints in model-visible schema text so agents can use enabled tools without trial-and-error. (#84269) Thanks @Kaspre.
1515
- Codex: give `image_generate` dynamic-tool calls a 120s default watchdog when no per-call or configured image timeout is set, so image generation no longer falls back to the generic 30s bridge timeout. (#84254) Thanks @moritzmmayerhofer.
16+
- Codex: avoid duplicate dynamic tool terminal diagnostics while large diagnostic backlogs drain without blocking tool responses. (#82937) Thanks @galiniliev.
1617
- CLI/message: include a stable top-level `messageId` in `openclaw message --json` output when channel sends return one. (#84191) Thanks @100menotu001.
1718
- Gateway/agents: use an agent's `identity.name` in Gateway agent summaries when `agents.list[].name` is unset, so configured agent labels remain visible in clients. (#84355; refs #57835) Thanks @luoyanglang.
1819
- Plugins/hooks: apply a default 30-second timeout to `before_compaction` and `after_compaction` hooks so a hung plugin handler no longer blocks compaction completion. (#84153)
Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,2 +1,2 @@
1-
474c461709084ddd4014112d891c64abf3e062a417dbffae82be1cf54206283b plugin-sdk-api-baseline.json
2-
cb7ad8a96c541d1ed7295c4bde6fb6a679e5d3481ed66778610ef897a3152484 plugin-sdk-api-baseline.jsonl
1+
6468950bae79f48709683957c5b140f634425f02f292bc5981e12c6565044b48 plugin-sdk-api-baseline.json
2+
2b329a3747a80498d1bc974a64d56a637bde6d2e6f7415f82cdcaeebb8f703af plugin-sdk-api-baseline.jsonl

extensions/codex/src/app-server/run-attempt.test.ts

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,11 @@ import {
1414
type EmbeddedRunAttemptParams,
1515
} from "openclaw/plugin-sdk/agent-harness-runtime";
1616
import {
17+
emitDiagnosticEvent,
1718
emitTrustedDiagnosticEvent,
1819
onInternalDiagnosticEvent,
1920
resetDiagnosticEventsForTest,
21+
waitForDiagnosticEventsDrained,
2022
type DiagnosticEventPayload,
2123
} from "openclaw/plugin-sdk/diagnostic-runtime";
2224
import {
@@ -80,7 +82,19 @@ type RunCodexAppServerAttemptOptions = NonNullable<
8082
>;
8183

8284
function flushDiagnosticEvents() {
83-
return new Promise<void>((resolve) => setImmediate(resolve));
85+
return waitForDiagnosticEventsDrained();
86+
}
87+
88+
function emitAsyncDiagnosticBacklog(count: number): void {
89+
for (let index = 0; index < count; index += 1) {
90+
emitDiagnosticEvent({
91+
type: "model.call.started",
92+
runId: `backlog-run-${index}`,
93+
callId: `backlog-call-${index}`,
94+
provider: "openai",
95+
model: "gpt-5.4",
96+
});
97+
}
8498
}
8599

86100
function activeDiagnosticToolKeys(events: DiagnosticEventPayload[]): Set<string> {
@@ -2521,6 +2535,7 @@ describe("runCodexAppServerAttempt", () => {
25212535

25222536
const run = runCodexAppServerAttempt(params);
25232537
await harness.waitForMethod("thread/start");
2538+
emitAsyncDiagnosticBacklog(150);
25242539

25252540
const toolResult = (await harness.handleServerRequest({
25262541
id: "request-echo-error-tool",

extensions/codex/src/app-server/run-attempt.ts

Lines changed: 39 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ import {
5151
import { markAuthProfileBlockedUntil, resolveAgentDir } from "openclaw/plugin-sdk/agent-runtime";
5252
import {
5353
emitTrustedDiagnosticEvent,
54+
hasPendingInternalDiagnosticEvent,
5455
onInternalDiagnosticEvent,
5556
type DiagnosticEventPayload,
5657
} from "openclaw/plugin-sdk/diagnostic-runtime";
@@ -2183,8 +2184,15 @@ export async function runCodexAppServerAttempt(
21832184
},
21842185
});
21852186
}
2186-
await waitForDiagnosticEventDrain();
2187-
if (!terminalDiagnosticObserved) {
2187+
if (
2188+
!terminalDiagnosticObserved &&
2189+
!hasPendingDynamicToolTerminalDiagnostic({
2190+
call,
2191+
runId: params.runId,
2192+
sessionId: params.sessionId,
2193+
sessionKey: params.sessionKey,
2194+
})
2195+
) {
21882196
emitDynamicToolTerminalDiagnostic({
21892197
response,
21902198
call,
@@ -2196,8 +2204,15 @@ export async function runCodexAppServerAttempt(
21962204
}
21972205
return protocolResponse as JsonValue;
21982206
} catch (error) {
2199-
await waitForDiagnosticEventDrain();
2200-
if (!terminalDiagnosticObserved) {
2207+
if (
2208+
!terminalDiagnosticObserved &&
2209+
!hasPendingDynamicToolTerminalDiagnostic({
2210+
call,
2211+
runId: params.runId,
2212+
sessionId: params.sessionId,
2213+
sessionKey: params.sessionKey,
2214+
})
2215+
) {
22012216
emitDynamicToolErrorDiagnostic({
22022217
call,
22032218
runId: params.runId,
@@ -2948,10 +2963,6 @@ function toCodexDynamicToolProtocolResponse(
29482963
};
29492964
}
29502965

2951-
function waitForDiagnosticEventDrain(): Promise<void> {
2952-
return new Promise((resolve) => setImmediate(resolve));
2953-
}
2954-
29552966
type TerminalToolExecutionDiagnostic = Extract<
29562967
DiagnosticEventPayload,
29572968
{ type: "tool.execution.blocked" | "tool.execution.completed" | "tool.execution.error" }
@@ -2996,6 +3007,26 @@ function isMatchingDynamicToolTerminalDiagnostic(params: {
29963007
);
29973008
}
29983009

3010+
function hasPendingDynamicToolTerminalDiagnostic(params: {
3011+
call: CodexDynamicToolCallParams;
3012+
runId?: string;
3013+
sessionId?: string;
3014+
sessionKey?: string;
3015+
}): boolean {
3016+
return hasPendingInternalDiagnosticEvent((event) => {
3017+
if (!isDynamicToolTerminalDiagnosticEvent(event)) {
3018+
return false;
3019+
}
3020+
return isMatchingDynamicToolTerminalDiagnostic({
3021+
event,
3022+
call: params.call,
3023+
runId: params.runId,
3024+
sessionId: params.sessionId,
3025+
sessionKey: params.sessionKey,
3026+
});
3027+
});
3028+
}
3029+
29993030
function resolveDynamicToolCallTimeoutMs(params: {
30003031
call: CodexDynamicToolCallParams;
30013032
config: EmbeddedRunAttemptParams["config"];

src/infra/diagnostic-events.test.ts

Lines changed: 221 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,14 @@ import {
33
emitDiagnosticEvent,
44
emitTrustedDiagnosticEvent,
55
formatDiagnosticTraceparentForPropagation,
6+
hasPendingInternalDiagnosticEvent,
67
isDiagnosticsEnabled,
78
onInternalDiagnosticEvent,
89
onDiagnosticEvent,
910
resetDiagnosticEventsForTest,
1011
setDiagnosticsEnabledForProcess,
12+
waitForDiagnosticEventsDrained,
13+
type DiagnosticEventPayload,
1114
} from "./diagnostic-events.js";
1215
import {
1316
createDiagnosticTraceContext,
@@ -415,6 +418,224 @@ describe("diagnostic-events", () => {
415418
expect(events).toEqual(["tool.execution.started", "model.call.started"]);
416419
});
417420

421+
it("yields between large high-frequency diagnostic event bursts", async () => {
422+
const events: string[] = [];
423+
onDiagnosticEvent((event) => {
424+
events.push(event.type);
425+
});
426+
427+
for (let index = 0; index < 250; index += 1) {
428+
emitDiagnosticEvent({
429+
type: "model.call.started",
430+
runId: `run-${index}`,
431+
callId: `call-${index}`,
432+
provider: "openai",
433+
model: "gpt-5.4",
434+
});
435+
}
436+
437+
expect(events).toStrictEqual([]);
438+
await new Promise<void>((resolve) => setImmediate(resolve));
439+
expect(events).toHaveLength(100);
440+
await new Promise<void>((resolve) => setImmediate(resolve));
441+
expect(events).toHaveLength(200);
442+
await new Promise<void>((resolve) => setImmediate(resolve));
443+
expect(events).toHaveLength(250);
444+
});
445+
446+
it("waits for all queued high-frequency diagnostic events to drain", async () => {
447+
const events: string[] = [];
448+
onDiagnosticEvent((event) => {
449+
events.push(event.type);
450+
});
451+
452+
for (let index = 0; index < 250; index += 1) {
453+
emitDiagnosticEvent({
454+
type: "model.call.started",
455+
runId: `run-${index}`,
456+
callId: `call-${index}`,
457+
provider: "openai",
458+
model: "gpt-5.4",
459+
});
460+
}
461+
462+
await waitForDiagnosticEventsDrained();
463+
464+
expect(events).toHaveLength(250);
465+
});
466+
467+
it("reports pending async diagnostic events before they drain", async () => {
468+
emitTrustedDiagnosticEvent({
469+
type: "tool.execution.error",
470+
runId: "run-pending",
471+
toolName: "exec",
472+
toolCallId: "call-pending",
473+
durationMs: 1,
474+
errorCategory: "test",
475+
});
476+
477+
expect(
478+
hasPendingInternalDiagnosticEvent(
479+
(event, metadata) =>
480+
metadata.trusted &&
481+
event.type === "tool.execution.error" &&
482+
event.toolCallId === "call-pending",
483+
),
484+
).toBe(true);
485+
486+
await waitForDiagnosticEventsDrained();
487+
488+
expect(
489+
hasPendingInternalDiagnosticEvent((event) => event.type === "tool.execution.error"),
490+
).toBe(false);
491+
});
492+
493+
it("passes immutable pending diagnostic copies to queue inspectors", async () => {
494+
const events: DiagnosticEventPayload[] = [];
495+
onInternalDiagnosticEvent((event) => {
496+
events.push(event);
497+
});
498+
499+
emitTrustedDiagnosticEvent({
500+
type: "tool.execution.error",
501+
runId: "run-immutable",
502+
toolName: "exec",
503+
toolCallId: "call-immutable",
504+
durationMs: 1,
505+
errorCategory: "test",
506+
});
507+
508+
let mutationErrors = 0;
509+
expect(
510+
hasPendingInternalDiagnosticEvent((event, metadata) => {
511+
try {
512+
(event as { type: string }).type = "model.usage";
513+
} catch {
514+
mutationErrors += 1;
515+
}
516+
try {
517+
(metadata as { trusted: boolean }).trusted = false;
518+
} catch {
519+
mutationErrors += 1;
520+
}
521+
return (
522+
metadata.trusted &&
523+
event.type === "tool.execution.error" &&
524+
event.toolCallId === "call-immutable"
525+
);
526+
}),
527+
).toBe(true);
528+
expect(mutationErrors).toBe(2);
529+
530+
await waitForDiagnosticEventsDrained();
531+
532+
expect(events).toMatchObject([
533+
{
534+
type: "tool.execution.error",
535+
toolCallId: "call-immutable",
536+
},
537+
]);
538+
});
539+
540+
it("skips uncloneable pending diagnostics during queue inspection", async () => {
541+
emitDiagnosticEvent({
542+
type: "model.call.started",
543+
runId: "run-uncloneable",
544+
callId: "call-uncloneable",
545+
provider: "openai",
546+
model: "gpt-5.4",
547+
badValue: () => undefined,
548+
} as never);
549+
emitTrustedDiagnosticEvent({
550+
type: "tool.execution.error",
551+
runId: "run-cloneable",
552+
toolName: "exec",
553+
toolCallId: "call-cloneable",
554+
durationMs: 1,
555+
errorCategory: "test",
556+
});
557+
558+
expect(
559+
hasPendingInternalDiagnosticEvent(
560+
(event, metadata) =>
561+
metadata.trusted &&
562+
event.type === "tool.execution.error" &&
563+
event.toolCallId === "call-cloneable",
564+
),
565+
).toBe(true);
566+
});
567+
568+
it("preserves trusted terminal tool diagnostics when the async queue is full", async () => {
569+
const events: DiagnosticEventPayload[] = [];
570+
onInternalDiagnosticEvent((event) => {
571+
events.push(event);
572+
});
573+
574+
emitTrustedDiagnosticEvent({
575+
type: "tool.execution.completed",
576+
runId: "run-saturation-first",
577+
toolName: "exec",
578+
toolCallId: "call-saturation-first",
579+
durationMs: 1,
580+
});
581+
582+
for (let index = 0; index < 9_999; index += 1) {
583+
emitDiagnosticEvent({
584+
type: "model.call.started",
585+
runId: `saturation-run-${index}`,
586+
callId: `saturation-call-${index}`,
587+
provider: "openai",
588+
model: "gpt-5.4",
589+
});
590+
}
591+
592+
emitTrustedDiagnosticEvent({
593+
type: "tool.execution.error",
594+
runId: "run-saturation-second",
595+
toolName: "exec",
596+
toolCallId: "call-saturation-second",
597+
durationMs: 1,
598+
errorCategory: "test",
599+
});
600+
601+
expect(
602+
hasPendingInternalDiagnosticEvent(
603+
(event, metadata) =>
604+
metadata.trusted &&
605+
event.type === "tool.execution.error" &&
606+
event.toolCallId === "call-saturation-second",
607+
),
608+
).toBe(true);
609+
610+
await waitForDiagnosticEventsDrained();
611+
612+
expect(
613+
events
614+
.filter(
615+
(
616+
event,
617+
): event is Extract<
618+
DiagnosticEventPayload,
619+
{ type: "tool.execution.completed" | "tool.execution.error" }
620+
> => event.type === "tool.execution.completed" || event.type === "tool.execution.error",
621+
)
622+
.map((event) => ({
623+
type: event.type,
624+
toolCallId: event.toolCallId,
625+
})),
626+
).toEqual([
627+
{
628+
type: "tool.execution.completed",
629+
toolCallId: "call-saturation-first",
630+
},
631+
{
632+
type: "tool.execution.error",
633+
toolCallId: "call-saturation-second",
634+
},
635+
]);
636+
expect(events.filter((event) => event.type === "model.call.started")).toHaveLength(9_998);
637+
});
638+
418639
it("keeps log records off the public diagnostic event stream", async () => {
419640
const publicEvents: string[] = [];
420641
const internalEvents: string[] = [];

0 commit comments

Comments
 (0)