Skip to content

Commit 0e7250f

Browse files
authored
feat(diagnostics): emit model call events
Emit structured diagnostic events for embedded run and model-call lifecycle with trace context, duration, and safe error categories.
1 parent e5f55dd commit 0e7250f

8 files changed

Lines changed: 535 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
1414
- Diagnostics/OTEL: pass immutable per-run diagnostic trace context through agent and tool hook contexts, and parent exported diagnostic spans from validated context without retaining global trace state. Thanks @vincentkoc.
1515
- Diagnostics/OTEL: make exporter startup restart-safe so config reloads do not retain stale SDKs, log transports, or diagnostic event listeners. Thanks @vincentkoc.
1616
- Diagnostics: emit structured tool execution diagnostic events with trace context, timing, and redacted error metadata. Thanks @vincentkoc.
17+
- Diagnostics: emit structured run and model-call diagnostic events with trace context, duration, and non-message error metadata. Thanks @vincentkoc.
1718
- Control UI/chat: add a Steer action on queued messages so a browser follow-up can be injected into the active run without retyping it.
1819
- Control UI/Talk: add browser WebRTC realtime voice sessions backed by OpenAI Realtime, with Gateway-minted ephemeral client secrets and `openclaw_agent_consult` handoff to the full OpenClaw agent.
1920
- Agents/tools: add optional per-call `timeoutMs` support for image, video, music, and TTS generation tools so agents can extend provider request timeouts only when a specific generation needs it.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
8ca22ea6125fb198641c676d73b4df5a3bc49079be68bef8ed0718a54c1bb53a plugin-sdk-api-baseline.json
2-
197d9743128020062fc457228fa9139d0bd465d9e1775101bfc39137f4a10896 plugin-sdk-api-baseline.jsonl
1+
b125289f628c19afb6087dcd58b674fa8acc8899545f99db81c264c4c964d17f plugin-sdk-api-baseline.json
2+
2a2e9959cd35a375ec97682ec5d5108d94d4e77a82085929c58e9a994313d5e6 plugin-sdk-api-baseline.jsonl

