Skip to content

Commit aa8070a

Browse files
openperf, clawsweeper[bot], Takhoffman
authored
fix(llm): defer Anthropic stream start event until after message_start (#90697)
Summary:
- The branch moves Anthropic `start` emission into `message_start` handling for the provider and transport stream paths and adds focused ordering/error tests.
- PR surface: Source +5, Tests +149. Total +154 across 4 files.
- Reproducibility: Do we have a high-confidence way to reproduce the issue? Yes from source: current main emit ... ecovery intentionally refuses to retry after any non-error output; no live expired-cache run was performed.

Automerge notes:
- PR branch already contained follow-up commit before automerge: fix(agents): defer Anthropic transport stream start event until after…

Validation:
- ClawSweeper review passed for head 399a243.
- Required merge gates passed before the squash merge.

Prepared head SHA: 399a243
Review: #90697 (comment)

Co-authored-by: openperf <16864032@qq.com>
Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com>
Approved-by: takhoffman
Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
1 parent b1e4b6b commit aa8070a

4 files changed

Lines changed: 159 additions & 5 deletions

File tree

src/agents/anthropic-transport-stream.test.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,4 +2034,64 @@ describe("anthropic transport stream", () => {
20342034
expect(payload.thinking).toEqual({ type: "adaptive" });
20352035
expect(payload.output_config).toEqual({ effort: "high" });
20362036
});
2037+
2038+
it("emits start event only after message_start so pre-stream SSE errors arrive before any non-error event", async () => {
2039+
guardedFetchMock.mockResolvedValueOnce(
2040+
createSseResponse([
2041+
{
2042+
type: "message_start",
2043+
message: { id: "msg_1", usage: { input_tokens: 1, output_tokens: 0 } },
2044+
},
2045+
{
2046+
type: "message_delta",
2047+
delta: { stop_reason: "end_turn" },
2048+
usage: { input_tokens: 1, output_tokens: 1 },
2049+
},
2050+
]),
2051+
);
2052+
const streamFn = createAnthropicMessagesTransportStreamFn();
2053+
const stream = streamFn(
2054+
makeAnthropicTransportModel(),
2055+
{ messages: [{ role: "user", content: "hi" }] } as AnthropicStreamContext,
2056+
{ apiKey: "sk-ant-api" } as AnthropicStreamOptions,
2057+
);
2058+
2059+
const eventTypes: string[] = [];
2060+
for await (const event of stream as AsyncIterable<{ type: string }>) {
2061+
eventTypes.push(event.type);
2062+
}
2063+
2064+
const startIndex = eventTypes.indexOf("start");
2065+
expect(startIndex).toBeGreaterThanOrEqual(0);
2066+
expect(eventTypes.slice(0, startIndex).some((t) => t === "error")).toBe(false);
2067+
});
2068+
2069+
it("emits error without a preceding start event when SSE error arrives before message_start", async () => {
2070+
guardedFetchMock.mockResolvedValueOnce(
2071+
createRawSseResponse(
2072+
"event: error\ndata: " +
2073+
JSON.stringify({
2074+
type: "invalid_request_error",
2075+
message: "messages.1.content.63: Invalid signature in thinking block",
2076+
}) +
2077+
"\n\n",
2078+
),
2079+
);
2080+
const streamFn = createAnthropicMessagesTransportStreamFn();
2081+
const stream = streamFn(
2082+
makeAnthropicTransportModel(),
2083+
{ messages: [{ role: "user", content: "hi" }] } as AnthropicStreamContext,
2084+
{ apiKey: "sk-ant-api" } as AnthropicStreamOptions,
2085+
);
2086+
2087+
const eventTypes: string[] = [];
2088+
for await (const event of stream as AsyncIterable<{ type: string }>) {
2089+
eventTypes.push(event.type);
2090+
}
2091+
2092+
// start must not precede the error path, regardless of whether the mock
2093+
// surfaces the SSE error as an explicit "error" event or silently ends the
2094+
// stream (a timing artefact of synchronous mock SSE delivery).
2095+
expect(eventTypes).not.toContain("start");
2096+
});
20372097
});

src/agents/anthropic-transport-stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,6 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
992992
{ ...params, stream: true },
993993
transportOptions.signal ? { signal: transportOptions.signal } : undefined,
994994
);
995-
stream.push({ type: "start", partial: output as never });
996995
const blocks = output.content;
997996
const signatureDeltaIndexes = new Set<number>();
998997
const allowReasoningContentReplay = supportsReasoningContentReplay(model);
@@ -1130,6 +1129,11 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
11301129
output.usage.cacheRead +
11311130
output.usage.cacheWrite;
11321131
calculateCost(model, output.usage);
1132+
// Defer start until after message_start so that pre-stream SSE errors
1133+
// (e.g. invalid thinking signatures) arrive before any non-error event
1134+
// is yielded, keeping yieldedOutput=false in pumpStreamWithRecovery
1135+
// and allowing the thinking-block recovery retry to fire.
1136+
stream.push({ type: "start", partial: output as never });
11331137
continue;
11341138
}
11351139
if (event.type === "content_block_start") {

src/llm/providers/anthropic.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,95 @@ describe("Anthropic provider", () => {
252252
]);
253253
});
254254

