Skip to content

Commit 399a243

Browse files
committed
fix(agents): defer Anthropic transport stream start event until after message_start
Applies the same start-event deferral fix from src/llm/providers/anthropic.ts to the embedded-agent default path. resolveEmbeddedAgentStreamFn routes anthropic-messages through createBoundaryAwareStreamFnForModel → createAnthropicMessagesTransportStreamFn, so the thinking-block recovery bug (pumpStreamWithRecovery yieldedOutput gate) affects the production embedded path via this file, not just the provider stream. Moves stream.push({type:'start'}) from before the SDK event loop into the message_start handler, keeping yieldedOutput=false in pumpStreamWithRecovery when an SSE event: error arrives before message_start (as Anthropic sends for invalid thinking signatures).
1 parent 17203dc commit 399a243

2 files changed

Lines changed: 65 additions & 1 deletion

File tree

src/agents/anthropic-transport-stream.test.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2034,4 +2034,64 @@ describe("anthropic transport stream", () => {
20342034
expect(payload.thinking).toEqual({ type: "adaptive" });
20352035
expect(payload.output_config).toEqual({ effort: "high" });
20362036
});
2037+
2038+
it("emits start event only after message_start so pre-stream SSE errors arrive before any non-error event", async () => {
2039+
guardedFetchMock.mockResolvedValueOnce(
2040+
createSseResponse([
2041+
{
2042+
type: "message_start",
2043+
message: { id: "msg_1", usage: { input_tokens: 1, output_tokens: 0 } },
2044+
},
2045+
{
2046+
type: "message_delta",
2047+
delta: { stop_reason: "end_turn" },
2048+
usage: { input_tokens: 1, output_tokens: 1 },
2049+
},
2050+
]),
2051+
);
2052+
const streamFn = createAnthropicMessagesTransportStreamFn();
2053+
const stream = streamFn(
2054+
makeAnthropicTransportModel(),
2055+
{ messages: [{ role: "user", content: "hi" }] } as AnthropicStreamContext,
2056+
{ apiKey: "sk-ant-api" } as AnthropicStreamOptions,
2057+
);
2058+
2059+
const eventTypes: string[] = [];
2060+
for await (const event of stream as AsyncIterable<{ type: string }>) {
2061+
eventTypes.push(event.type);
2062+
}
2063+
2064+
const startIndex = eventTypes.indexOf("start");
2065+
expect(startIndex).toBeGreaterThanOrEqual(0);
2066+
expect(eventTypes.slice(0, startIndex).some((t) => t === "error")).toBe(false);
2067+
});
2068+
2069+
it("emits error without a preceding start event when SSE error arrives before message_start", async () => {
2070+
guardedFetchMock.mockResolvedValueOnce(
2071+
createRawSseResponse(
2072+
"event: error\ndata: " +
2073+
JSON.stringify({
2074+
type: "invalid_request_error",
2075+
message: "messages.1.content.63: Invalid signature in thinking block",
2076+
}) +
2077+
"\n\n",
2078+
),
2079+
);
2080+
const streamFn = createAnthropicMessagesTransportStreamFn();
2081+
const stream = streamFn(
2082+
makeAnthropicTransportModel(),
2083+
{ messages: [{ role: "user", content: "hi" }] } as AnthropicStreamContext,
2084+
{ apiKey: "sk-ant-api" } as AnthropicStreamOptions,
2085+
);
2086+
2087+
const eventTypes: string[] = [];
2088+
for await (const event of stream as AsyncIterable<{ type: string }>) {
2089+
eventTypes.push(event.type);
2090+
}
2091+
2092+
// start must not precede the error path, regardless of whether the mock
2093+
// surfaces the SSE error as an explicit "error" event or silently ends the
2094+
// stream (a timing artefact of synchronous mock SSE delivery).
2095+
expect(eventTypes).not.toContain("start");
2096+
});
20372097
});

src/agents/anthropic-transport-stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -992,7 +992,6 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
992992
{ ...params, stream: true },
993993
transportOptions.signal ? { signal: transportOptions.signal } : undefined,
994994
);
995-
stream.push({ type: "start", partial: output as never });
996995
const blocks = output.content;
997996
const signatureDeltaIndexes = new Set<number>();
998997
const allowReasoningContentReplay = supportsReasoningContentReplay(model);
@@ -1130,6 +1129,11 @@ export function createAnthropicMessagesTransportStreamFn(): StreamFn {
11301129
output.usage.cacheRead +
11311130
output.usage.cacheWrite;
11321131
calculateCost(model, output.usage);
1132+
// Defer start until after message_start so that pre-stream SSE errors
1133+
// (e.g. invalid thinking signatures) arrive before any non-error event
1134+
// is yielded, keeping yieldedOutput=false in pumpStreamWithRecovery
1135+
// and allowing the thinking-block recovery retry to fire.
1136+
stream.push({ type: "start", partial: output as never });
11331137
continue;
11341138
}
11351139
if (event.type === "content_block_start") {

0 commit comments

Comments
 (0)