extensions/diagnostics-otel/src/service.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,11 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
815815
case "tool.execution.started":
816816
case "tool.execution.completed":
817817
case "tool.execution.error":
818+
case "run.started":
819+
case "run.completed":
820+
case "model.call.started":
821+
case "model.call.completed":
822+
case "model.call.error":
818823
case "diagnostic.memory.sample":
819824
case "diagnostic.memory.pressure":
820825
case "payload.large":
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import type { StreamFn } from "@mariozechner/pi-agent-core";
2+
import { beforeEach, describe, expect, it } from "vitest";
3+
import {
4+
onDiagnosticEvent,
5+
resetDiagnosticEventsForTest,
6+
type DiagnosticEventPayload,
7+
} from "../../../infra/diagnostic-events.js";
8+
import { createDiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js";
9+
import { wrapStreamFnWithDiagnosticModelCallEvents } from "./attempt.model-diagnostic-events.js";
10+
11+
async function collectModelCallEvents(run: () => Promise<void>): Promise<DiagnosticEventPayload[]> {
12+
const events: DiagnosticEventPayload[] = [];
13+
const stop = onDiagnosticEvent((event) => {
14+
if (event.type.startsWith("model.call.")) {
15+
events.push(event);
16+
}
17+
});
18+
try {
19+
await run();
20+
return events;
21+
} finally {
22+
stop();
23+
}
24+
}
25+
26+
async function drain(stream: AsyncIterable<unknown>): Promise<void> {
27+
for await (const _ of stream) {
28+
// drain
29+
}
30+
}
31+
32+
describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
33+
beforeEach(() => {
34+
resetDiagnosticEventsForTest();
35+
});
36+
37+
it("emits started and completed events for async streams", async () => {
38+
async function* stream() {
39+
yield { type: "text", text: "ok" };
40+
}
41+
const originalStream = stream() as unknown as AsyncIterable<unknown> & {
42+
result: () => Promise<string>;
43+
};
44+
originalStream.result = async () => "kept";
45+
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
46+
(() => originalStream) as unknown as StreamFn,
47+
{
48+
runId: "run-1",
49+
sessionKey: "session-key",
50+
sessionId: "session-id",
51+
provider: "openai",
52+
model: "gpt-5.4",
53+
api: "openai-responses",
54+
transport: "http",
55+
trace: createDiagnosticTraceContext({
56+
traceId: "4bf92f3577b34da6a3ce929d0e0e4736",
57+
spanId: "00f067aa0ba902b7",
58+
}),
59+
nextCallId: () => "call-1",
60+
},
61+
);
62+
63+
const events = await collectModelCallEvents(async () => {
64+
const returned = wrapped(
65+
{} as never,
66+
{} as never,
67+
{} as never,
68+
) as unknown as typeof originalStream;
69+
expect(returned).toBe(originalStream);
70+
expect(await returned.result()).toBe("kept");
71+
await drain(returned);
72+
});
73+
74+
expect(events.map((event) => event.type)).toEqual([
75+
"model.call.started",
76+
"model.call.completed",
77+
]);
78+
expect(events[0]).toMatchObject({
79+
type: "model.call.started",
80+
runId: "run-1",
81+
callId: "call-1",
82+
sessionKey: "session-key",
83+
sessionId: "session-id",
84+
provider: "openai",
85+
model: "gpt-5.4",
86+
api: "openai-responses",
87+
transport: "http",
88+
});
89+
expect(events[0]?.trace?.parentSpanId).toBe("00f067aa0ba902b7");
90+
expect(events[1]).toMatchObject({
91+
type: "model.call.completed",
92+
callId: "call-1",
93+
durationMs: expect.any(Number),
94+
});
95+
});
96+
97+
it("emits error events when stream iteration fails", async () => {
98+
const stream = {
99+
[Symbol.asyncIterator]() {
100+
return {
101+
async next(): Promise<IteratorResult<unknown>> {
102+
throw new TypeError("provider failed");
103+
},
104+
};
105+
},
106+
};
107+
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
108+
(() => stream) as unknown as StreamFn,
109+
{
110+
runId: "run-1",
111+
provider: "anthropic",
112+
model: "sonnet-4.6",
113+
trace: createDiagnosticTraceContext(),
114+
nextCallId: () => "call-err",
115+
},
116+
);
117+
118+
const events = await collectModelCallEvents(async () => {
119+
await expect(
120+
drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>),
121+
).rejects.toThrow("provider failed");
122+
});
123+
124+
expect(events.map((event) => event.type)).toEqual(["model.call.started", "model.call.error"]);
125+
expect(events[1]).toMatchObject({
126+
type: "model.call.error",
127+
callId: "call-err",
128+
errorCategory: "TypeError",
129+
durationMs: expect.any(Number),
130+
});
131+
});
132+
133+
it("emits error events when stream consumption stops early", async () => {
134+
async function* stream() {
135+
yield { type: "text", text: "first" };
136+
yield { type: "text", text: "second" };
137+
}
138+
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
139+
(() => stream()) as unknown as StreamFn,
140+
{
141+
runId: "run-1",
142+
provider: "openai",
143+
model: "gpt-5.4",
144+
trace: createDiagnosticTraceContext(),
145+
nextCallId: () => "call-abandoned",
146+
},
147+
);
148+
149+
const events = await collectModelCallEvents(async () => {
150+
for await (const _ of wrapped(
151+
{} as never,
152+
{} as never,
153+
{} as never,
154+
) as AsyncIterable<unknown>) {
155+
break;
156+
}
157+
});
158+
159+
expect(events.map((event) => event.type)).toEqual(["model.call.started", "model.call.error"]);
160+
expect(events[1]).toMatchObject({
161+
type: "model.call.error",
162+
callId: "call-abandoned",
163+
errorCategory: "StreamAbandoned",
164+
durationMs: expect.any(Number),
165+
});
166+
});
167+
});
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
import type { StreamFn } from "@mariozechner/pi-agent-core";
2+
import {
3+
emitDiagnosticEvent,
4+
type DiagnosticEventInput,
5+
} from "../../../infra/diagnostic-events.js";
6+
import {
7+
createChildDiagnosticTraceContext,
8+
freezeDiagnosticTraceContext,
9+
type DiagnosticTraceContext,
10+
} from "../../../infra/diagnostic-trace-context.js";
11+
12+
type ModelCallDiagnosticContext = {
13+
runId: string;
14+
sessionKey?: string;
15+
sessionId?: string;
16+
provider: string;
17+
model: string;
18+
api?: string;
19+
transport?: string;
20+
trace: DiagnosticTraceContext;
21+
nextCallId: () => string;
22+
};
23+
24+
type ModelCallEventBase = Omit<
25+
Extract<DiagnosticEventInput, { type: "model.call.started" }>,
26+
"type"
27+
>;
28+
29+
export function diagnosticErrorCategory(err: unknown): string {
30+
if (err instanceof Error && err.name.trim()) {
31+
return err.name;
32+
}
33+
return typeof err;
34+
}
35+
36+
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
37+
return (
38+
value !== null &&
39+
(typeof value === "object" || typeof value === "function") &&
40+
typeof (value as { then?: unknown }).then === "function"
41+
);
42+
}
43+
44+
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
45+
return (
46+
value !== null &&
47+
typeof value === "object" &&
48+
typeof (value as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === "function"
49+
);
50+
}
51+
52+
function baseModelCallEvent(
53+
ctx: ModelCallDiagnosticContext,
54+
callId: string,
55+
trace: DiagnosticTraceContext,
56+
): ModelCallEventBase {
57+
return {
58+
runId: ctx.runId,
59+
callId,
60+
...(ctx.sessionKey && { sessionKey: ctx.sessionKey }),
61+
...(ctx.sessionId && { sessionId: ctx.sessionId }),
62+
provider: ctx.provider,
63+
model: ctx.model,
64+
...(ctx.api && { api: ctx.api }),
65+
...(ctx.transport && { transport: ctx.transport }),
66+
trace,
67+
};
68+
}
69+
70+
async function* observeModelCallIterator<T>(
71+
iterator: AsyncIterator<T>,
72+
eventBase: ModelCallEventBase,
73+
startedAt: number,
74+
): AsyncIterable<T> {
75+
let terminalEmitted = false;
76+
try {
77+
for (;;) {
78+
const next = await iterator.next();
79+
if (next.done) {
80+
break;
81+
}
82+
yield next.value;
83+
}
84+
terminalEmitted = true;
85+
emitDiagnosticEvent({
86+
type: "model.call.completed",
87+
...eventBase,
88+
durationMs: Date.now() - startedAt,
89+
});
90+
} catch (err) {
91+
terminalEmitted = true;
92+
emitDiagnosticEvent({
93+
type: "model.call.error",
94+
...eventBase,
95+
durationMs: Date.now() - startedAt,
96+
errorCategory: diagnosticErrorCategory(err),
97+
});
98+
throw err;
99+
} finally {
100+
if (!terminalEmitted) {
101+
await iterator.return?.();
102+
emitDiagnosticEvent({
103+
type: "model.call.error",
104+
...eventBase,
105+
durationMs: Date.now() - startedAt,
106+
errorCategory: "StreamAbandoned",
107+
});
108+
}
109+
}
110+
}
111+
112+
function observeModelCallStream<T extends AsyncIterable<unknown>>(
113+
stream: T,
114+
eventBase: ModelCallEventBase,
115+
startedAt: number,
116+
): T {
117+
const createIterator = stream[Symbol.asyncIterator].bind(stream);
118+
Object.defineProperty(stream, Symbol.asyncIterator, {
119+
configurable: true,
120+
value: () =>
121+
observeModelCallIterator(createIterator(), eventBase, startedAt)[Symbol.asyncIterator](),
122+
});
123+
return stream;
124+
}
125+
126+
function observeModelCallResult(
127+
result: unknown,
128+
eventBase: ModelCallEventBase,
129+
startedAt: number,
130+
): unknown {
131+
if (isAsyncIterable(result)) {
132+
return observeModelCallStream(result, eventBase, startedAt);
133+
}
134+
emitDiagnosticEvent({
135+
type: "model.call.completed",
136+
...eventBase,
137+
durationMs: Date.now() - startedAt,
138+
});
139+
return result;
140+
}
141+
142+
export function wrapStreamFnWithDiagnosticModelCallEvents(
143+
streamFn: StreamFn,
144+
ctx: ModelCallDiagnosticContext,
145+
): StreamFn {
146+
return ((model, streamContext, options) => {
147+
const callId = ctx.nextCallId();
148+
const trace = freezeDiagnosticTraceContext(createChildDiagnosticTraceContext(ctx.trace));
149+
const eventBase = baseModelCallEvent(ctx, callId, trace);
150+
emitDiagnosticEvent({
151+
type: "model.call.started",
152+
...eventBase,
153+
});
154+
const startedAt = Date.now();
155+
156+
try {
157+
const result = streamFn(model, streamContext, options);
158+
if (isPromiseLike(result)) {
159+
return result.then(
160+
(resolved) => observeModelCallResult(resolved, eventBase, startedAt),
161+
(err) => {
162+
emitDiagnosticEvent({
163+
type: "model.call.error",
164+
...eventBase,
165+
durationMs: Date.now() - startedAt,
166+
errorCategory: diagnosticErrorCategory(err),
167+
});
168+
throw err;
169+
},
170+
);
171+
}
172+
return observeModelCallResult(result, eventBase, startedAt);
173+
} catch (err) {
174+
emitDiagnosticEvent({
175+
type: "model.call.error",
176+
...eventBase,
177+
durationMs: Date.now() - startedAt,
178+
errorCategory: diagnosticErrorCategory(err),
179+
});
180+
throw err;
181+
}
182+
}) as StreamFn;
183+
}

0 commit comments

Comments
 (0)