255+
it("emits start event only after message_start so pre-stream SSE errors arrive before any non-error event", async () => {
256+
function createSseEventResponse(lines: string): Response {
257+
return new Response(lines, {
258+
status: 200,
259+
headers: { "content-type": "text/event-stream" },
260+
});
261+
}
262+
263+
const client = {
264+
messages: {
265+
create: vi.fn(() => ({
266+
asResponse: () =>
267+
Promise.resolve(
268+
createSseEventResponse(
269+
"event: message_start\ndata: " +
270+
JSON.stringify({
271+
type: "message_start",
272+
message: { id: "msg_1", usage: { input_tokens: 1, output_tokens: 0 } },
273+
}) +
274+
"\n\nevent: message_stop\ndata: " +
275+
JSON.stringify({ type: "message_stop" }) +
276+
"\n\n",
277+
),
278+
),
279+
})),
280+
},
281+
};
282+
283+
const stream = streamAnthropic(
284+
makeAnthropicModel(),
285+
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
286+
{ apiKey: "sk-ant-key", client: client as never },
287+
);
288+
289+
const eventTypes: string[] = [];
290+
for await (const event of stream as AsyncIterable<{ type: string }>) {
291+
eventTypes.push(event.type);
292+
}
293+
294+
// start must come after message_start processing, not before the loop
295+
const startIndex = eventTypes.indexOf("start");
296+
expect(startIndex).toBeGreaterThanOrEqual(0);
297+
// No error before start — the start event should be the first non-error event
298+
const errorBeforeStart = eventTypes.slice(0, startIndex).some((t) => t === "error");
299+
expect(errorBeforeStart).toBe(false);
300+
});
301+
302+
it("emits error without a preceding start event when SSE error arrives before message_start", async () => {
303+
function createSseEventResponse(lines: string): Response {
304+
return new Response(lines, {
305+
status: 200,
306+
headers: { "content-type": "text/event-stream" },
307+
});
308+
}
309+
310+
const client = {
311+
messages: {
312+
create: vi.fn(() => ({
313+
asResponse: () =>
314+
Promise.resolve(
315+
createSseEventResponse(
316+
"event: error\ndata: " +
317+
JSON.stringify({
318+
type: "invalid_request_error",
319+
message: "messages.1.content.63: Invalid signature in thinking block",
320+
}) +
321+
"\n\n",
322+
),
323+
),
324+
})),
325+
},
326+
};
327+
328+
const stream = streamAnthropic(
329+
makeAnthropicModel(),
330+
{ messages: [{ role: "user", content: "hi", timestamp: 0 }] },
331+
{ apiKey: "sk-ant-key", client: client as never },
332+
);
333+
334+
const eventTypes: string[] = [];
335+
for await (const event of stream as AsyncIterable<{ type: string }>) {
336+
eventTypes.push(event.type);
337+
}
338+
339+
// error must be the first event — no start emitted before it
340+
expect(eventTypes[0]).toBe("error");
341+
expect(eventTypes).not.toContain("start");
342+
});
343+
255344
it("strips the internal cache boundary when Anthropic cache control is disabled", async () => {
256345
let capturedPayload: unknown;
257346
const stream = streamSimpleAnthropic(

src/llm/providers/anthropic.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,6 @@ export const streamAnthropic: StreamFunction<"anthropic-messages", AnthropicOpti
515515
{ status: response.status, headers: headersToRecord(response.headers) },
516516
model,
517517
);
518-
stream.push({ type: "start", partial: output });
519518

520519
type Block = (ThinkingContent | TextContent | (ToolCall & { partialJson: string })) & {
521520
index: number;
@@ -525,19 +524,21 @@ export const streamAnthropic: StreamFunction<"anthropic-messages", AnthropicOpti
525524
for await (const event of iterateAnthropicEvents(response, options?.signal)) {
526525
if (event.type === "message_start") {
527526
output.responseId = event.message.id;
528-
// Capture initial token usage from message_start event
529-
// This ensures we have input token counts even if the stream is aborted early
530527
output.usage.input = event.message.usage.input_tokens || 0;
531528
output.usage.output = event.message.usage.output_tokens || 0;
532529
output.usage.cacheRead = event.message.usage.cache_read_input_tokens || 0;
533530
output.usage.cacheWrite = event.message.usage.cache_creation_input_tokens || 0;
534-
// Anthropic doesn't provide total_tokens, compute from components
535531
output.usage.totalTokens =
536532
output.usage.input +
537533
output.usage.output +
538534
output.usage.cacheRead +
539535
output.usage.cacheWrite;
540536
calculateCost(model, output.usage);
537+
// Defer start until after message_start so that pre-stream SSE errors
538+
// (e.g. invalid thinking signatures) arrive before any non-error event
539+
// is yielded, keeping yieldedOutput=false in pumpStreamWithRecovery
540+
// and allowing the thinking-block recovery retry to fire.
541+
stream.push({ type: "start", partial: output });
541542
} else if (event.type === "content_block_start") {
542543
if (event.content_block.type === "text") {
543544
const block: Block = {

0 commit comments

Comments
 (0)