Skip to content

Commit a34d208

Browse files
committed
fix(agents): stream assistant deltas incrementally
1 parent ccf3476 commit a34d208

10 files changed

Lines changed: 664 additions & 46 deletions

src/agents/embedded-agent-subscribe.handlers.lifecycle.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,22 @@ describe("handleAgentEnd", () => {
687687
});
688688
});
689689

690+
it("final-flushes block replies before clearing pending fence fragments", async () => {
691+
const ctx = createContext(undefined);
692+
ctx.state.blockState.pendingFenceFragment = "```";
693+
ctx.flushBlockReplyBuffer = vi.fn((options?: { final?: boolean }) => {
694+
if (vi.mocked(ctx.flushBlockReplyBuffer).mock.calls.length === 1) {
695+
expect(options).toEqual({ final: true });
696+
expect(ctx.state.blockState.pendingFenceFragment).toBe("```");
697+
}
698+
});
699+
700+
await handleAgentEnd(ctx);
701+
702+
expect(ctx.flushBlockReplyBuffer).toHaveBeenNthCalledWith(1, { final: true });
703+
expect(ctx.state.blockState.pendingFenceFragment).toBeUndefined();
704+
});
705+
690706
it("emits lifecycle end when block reply flush throws", () => {
691707
const onAgentEvent = vi.fn();
692708
const ctx = createContext(undefined, { onAgentEvent });

src/agents/embedded-agent-subscribe.handlers.lifecycle.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,9 @@ export function handleAgentEnd(ctx: EmbeddedAgentSubscribeContext): void | Promi
177177
ctx.state.blockState.thinking = false;
178178
ctx.state.blockState.final = false;
179179
ctx.state.blockState.inlineCode = createInlineCodeState();
180+
ctx.state.blockState.fence = undefined;
181+
ctx.state.blockState.reasoningPendingFenceFragment = undefined;
182+
ctx.state.blockState.pendingFenceFragment = undefined;
180183

181184
if (ctx.state.pendingCompactionRetry > 0) {
182185
ctx.resolveCompactionRetry();
@@ -236,7 +239,7 @@ export function handleAgentEnd(ctx: EmbeddedAgentSubscribeContext): void | Promi
236239
};
237240

238241
try {
239-
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer();
242+
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer({ final: true });
240243
finalizeAgentEnd();
241244
const flushPendingMediaAndChannelResult = isPromiseLike<void>(flushBlockReplyBufferResult)
242245
? Promise.resolve(flushBlockReplyBufferResult).then(() => flushPendingMediaAndChannel())

src/agents/embedded-agent-subscribe.handlers.messages.test.ts

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ function createMessageUpdateContext(
2929
debug?: ReturnType<typeof vi.fn>;
3030
shouldEmitPartialReplies?: boolean;
3131
consumePartialReplyDirectives?: ReturnType<typeof vi.fn>;
32+
stripBlockTags?: ReturnType<typeof vi.fn>;
3233
state?: Record<string, unknown>;
3334
} = {},
3435
) {
@@ -64,7 +65,7 @@ function createMessageUpdateContext(
6465
},
6566
log: { debug: params.debug ?? vi.fn() },
6667
noteLastAssistant: vi.fn(),
67-
stripBlockTags: (text: string) => text,
68+
stripBlockTags: params.stripBlockTags ?? vi.fn((text: string) => text),
6869
consumePartialReplyDirectives: params.consumePartialReplyDirectives ?? vi.fn(() => null),
6970
emitReasoningStream: vi.fn(),
7071
flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(),
@@ -289,6 +290,81 @@ describe("pending assistant reply directives", () => {
289290
});
290291

291292
describe("handleMessageUpdate text signatures", () => {
293+
it("uses incremental text deltas for non-phase streams", () => {
294+
const onAgentEvent = vi.fn();
295+
const stripBlockTags = vi.fn((text: string) => text);
296+
const context = createMessageUpdateContext({ onAgentEvent, stripBlockTags });
297+
298+
const createNonPhaseEvent = (text: string, delta: string) =>
299+
({
300+
type: "message_update",
301+
message: { role: "assistant", content: [] },
302+
assistantMessageEvent: {
303+
type: "text_delta",
304+
delta,
305+
partial: {
306+
role: "assistant",
307+
content: [{ type: "text", text }],
308+
stopReason: "stop",
309+
provider: "test",
310+
model: "local",
311+
usage: {},
312+
timestamp: 0,
313+
},
314+
},
315+
}) as never;
316+
317+
handleMessageUpdate(context, createNonPhaseEvent("Hello ", "Hello "));
318+
handleMessageUpdate(context, createNonPhaseEvent("Hello world", "world"));
319+
320+
expect(stripBlockTags.mock.calls.map(([text]) => text)).toEqual(["Hello ", "world"]);
321+
expect(onAgentEvent.mock.calls.map(([event]) => event)).toMatchObject([
322+
{
323+
stream: "assistant",
324+
data: { text: "Hello", delta: "Hello" },
325+
},
326+
{
327+
stream: "assistant",
328+
data: { text: "Hello world", delta: " world" },
329+
},
330+
]);
331+
});
332+
333+
it("uses full partial text for suffix deltas after a suppressed commentary item", () => {
334+
const onAgentEvent = vi.fn();
335+
const context = createMessageUpdateContext({ onAgentEvent });
336+
337+
handleMessageUpdate(
338+
context,
339+
createTextUpdateEvent({
340+
type: "text_delta",
341+
text: "Hello",
342+
delta: "Hello",
343+
id: "item-commentary",
344+
signaturePhase: "commentary",
345+
partialPhase: "commentary",
346+
}),
347+
);
348+
handleMessageUpdate(
349+
context,
350+
createTextUpdateEvent({
351+
type: "text_delta",
352+
text: "Hello world",
353+
delta: " world",
354+
id: "item-final",
355+
signaturePhase: "final_answer",
356+
partialPhase: "final_answer",
357+
}),
358+
);
359+
360+
expect(onAgentEvent.mock.calls.map(([event]) => event)).toMatchObject([
361+
{
362+
stream: "assistant",
363+
data: { text: "Hello world", delta: "Hello world", phase: "final_answer" },
364+
},
365+
]);
366+
});
367+
292368
it("treats phased textSignature item changes as assistant-message boundaries", () => {
293369
const flushBlockReplyBuffer = vi.fn();
294370
const resetAssistantMessageState = vi.fn();

src/agents/embedded-agent-subscribe.handlers.messages.ts

Lines changed: 107 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,47 @@ function resolveAssistantTextChunk(params: {
200200
return "";
201201
}
202202

203+
const REASONING_TAG_RE = /<\s*\/?\s*(?:(?:antml:)?(?:think(?:ing)?|thought)|antthinking)\b/i;
204+
205+
function resolveStreamVisibleText(params: {
206+
previousRawText: string;
207+
visibleDelta: string;
208+
finalText?: string;
209+
}): { rawText: string; visibleText: string } {
210+
if (params.finalText !== undefined) {
211+
const rawText = params.finalText;
212+
return { rawText, visibleText: rawText.trim() };
213+
}
214+
const rawText = `${params.previousRawText}${params.visibleDelta}`;
215+
return { rawText, visibleText: rawText.trim() };
216+
}
217+
218+
function copyPartialBlockState(
219+
target: EmbeddedAgentSubscribeState["partialBlockState"],
220+
source: EmbeddedAgentSubscribeState["partialBlockState"],
221+
) {
222+
const copyFenceState = (fence?: typeof source.fence) =>
223+
fence
224+
? {
225+
atLineStart: fence.atLineStart,
226+
...(fence.open ? { open: { ...fence.open } } : {}),
227+
}
228+
: undefined;
229+
target.thinking = source.thinking;
230+
target.final = source.final;
231+
target.inlineCode = { ...source.inlineCode };
232+
target.fence = copyFenceState(source.fence);
233+
target.reasoningInlineCode = source.reasoningInlineCode
234+
? { ...source.reasoningInlineCode }
235+
: undefined;
236+
target.reasoningFence = copyFenceState(source.reasoningFence);
237+
target.reasoningPendingFenceFragment = source.reasoningPendingFenceFragment;
238+
target.finalInlineCode = source.finalInlineCode ? { ...source.finalInlineCode } : undefined;
239+
target.finalFence = copyFenceState(source.finalFence);
240+
target.pendingFenceFragment = source.pendingFenceFragment;
241+
target.pendingTagFragment = source.pendingTagFragment;
242+
}
243+
203244
export function resolveSilentReplyFallbackText(params: {
204245
text: unknown;
205246
messagingToolSentTexts: string[];
@@ -551,9 +592,6 @@ export function handleMessageUpdate(
551592
if (isPhasePendingOpenAiResponsesTextItem) {
552593
return;
553594
}
554-
const phaseAwareVisibleText = coerceChatContentText(
555-
extractAssistantVisibleText(partialAssistant),
556-
).trim();
557595
const shouldUsePhaseAwareBlockReply = Boolean(deliveryPhase);
558596

559597
if (chunk) {
@@ -567,27 +605,58 @@ export function handleMessageUpdate(
567605
// Handle partial <think> tags: stream whatever reasoning is visible so far.
568606
ctx.emitReasoningStream(extractThinkingFromTaggedStream(ctx.state.deltaBuffer));
569607
}
570-
const next =
571-
phaseAwareVisibleText ||
572-
(deliveryPhase === "final_answer"
573-
? ""
574-
: ctx
575-
.stripBlockTags(
576-
ctx.state.deltaBuffer,
577-
{
578-
thinking: false,
579-
final: false,
580-
inlineCode: createInlineCodeState(),
581-
},
582-
{ final: evtType === "text_end" },
583-
)
584-
.trim());
608+
const wasThinking = ctx.state.partialBlockState.thinking;
609+
let visibleDelta = "";
610+
let next = shouldUsePhaseAwareBlockReply
611+
? coerceChatContentText(extractAssistantVisibleText(partialAssistant)).trim()
612+
: "";
613+
let nextRawStreamText = next;
614+
if (!next && deliveryPhase !== "final_answer") {
615+
const pendingTagFragment = ctx.state.partialBlockState.pendingTagFragment;
616+
const shouldRecomputeFullStream = Boolean(pendingTagFragment) || REASONING_TAG_RE.test(chunk);
617+
if (shouldRecomputeFullStream) {
618+
const recomputeState: EmbeddedAgentSubscribeState["partialBlockState"] = {
619+
thinking: false,
620+
final: false,
621+
inlineCode: createInlineCodeState(),
622+
};
623+
const recomputedRawText = ctx.stripBlockTags(ctx.state.deltaBuffer, recomputeState, {
624+
final: evtType === "text_end",
625+
});
626+
const previousRawText = ctx.state.lastStreamedAssistant ?? "";
627+
const isFullStreamReplacement = !recomputedRawText.startsWith(previousRawText);
628+
next = recomputedRawText.trim();
629+
visibleDelta = isFullStreamReplacement
630+
? recomputedRawText
631+
: recomputedRawText.slice(previousRawText.length);
632+
nextRawStreamText = recomputedRawText;
633+
copyPartialBlockState(ctx.state.partialBlockState, recomputeState);
634+
} else {
635+
visibleDelta =
636+
chunk || evtType === "text_end"
637+
? ctx.stripBlockTags(chunk, ctx.state.partialBlockState, {
638+
final: evtType === "text_end",
639+
})
640+
: "";
641+
if (ctx.state.partialBlockState.pendingTagFragment) {
642+
visibleDelta = "";
643+
next = ctx.state.lastStreamedAssistantCleaned ?? "";
644+
nextRawStreamText = ctx.state.lastStreamedAssistant ?? "";
645+
} else {
646+
const streamVisibleText = resolveStreamVisibleText({
647+
previousRawText: ctx.state.lastStreamedAssistant ?? "",
648+
visibleDelta,
649+
});
650+
next = streamVisibleText.visibleText;
651+
nextRawStreamText = streamVisibleText.rawText;
652+
}
653+
}
654+
} else if (next && (chunk || evtType === "text_end")) {
655+
visibleDelta = ctx.stripBlockTags(chunk, ctx.state.partialBlockState, {
656+
final: evtType === "text_end",
657+
});
658+
}
585659
if (next) {
586-
const wasThinking = ctx.state.partialBlockState.thinking;
587-
const visibleDelta =
588-
chunk || evtType === "text_end"
589-
? ctx.stripBlockTags(chunk, ctx.state.partialBlockState, { final: evtType === "text_end" })
590-
: "";
591660
if (!wasThinking && ctx.state.partialBlockState.thinking) {
592661
openReasoningStream(ctx);
593662
}
@@ -636,7 +705,7 @@ export function handleMessageUpdate(
636705
}
637706
}
638707

639-
ctx.state.lastStreamedAssistant = next;
708+
ctx.state.lastStreamedAssistant = nextRawStreamText;
640709
ctx.state.lastStreamedAssistantCleaned = cleanedText;
641710

642711
if (ctx.params.silentExpected || suppressDeterministicApprovalOutput) {
@@ -755,7 +824,21 @@ export function handleMessageEnd(
755824
ctx.state.blockState.thinking = false;
756825
ctx.state.blockState.final = false;
757826
ctx.state.blockState.inlineCode = createInlineCodeState();
827+
ctx.state.blockState.fence = undefined;
828+
ctx.state.blockState.reasoningInlineCode = undefined;
829+
ctx.state.blockState.reasoningFence = undefined;
830+
ctx.state.blockState.reasoningPendingFenceFragment = undefined;
831+
ctx.state.blockState.finalInlineCode = undefined;
832+
ctx.state.blockState.finalFence = undefined;
833+
ctx.state.blockState.pendingFenceFragment = undefined;
758834
ctx.state.blockState.pendingTagFragment = undefined;
835+
ctx.state.partialBlockState.fence = undefined;
836+
ctx.state.partialBlockState.reasoningInlineCode = undefined;
837+
ctx.state.partialBlockState.reasoningFence = undefined;
838+
ctx.state.partialBlockState.reasoningPendingFenceFragment = undefined;
839+
ctx.state.partialBlockState.finalInlineCode = undefined;
840+
ctx.state.partialBlockState.finalFence = undefined;
841+
ctx.state.partialBlockState.pendingFenceFragment = undefined;
759842
ctx.state.partialBlockState.pendingTagFragment = undefined;
760843
ctx.state.lastStreamedAssistant = undefined;
761844
ctx.state.lastStreamedAssistantCleaned = undefined;

src/agents/embedded-agent-subscribe.handlers.types.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { HeartbeatToolResponse } from "../auto-reply/heartbeat-tool-respons
22
import type { ReplyDirectiveParseResult } from "../auto-reply/reply/reply-directives.js";
33
import type { ReasoningLevel } from "../auto-reply/thinking.js";
44
import type { InlineCodeState } from "../markdown/code-spans.js";
5+
import type { FenceScanState } from "../markdown/fences.js";
56
import type { HookRunner } from "../plugins/hooks.js";
67
import type { AcceptedSessionSpawn } from "./accepted-session-spawn.js";
78
import type { EmbeddedBlockChunker } from "./embedded-agent-block-chunker.js";
@@ -64,12 +65,26 @@ export type EmbeddedAgentSubscribeState = {
6465
thinking: boolean;
6566
final: boolean;
6667
inlineCode: InlineCodeState;
68+
fence?: FenceScanState;
69+
reasoningInlineCode?: InlineCodeState;
70+
reasoningFence?: FenceScanState;
71+
reasoningPendingFenceFragment?: string;
72+
finalInlineCode?: InlineCodeState;
73+
finalFence?: FenceScanState;
74+
pendingFenceFragment?: string;
6775
pendingTagFragment?: string;
6876
};
6977
partialBlockState: {
7078
thinking: boolean;
7179
final: boolean;
7280
inlineCode: InlineCodeState;
81+
fence?: FenceScanState;
82+
reasoningInlineCode?: InlineCodeState;
83+
reasoningFence?: FenceScanState;
84+
reasoningPendingFenceFragment?: string;
85+
finalInlineCode?: InlineCodeState;
86+
finalFence?: FenceScanState;
87+
pendingFenceFragment?: string;
7388
pendingTagFragment?: string;
7489
};
7590
lastStreamedAssistant?: string;
@@ -150,6 +165,13 @@ export type EmbeddedAgentSubscribeContext = {
150165
thinking: boolean;
151166
final: boolean;
152167
inlineCode?: InlineCodeState;
168+
fence?: FenceScanState;
169+
reasoningInlineCode?: InlineCodeState;
170+
reasoningFence?: FenceScanState;
171+
reasoningPendingFenceFragment?: string;
172+
finalInlineCode?: InlineCodeState;
173+
finalFence?: FenceScanState;
174+
pendingFenceFragment?: string;
153175
pendingTagFragment?: string;
154176
},
155177
options?: { final?: boolean },

0 commit comments

Comments
 (0)