Skip to content

Commit da2e177

Browse files
committed
fix(agents): stream phased text deltas incrementally
1 parent e8f3bce commit da2e177

3 files changed

Lines changed: 180 additions & 2 deletions

File tree

src/agents/embedded-agent-subscribe.handlers.messages.test.ts

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,141 @@ describe("handleMessageUpdate text signatures", () => {
451451
]);
452452
});
453453

454+
// Feeds two "text_delta" updates ("Hello", then " world") that share one phased
// textSignature (id "item-final", phase "final_answer") into handleMessageUpdate.
// The partial's `text` getter throws on access with the message
// "full partial text should not be read".
// Expects two "assistant" stream events: `text` holds the accumulated string
// ("Hello", then "Hello world") and `delta` holds just the newly added chunk.
it("uses incremental deltas for same-item phased streams", () => {
455+
const onAgentEvent = vi.fn();
456+
const context = createMessageUpdateContext({ onAgentEvent });
457+
const signature = JSON.stringify({ v: 1, id: "item-final", phase: "final_answer" });
458+
const partial = {
459+
role: "assistant",
460+
phase: "final_answer",
461+
content: [
462+
{
463+
type: "text",
464+
textSignature: signature,
465+
get text() {
466+
throw new Error("full partial text should not be read");
467+
},
468+
},
469+
],
470+
};
471+
472+
const createPhasedDelta = (delta: string) =>
473+
({
474+
type: "message_update",
475+
message: { role: "assistant", content: [] },
476+
assistantMessageEvent: {
477+
type: "text_delta",
478+
delta,
479+
partial,
480+
},
481+
}) as never;
482+
483+
handleMessageUpdate(context, createPhasedDelta("Hello"));
484+
handleMessageUpdate(context, createPhasedDelta(" world"));
485+
486+
expect(onAgentEvent.mock.calls.map(([event]) => event)).toMatchObject([
487+
{
488+
stream: "assistant",
489+
data: { text: "Hello", delta: "Hello", phase: "final_answer" },
490+
},
491+
{
492+
stream: "assistant",
493+
data: { text: "Hello world", delta: " world", phase: "final_answer" },
494+
},
495+
]);
496+
});
497+
498+
// Streams three "text_delta" updates for one phased item (phase "final_answer"):
// visible text followed by the opening of a <tool_call> block, then the rest of the
// tool-call payload with its closing tag, then "\nDone.".
// The partial's `text` getter throws on access, as in the other phased-delta tests.
// The expected events are "Visible" and then "Visible\n\nDone." (delta "\n\nDone.");
// none of the <tool_call> markup or its JSON payload appears in the expected data.
it("keeps same-item phased stream deltas on the user-visible sanitizer path", () => {
499+
const onAgentEvent = vi.fn();
500+
const context = createMessageUpdateContext({ onAgentEvent });
501+
const signature = JSON.stringify({ v: 1, id: "item-final", phase: "final_answer" });
502+
const partial = {
503+
role: "assistant",
504+
phase: "final_answer",
505+
content: [
506+
{
507+
type: "text",
508+
textSignature: signature,
509+
get text() {
510+
throw new Error("full partial text should not be read");
511+
},
512+
},
513+
],
514+
};
515+
516+
const createPhasedDelta = (delta: string) =>
517+
({
518+
type: "message_update",
519+
message: { role: "assistant", content: [] },
520+
assistantMessageEvent: {
521+
type: "text_delta",
522+
delta,
523+
partial,
524+
},
525+
}) as never;
526+
527+
handleMessageUpdate(context, createPhasedDelta("Visible\n<tool_call>{"));
528+
handleMessageUpdate(
529+
context,
530+
createPhasedDelta('"name":"read","arguments":{"file_path":"secret.md"}}</tool_call>'),
531+
);
532+
handleMessageUpdate(context, createPhasedDelta("\nDone."));
533+
534+
expect(onAgentEvent.mock.calls.map(([event]) => event)).toMatchObject([
535+
{
536+
stream: "assistant",
537+
data: { text: "Visible", delta: "Visible", phase: "final_answer" },
538+
},
539+
{
540+
stream: "assistant",
541+
data: { text: "Visible\n\nDone.", delta: "\n\nDone.", phase: "final_answer" },
542+
},
543+
]);
544+
});
545+
546+
// Streams two "text_delta" updates for one phased item (phase "final_answer") where
// the very first chunk opens a <tool_call> block; the second chunk finishes the
// tool-call payload, closes the tag, and appends "\nDone.".
// The partial's `text` getter throws on access, as in the other phased-delta tests.
// The expected result is a single "assistant" event with text "Done." and delta
// "Done."; no event is expected for the first chunk.
it("keeps sanitizer context when a same-item phased stream starts hidden", () => {
547+
const onAgentEvent = vi.fn();
548+
const context = createMessageUpdateContext({ onAgentEvent });
549+
const signature = JSON.stringify({ v: 1, id: "item-final", phase: "final_answer" });
550+
const partial = {
551+
role: "assistant",
552+
phase: "final_answer",
553+
content: [
554+
{
555+
type: "text",
556+
textSignature: signature,
557+
get text() {
558+
throw new Error("full partial text should not be read");
559+
},
560+
},
561+
],
562+
};
563+
564+
const createPhasedDelta = (delta: string) =>
565+
({
566+
type: "message_update",
567+
message: { role: "assistant", content: [] },
568+
assistantMessageEvent: {
569+
type: "text_delta",
570+
delta,
571+
partial,
572+
},
573+
}) as never;
574+
575+
handleMessageUpdate(context, createPhasedDelta("<tool_call>{"));
576+
handleMessageUpdate(
577+
context,
578+
createPhasedDelta('"name":"read","arguments":{"file_path":"secret.md"}}</tool_call>\nDone.'),
579+
);
580+
581+
expect(onAgentEvent.mock.calls.map(([event]) => event)).toMatchObject([
582+
{
583+
stream: "assistant",
584+
data: { text: "Done.", delta: "Done.", phase: "final_answer" },
585+
},
586+
]);
587+
});
588+
454589
it("treats phased textSignature item changes as assistant-message boundaries", () => {
455590
const flushBlockReplyBuffer = vi.fn();
456591
const resetAssistantMessageState = vi.fn();

src/agents/embedded-agent-subscribe.handlers.messages.ts

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {
3535
extractThinkingFromTaggedStream,
3636
extractThinkingFromTaggedText,
3737
promoteThinkingTagsToBlocks,
38+
sanitizeAssistantVisibleStreamText,
3839
} from "./embedded-agent-utils.js";
3940
import type { AgentEvent, AgentMessage } from "./runtime/index.js";
4041

@@ -215,6 +216,22 @@ function resolveStreamVisibleText(params: {
215216
return { rawText, visibleText: rawText.trim() };
216217
}
217218

219+
/**
 * Computes the text to report as the delta when the accumulated text moves from
 * `previousText` to `nextText`.
 *
 * @param previousText - Text accumulated before this update.
 * @param nextText - Text accumulated after this update.
 * @returns The appended suffix when `nextText` extends `previousText`; `""` when
 *   `nextText` is empty or is itself a prefix of `previousText`; otherwise the
 *   whole of `nextText`.
 */
function resolveTextAppendDelta(previousText: string, nextText: string): string {
220+
// Nothing accumulated now, so there is nothing to report.
if (!nextText) {
221+
return "";
222+
}
223+
// No prior text: everything in nextText is new.
if (!previousText) {
224+
return nextText;
225+
}
226+
// Pure append: report only the part after the previous text.
if (nextText.startsWith(previousText)) {
227+
return nextText.slice(previousText.length);
228+
}
229+
// nextText is a prefix of (or equal to) previousText: no new characters.
if (previousText.startsWith(nextText)) {
230+
return "";
231+
}
232+
// The two texts diverge, so no suffix can be isolated; return nextText in full.
return nextText;
233+
}
234+
218235
function copyPartialBlockState(
219236
target: EmbeddedAgentSubscribeState["partialBlockState"],
220237
source: EmbeddedAgentSubscribeState["partialBlockState"],
@@ -634,9 +651,11 @@ export function handleMessageUpdate(
634651
!deliveryPhase &&
635652
Boolean(streamItemId) &&
636653
isOpenAiResponsesAssistantMessage(partialAssistant);
654+
let streamItemChanged = false;
637655
if ((deliveryPhase || isPhasePendingOpenAiResponsesTextItem) && streamItemId) {
638656
const previousStreamItemId = ctx.state.lastAssistantStreamItemId;
639657
if (previousStreamItemId && previousStreamItemId !== streamItemId) {
658+
streamItemChanged = true;
640659
void ctx.flushBlockReplyBuffer({ assistantMessageIndex: ctx.state.assistantMessageIndex });
641660
ctx.resetAssistantMessageState(ctx.state.assistantTexts.length);
642661
void ctx.params.onAssistantMessageStart?.();
@@ -664,11 +683,29 @@ export function handleMessageUpdate(
664683
}
665684
const wasThinking = ctx.state.partialBlockState.thinking;
666685
let visibleDelta = "";
667-
let next = shouldUsePhaseAwareBlockReply
686+
const shouldReadPhaseAwarePartialText =
687+
shouldUsePhaseAwareBlockReply && (streamItemChanged || evtType === "text_end" || !chunk);
688+
let next = shouldReadPhaseAwarePartialText
668689
? coerceChatContentText(extractAssistantVisibleText(partialAssistant)).trim()
669690
: "";
670691
let nextRawStreamText = next;
671-
if (!next && deliveryPhase !== "final_answer") {
692+
let shouldPersistRawStreamText = false;
693+
if (shouldUsePhaseAwareBlockReply && !next && deliveryPhase === "final_answer" && chunk) {
694+
visibleDelta = ctx.stripBlockTags(chunk, ctx.state.partialBlockState, {
695+
final: evtType === "text_end",
696+
});
697+
const streamVisibleText = resolveStreamVisibleText({
698+
previousRawText: ctx.state.lastStreamedAssistant ?? "",
699+
visibleDelta,
700+
});
701+
const previousVisibleText = sanitizeAssistantVisibleStreamText(
702+
ctx.state.lastStreamedAssistant ?? "",
703+
).trim();
704+
next = sanitizeAssistantVisibleStreamText(streamVisibleText.rawText).trim();
705+
visibleDelta = resolveTextAppendDelta(previousVisibleText, next);
706+
nextRawStreamText = streamVisibleText.rawText;
707+
shouldPersistRawStreamText = true;
708+
} else if (!next && deliveryPhase !== "final_answer") {
672709
const pendingTagFragment = ctx.state.partialBlockState.pendingTagFragment;
673710
const shouldRecomputeFullStream = Boolean(pendingTagFragment) || REASONING_TAG_RE.test(chunk);
674711
if (shouldRecomputeFullStream) {
@@ -798,6 +835,8 @@ export function handleMessageUpdate(
798835
void ctx.params.onPartialReply(data);
799836
}
800837
}
838+
} else if (shouldPersistRawStreamText) {
839+
ctx.state.lastStreamedAssistant = nextRawStreamText;
801840
}
802841

803842
if (

src/agents/embedded-agent-utils.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ function sanitizeAssistantText(text: string): string {
3434
return sanitizeAssistantVisibleText(text);
3535
}
3636

37+
/**
 * Sanitizes assistant stream text by passing it through `sanitizeAssistantText`
 * and then `sanitizeUserFacingText`.
 *
 * `errorContext` is always `false` here; there is no message object (and thus no
 * `stopReason`) available to this helper.
 *
 * @param text - Assistant text to sanitize.
 * @returns The result of applying both sanitizers in that order.
 */
export function sanitizeAssistantVisibleStreamText(text: string): string {
38+
return sanitizeUserFacingText(sanitizeAssistantText(text), { errorContext: false });
39+
}
40+
3741
function finalizeAssistantExtraction(msg: AssistantMessage, extracted: string): string {
3842
const errorContext = msg.stopReason === "error";
3943
return sanitizeUserFacingText(extracted, { errorContext });

0 commit comments

Comments
 (0)