Skip to content

Commit 23e9bc8

Browse files
fix(diagnostics): track model stream progress (#86757)
Summary: - The PR updates diagnostics to mark streamed model chunks as run progress, keeps silent model calls abortable after the stuck-session timeout, and adds regression coverage for stream progress and recovery behavior. - PR surface: Source +54, Tests +229. Total +283 across 6 files. - Reproducibility: yes. at source level: current main tracks model-call start/end activity but streamed chunks ... covery keys on stale lastProgressAgeMs. I did not run a live local-provider repro in this read-only review. Automerge notes: - PR branch already contained follow-up commit before automerge: fix(diagnostics): track model stream progress - PR branch already contained follow-up commit before automerge: test(diagnostics): cover silent local model aborts - PR branch already contained follow-up commit before automerge: fix(diagnostics): skip stream progress when disabled Validation: - ClawSweeper review passed for head fcc74d9. - Required merge gates passed before the squash merge. Prepared head SHA: fcc74d9 Review: #86757 (comment) Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: osolmaz Co-authored-by: osolmaz <2453968+osolmaz@users.noreply.github.com>
1 parent 711e963 commit 23e9bc8

6 files changed

Lines changed: 329 additions & 46 deletions

File tree

src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ import {
44
onInternalDiagnosticEvent,
55
onTrustedInternalDiagnosticEvent,
66
resetDiagnosticEventsForTest,
7+
setDiagnosticsEnabledForProcess,
78
type DiagnosticEventPrivateData,
89
type DiagnosticEventPayload,
10+
waitForDiagnosticEventsDrained,
911
} from "../../../infra/diagnostic-events.js";
1012
import { createDiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js";
13+
import {
14+
getDiagnosticSessionActivitySnapshot,
15+
resetDiagnosticRunActivityForTest,
16+
} from "../../../logging/diagnostic-run-activity.js";
1117
import {
1218
initializeGlobalHookRunner,
1319
resetGlobalHookRunner,
@@ -100,11 +106,16 @@ function requireMockRecordArg(
100106
describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
101107
beforeEach(() => {
102108
resetDiagnosticEventsForTest();
109+
resetDiagnosticRunActivityForTest();
103110
resetGlobalHookRunner();
104111
});
105112

106113
afterEach(() => {
114+
resetDiagnosticEventsForTest();
107115
resetGlobalHookRunner();
116+
resetDiagnosticRunActivityForTest();
117+
vi.restoreAllMocks();
118+
vi.useRealTimers();
108119
});
109120

110121
it("emits started and completed events for async streams", async () => {
@@ -182,6 +193,117 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
182193
expect(JSON.stringify(events)).not.toContain("sk-test-secret-value");
183194
});
184195

196+
it("updates diagnostic run activity from throttled stream chunks", async () => {
197+
let now = 1_000_000;
198+
vi.spyOn(Date, "now").mockImplementation(() => now);
199+
async function* stream() {
200+
yield { type: "text_delta", delta: "first" };
201+
yield { type: "text_delta", delta: "second" };
202+
yield { type: "text_delta", delta: "third" };
203+
}
204+
const runProgressEvents: DiagnosticEventPayload[] = [];
205+
const stop = onInternalDiagnosticEvent((event) => {
206+
if (event.type === "run.progress") {
207+
runProgressEvents.push(event);
208+
}
209+
});
210+
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
211+
(() => stream()) as unknown as StreamFn,
212+
{
213+
runId: "run-1",
214+
sessionKey: "session-key",
215+
sessionId: "session-id",
216+
provider: "vllm",
217+
model: "qwen/qwen3.5-9b",
218+
trace: createDiagnosticTraceContext(),
219+
nextCallId: () => "call-stream",
220+
},
221+
);
222+
223+
const returned = wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>;
224+
const iterator = returned[Symbol.asyncIterator]();
225+
226+
try {
227+
await iterator.next();
228+
await waitForDiagnosticEventsDrained();
229+
let snapshot = getDiagnosticSessionActivitySnapshot({
230+
sessionKey: "session-key",
231+
sessionId: "session-id",
232+
});
233+
expect(snapshot.activeWorkKind).toBe("model_call");
234+
expect(snapshot.lastProgressReason).toBe("model_call:stream_progress");
235+
expect(snapshot.lastProgressAgeMs).toBe(0);
236+
expect(runProgressEvents).toHaveLength(1);
237+
238+
now += 10_000;
239+
await iterator.next();
240+
await waitForDiagnosticEventsDrained();
241+
snapshot = getDiagnosticSessionActivitySnapshot({
242+
sessionKey: "session-key",
243+
sessionId: "session-id",
244+
});
245+
expect(snapshot.lastProgressReason).toBe("model_call:stream_progress");
246+
expect(snapshot.lastProgressAgeMs).toBe(0);
247+
expect(runProgressEvents).toHaveLength(1);
248+
249+
now += 30_000;
250+
await iterator.next();
251+
await waitForDiagnosticEventsDrained();
252+
snapshot = getDiagnosticSessionActivitySnapshot({
253+
sessionKey: "session-key",
254+
sessionId: "session-id",
255+
});
256+
expect(snapshot.lastProgressReason).toBe("model_call:stream_progress");
257+
expect(snapshot.lastProgressAgeMs).toBe(0);
258+
expect(runProgressEvents).toHaveLength(2);
259+
} finally {
260+
await iterator.return?.();
261+
await waitForDiagnosticEventsDrained();
262+
stop();
263+
}
264+
});
265+
266+
it("does not retain stream progress activity when diagnostics are disabled", async () => {
267+
setDiagnosticsEnabledForProcess(false);
268+
const runProgressEvents: DiagnosticEventPayload[] = [];
269+
const stop = onInternalDiagnosticEvent((event) => {
270+
if (event.type === "run.progress") {
271+
runProgressEvents.push(event);
272+
}
273+
});
274+
async function* stream() {
275+
yield { type: "text_delta", delta: "first" };
276+
yield { type: "text_delta", delta: "second" };
277+
}
278+
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
279+
(() => stream()) as unknown as StreamFn,
280+
{
281+
runId: "run-1",
282+
sessionKey: "session-key",
283+
sessionId: "session-id",
284+
provider: "vllm",
285+
model: "qwen/qwen3.5-9b",
286+
trace: createDiagnosticTraceContext(),
287+
nextCallId: () => "call-disabled-diagnostics",
288+
},
289+
);
290+
291+
try {
292+
await drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>);
293+
await waitForDiagnosticEventsDrained();
294+
} finally {
295+
stop();
296+
}
297+
298+
expect(
299+
getDiagnosticSessionActivitySnapshot({
300+
sessionKey: "session-key",
301+
sessionId: "session-id",
302+
}),
303+
).toEqual({});
304+
expect(runProgressEvents).toEqual([]);
305+
});
306+
185307
it("counts async onPayload replacements instead of raw payload content", async () => {
186308
async function* stream() {
187309
yield { type: "text_delta", delta: "safe" };

src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import {
66
diagnosticProviderRequestIdHash,
77
} from "../../../infra/diagnostic-error-metadata.js";
88
import {
9+
areDiagnosticsEnabledForProcess,
10+
emitTrustedDiagnosticEvent,
911
type DiagnosticEventInput,
1012
type DiagnosticModelCallContent,
1113
type DiagnosticMemoryUsage,
@@ -18,6 +20,7 @@ import {
1820
formatDiagnosticTraceparent,
1921
type DiagnosticTraceContext,
2022
} from "../../../infra/diagnostic-trace-context.js";
23+
import { markDiagnosticRunProgress } from "../../../logging/diagnostic-run-activity.js";
2124
import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js";
2225
import type {
2326
PluginHookAgentContext,
@@ -75,8 +78,11 @@ type ModelCallObservationState = {
7578
modelContent?: DiagnosticModelCallContent;
7679
outputMessages?: unknown[];
7780
contentCapture?: DiagnosticModelContentCapturePolicy;
81+
lastStreamProgressAt?: number;
7882
};
7983

84+
const MODEL_CALL_STREAM_PROGRESS_INTERVAL_MS = 30_000;
85+
const MODEL_CALL_STREAM_PROGRESS_REASON = "model_call:stream_progress";
8086
const MODEL_CALL_STREAM_RETURN_TIMEOUT_MS = 1000;
8187
const TRACEPARENT_HEADER_NAME = "traceparent";
8288
type ModelCallStreamOptions = Parameters<StreamFn>[2];
@@ -157,6 +163,39 @@ function observeResponseChunk(
157163
}
158164
}
159165

166+
function maybeEmitModelCallStreamProgress(
167+
eventBase: ModelCallEventBase,
168+
state: ModelCallObservationState,
169+
): void {
170+
if (!areDiagnosticsEnabledForProcess()) {
171+
return;
172+
}
173+
const now = Date.now();
174+
const progressFields = {
175+
runId: eventBase.runId,
176+
...(eventBase.sessionKey ? { sessionKey: eventBase.sessionKey } : {}),
177+
...(eventBase.sessionId ? { sessionId: eventBase.sessionId } : {}),
178+
reason: MODEL_CALL_STREAM_PROGRESS_REASON,
179+
};
180+
markDiagnosticRunProgress(progressFields);
181+
if (
182+
state.lastStreamProgressAt !== undefined &&
183+
now - state.lastStreamProgressAt < MODEL_CALL_STREAM_PROGRESS_INTERVAL_MS
184+
) {
185+
return;
186+
}
187+
state.lastStreamProgressAt = now;
188+
// Streaming providers, local or remote, are expected to produce chunks or
189+
// heartbeat-style progress. The in-memory freshness clock is refreshed for
190+
// each chunk, while diagnostic events are throttled so token streams do not
191+
// spam observers; silent/non-streaming calls remain recoverable after the
192+
// configured stuck-session timeout.
193+
emitTrustedDiagnosticEvent({
194+
type: "run.progress",
195+
...progressFields,
196+
});
197+
}
198+
160199
function modelCallSizeTimingFields(state: ModelCallObservationState): ModelCallSizeTimingFields {
161200
return {
162201
...(state.requestPayloadBytes !== undefined
@@ -481,6 +520,7 @@ async function* observeModelCallIterator<T>(
481520
break;
482521
}
483522
observeResponseChunk(state, startedAt, next.value);
523+
maybeEmitModelCallStreamProgress(eventBase, state);
484524
yield next.value;
485525
}
486526
terminalEmitted = true;

src/agents/pi-embedded-runner/run/llm-idle-timeout.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,6 @@ export function resolveLlmIdleTimeoutMs(params?: {
144144
value > 0 &&
145145
value < MAX_SAFE_TIMEOUT_MS,
146146
);
147-
const baseUrl = params?.model?.baseUrl;
148-
const isLocalProvider =
149-
typeof baseUrl === "string" && baseUrl.length > 0 && isLocalProviderBaseUrl(baseUrl);
150147

151148
const modelRequestTimeoutMs = params?.modelRequestTimeoutMs;
152149
if (
@@ -190,6 +187,9 @@ export function resolveLlmIdleTimeoutMs(params?: {
190187
// baseUrl pointing at loopback / private-network / `.local`. Ollama cloud
191188
// models are still hosted remotely even when proxied through local Ollama, so
192189
// keep the cloud watchdog for `*:cloud` model ids.
190+
const baseUrl = params?.model?.baseUrl;
191+
const isLocalProvider =
192+
typeof baseUrl === "string" && baseUrl.length > 0 && isLocalProviderBaseUrl(baseUrl);
193193
if (isLocalProvider && !isOllamaCloudModel(params?.model)) {
194194
return 0;
195195
}

src/logging/diagnostic-run-activity.ts

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ type DiagnosticModelStartedActivityEvent = Pick<
3131
"runId" | "sessionId" | "sessionKey" | "provider" | "model"
3232
>;
3333

34+
type DiagnosticRunProgressActivityEvent = Pick<
35+
Extract<DiagnosticEventPayload, { type: "run.progress" }>,
36+
"runId" | "sessionId" | "sessionKey" | "reason"
37+
>;
38+
3439
export type DiagnosticSessionActivitySnapshot = {
3540
activeWorkKind?: DiagnosticSessionActiveWorkKind;
3641
hasActiveEmbeddedRun?: boolean;
@@ -93,8 +98,8 @@ function mergeSessionActivity(target: SessionActivity, source: SessionActivity):
9398
for (const [key, tool] of source.activeTools) {
9499
target.activeTools.set(key, tool);
95100
}
96-
for (const call of source.activeModelCalls) {
97-
target.activeModelCalls.add(call);
101+
for (const key of source.activeModelCalls) {
102+
target.activeModelCalls.add(key);
98103
}
99104
if (source.lastProgressAt > target.lastProgressAt) {
100105
target.lastProgressAt = source.lastProgressAt;
@@ -220,12 +225,16 @@ function recordModelEnded(
220225
touchSessionActivity(activity, "model_call:ended");
221226
}
222227

223-
function recordRunProgress(event: Extract<DiagnosticEventPayload, { type: "run.progress" }>): void {
224-
const activity = resolveSessionActivity({ ...event, create: true });
228+
function recordRunProgress(event: DiagnosticRunProgressActivityEvent): void {
229+
markDiagnosticRunProgress(event);
230+
}
231+
232+
export function markDiagnosticRunProgress(params: DiagnosticRunProgressActivityEvent): void {
233+
const activity = resolveSessionActivity({ ...params, create: true });
225234
if (!activity) {
226235
return;
227236
}
228-
touchSessionActivity(activity, event.reason);
237+
touchSessionActivity(activity, params.reason);
229238
}
230239

231240
function recordRunCompleted(
@@ -301,7 +310,6 @@ export function getDiagnosticSessionActivitySnapshot(
301310
activeTool = tool;
302311
}
303312
}
304-
305313
return {
306314
activeWorkKind,
307315
...(activity.activeEmbeddedRuns.size > 0 ? { hasActiveEmbeddedRun: true } : {}),
@@ -313,17 +321,8 @@ export function getDiagnosticSessionActivitySnapshot(
313321
};
314322
}
315323

316-
export function markDiagnosticRunProgressForTest(params: {
317-
sessionId?: string;
318-
sessionKey?: string;
319-
runId?: string;
320-
reason: string;
321-
}): void {
322-
const activity = resolveSessionActivity({ ...params, create: true });
323-
if (!activity) {
324-
return;
325-
}
326-
touchSessionActivity(activity, params.reason);
324+
export function markDiagnosticRunProgressForTest(params: DiagnosticRunProgressActivityEvent): void {
325+
markDiagnosticRunProgress(params);
327326
}
328327

329328
export function markDiagnosticToolStartedForTest(params: {
@@ -345,32 +344,44 @@ export function markDiagnosticModelStartedForTest(
345344
export function resetDiagnosticRunActivityForTest(): void {
346345
activityByRef.clear();
347346
activityByRunId.clear();
347+
unregisterDiagnosticRunActivityListener?.();
348+
unregisterDiagnosticRunActivityListener = undefined;
349+
registerDiagnosticRunActivityListener();
348350
}
349351

350-
onInternalDiagnosticEvent((event) => {
351-
switch (event.type) {
352-
case "tool.execution.started":
353-
recordToolStarted(event);
354-
return;
355-
case "tool.execution.completed":
356-
case "tool.execution.error":
357-
case "tool.execution.blocked":
358-
recordToolEnded(event);
359-
return;
360-
case "model.call.started":
361-
recordModelStarted(event);
362-
return;
363-
case "model.call.completed":
364-
case "model.call.error":
365-
recordModelEnded(event);
366-
return;
367-
case "run.progress":
368-
recordRunProgress(event);
369-
return;
370-
case "run.completed":
371-
recordRunCompleted(event);
372-
return;
373-
default:
374-
return;
352+
let unregisterDiagnosticRunActivityListener: (() => void) | undefined;
353+
354+
function registerDiagnosticRunActivityListener(): void {
355+
if (unregisterDiagnosticRunActivityListener) {
356+
return;
375357
}
376-
});
358+
unregisterDiagnosticRunActivityListener = onInternalDiagnosticEvent((event) => {
359+
switch (event.type) {
360+
case "tool.execution.started":
361+
recordToolStarted(event);
362+
return;
363+
case "tool.execution.completed":
364+
case "tool.execution.error":
365+
case "tool.execution.blocked":
366+
recordToolEnded(event);
367+
return;
368+
case "model.call.started":
369+
recordModelStarted(event);
370+
return;
371+
case "model.call.completed":
372+
case "model.call.error":
373+
recordModelEnded(event);
374+
return;
375+
case "run.progress":
376+
recordRunProgress(event);
377+
return;
378+
case "run.completed":
379+
recordRunCompleted(event);
380+
return;
381+
default:
382+
return;
383+
}
384+
});
385+
}
386+
387+
registerDiagnosticRunActivityListener();

0 commit comments

Comments
 (0)