Skip to content

Commit 83741c7

Browse files
feat(cli-output): emit thinking_delta events; handle redacted single-block shape
Addresses anagnorisis2peripeteia review 4345523435. Mirrors the onAssistantDelta surface for thinking events:

- Streaming path: accumulate per index and emit {text, delta} for each chunk.
- Single-block path: emit one full event when content_block_stop fires for a thinking block that received no deltas (redacted thinking on adaptive models returns the block fully formed).

Downstream consumers #82285 (Telegram interleave) and #81851 (cli-interactive backend) consume this surface directly. Without the single-block detection, redacted thinking would be silently dropped on those rendering paths.
1 parent 1e7a0d8 commit 83741c7

2 files changed

Lines changed: 270 additions & 0 deletions

File tree

src/agents/cli-output.test.ts

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
extractCliErrorMessage,
55
parseCliJson,
66
parseCliJsonl,
7+
type CliThinkingDelta,
78
type CliToolUseStartDelta,
89
} from "./cli-output.js";
910
import { createClaudeApiErrorFixture } from "./test-helpers/claude-api-error-fixture.js";
@@ -892,4 +893,148 @@ describe("createCliJsonlStreamingParser", () => {
892893
},
893894
]);
894895
});
896+
897+
it("emits onThinkingDelta for streaming thinking_delta chunks with accumulating text", () => {
898+
const thinking: Array<CliThinkingDelta> = [];
899+
const parser = createCliJsonlStreamingParser({
900+
backend: {
901+
command: "local-cli",
902+
output: "jsonl",
903+
jsonlDialect: "claude-stream-json",
904+
sessionIdFields: ["session_id"],
905+
},
906+
providerId: "claude-cli",
907+
onAssistantDelta: () => undefined,
908+
onThinkingDelta: (delta) => thinking.push(delta),
909+
});
910+
911+
// Streaming-delta path mirrors onAssistantDelta: each chunk emits
912+
// {text: cumulative, delta: this chunk}.
913+
parser.push(
914+
[
915+
JSON.stringify({
916+
type: "stream_event",
917+
event: {
918+
type: "content_block_start",
919+
index: 0,
920+
content_block: { type: "thinking", thinking: "" },
921+
},
922+
}),
923+
JSON.stringify({
924+
type: "stream_event",
925+
event: {
926+
type: "content_block_delta",
927+
index: 0,
928+
delta: { type: "thinking_delta", thinking: "Let me " },
929+
},
930+
}),
931+
JSON.stringify({
932+
type: "stream_event",
933+
event: {
934+
type: "content_block_delta",
935+
index: 0,
936+
delta: { type: "thinking_delta", thinking: "consider " },
937+
},
938+
}),
939+
JSON.stringify({
940+
type: "stream_event",
941+
event: {
942+
type: "content_block_delta",
943+
index: 0,
944+
delta: { type: "thinking_delta", thinking: "this." },
945+
},
946+
}),
947+
JSON.stringify({
948+
type: "stream_event",
949+
event: { type: "content_block_stop", index: 0 },
950+
}),
951+
].join("\n") + "\n",
952+
);
953+
parser.finish();
954+
955+
expect(thinking).toEqual([
956+
{ text: "Let me ", delta: "Let me " },
957+
{ text: "Let me consider ", delta: "consider " },
958+
{ text: "Let me consider this.", delta: "this." },
959+
]);
960+
});
961+
962+
it("emits a single onThinkingDelta when a thinking block arrives fully-formed at content_block_start with no subsequent deltas", () => {
963+
const thinking: Array<CliThinkingDelta> = [];
964+
const parser = createCliJsonlStreamingParser({
965+
backend: {
966+
command: "local-cli",
967+
output: "jsonl",
968+
jsonlDialect: "claude-stream-json",
969+
sessionIdFields: ["session_id"],
970+
},
971+
providerId: "claude-cli",
972+
onAssistantDelta: () => undefined,
973+
onThinkingDelta: (delta) => thinking.push(delta),
974+
});
975+
976+
// Adaptive models can return redacted thinking as a single block at
977+
// content_block_start with the content already populated under
978+
// `thinking` (or `text`) and no following thinking_delta events.
979+
// Without the single-block detection the consumer would silently
980+
// drop this content.
981+
parser.push(
982+
[
983+
JSON.stringify({
984+
type: "stream_event",
985+
event: {
986+
type: "content_block_start",
987+
index: 0,
988+
content_block: { type: "thinking", thinking: "preformed thought block" },
989+
},
990+
}),
991+
JSON.stringify({
992+
type: "stream_event",
993+
event: { type: "content_block_stop", index: 0 },
994+
}),
995+
].join("\n") + "\n",
996+
);
997+
parser.finish();
998+
999+
expect(thinking).toEqual([
1000+
{ text: "preformed thought block", delta: "preformed thought block" },
1001+
]);
1002+
});
1003+
1004+
it("does not emit onThinkingDelta when a thinking block has no seed content and receives no deltas", () => {
1005+
const thinking: Array<CliThinkingDelta> = [];
1006+
const parser = createCliJsonlStreamingParser({
1007+
backend: {
1008+
command: "local-cli",
1009+
output: "jsonl",
1010+
jsonlDialect: "claude-stream-json",
1011+
sessionIdFields: ["session_id"],
1012+
},
1013+
providerId: "claude-cli",
1014+
onAssistantDelta: () => undefined,
1015+
onThinkingDelta: (delta) => thinking.push(delta),
1016+
});
1017+
1018+
// Encrypted-only redacted thinking: nothing useful to render, so
1019+
// emit nothing rather than spurious empty events.
1020+
parser.push(
1021+
[
1022+
JSON.stringify({
1023+
type: "stream_event",
1024+
event: {
1025+
type: "content_block_start",
1026+
index: 0,
1027+
content_block: { type: "thinking" },
1028+
},
1029+
}),
1030+
JSON.stringify({
1031+
type: "stream_event",
1032+
event: { type: "content_block_stop", index: 0 },
1033+
}),
1034+
].join("\n") + "\n",
1035+
);
1036+
parser.finish();
1037+
1038+
expect(thinking).toEqual([]);
1039+
});
8951040
});

src/agents/cli-output.ts

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ export type CliToolResultDelta = {
4040
result?: unknown;
4141
};
4242

43+
/**
 * Payload handed to an `onThinkingDelta` callback for one thinking event.
 */
export type CliThinkingDelta = {
  /**
   * Thinking text accumulated so far for the block. When a block is surfaced
   * as a single event this is the whole block and equals `delta`.
   */
  text: string;
  /** Text contributed by this event alone. */
  delta: string;
};
47+
4348
/**
 * Reports whether `providerId` identifies the Claude CLI provider.
 *
 * The id is passed through `normalizeLowercaseStringOrEmpty` before being
 * compared with the literal `"claude-cli"`.
 */
function isClaudeCliProvider(providerId: string): boolean {
  return normalizeLowercaseStringOrEmpty(providerId) === "claude-cli";
}
@@ -609,12 +614,121 @@ function dispatchClaudeCliStreamingToolEvent(params: {
609614
}
610615
}
611616

617+
/**
 * State kept for one thinking content block between its
 * `content_block_start` and `content_block_stop` events.
 */
type PendingThinkingBlock = {
  // Text accumulated from streaming thinking_delta chunks. Stays empty
  // when the block is the single-block redacted shape and no chunks
  // arrive between content_block_start and content_block_stop.
  accumulated: string;
  // Whether any thinking_delta event was observed for this index. Drives
  // the single-block detection at content_block_stop time.
  receivedDelta: boolean;
  // Seed content captured from the content_block_start payload. On the
  // adaptive single-block path the API returns the block fully-formed at
  // start with the content under `text` or `thinking`; it is surfaced as
  // one event at content_block_stop when no streaming delta arrived.
  seedContent: string;
};

/** In-flight thinking blocks, keyed by the stream event's `index`. */
type ThinkingTracker = {
  pendingByIndex: Map<number, PendingThinkingBlock>;
};

/** Creates a tracker that holds no pending thinking blocks. */
function createThinkingTracker(): ThinkingTracker {
  return {
    pendingByIndex: new Map<number, PendingThinkingBlock>(),
  };
}
641+
642+
/**
 * Reads the text already present on a thinking `content_block` payload.
 *
 * A non-empty string under `thinking` wins; otherwise a non-empty string
 * under `text` is used.
 *
 * @param block - The `content_block` record from a `content_block_start` event.
 * @returns The seed text, or `""` when neither field holds a non-empty string.
 */
function readThinkingSeedContent(block: Record<string, unknown>): string {
  for (const candidate of [block.thinking, block.text]) {
    // Non-strings and empty strings both count as "no content here".
    if (typeof candidate === "string" && candidate) {
      return candidate;
    }
  }
  return "";
}
651+
652+
/**
 * Translates one parsed JSONL record into `onThinkingDelta` callbacks.
 *
 * Only `stream_event` records are considered, and only when
 * `usesClaudeStreamJsonDialect(params)` accepts the backend/provider.
 * Two shapes are handled:
 *
 * - Streaming: every non-empty `thinking_delta` chunk for a tracked index
 *   emits `{ text: <accumulated so far>, delta: <this chunk> }`.
 * - Single block: when `content_block_stop` arrives for a tracked thinking
 *   block that saw no deltas, its seed content (captured at
 *   `content_block_start`) is emitted once as `{ text, delta }` with both
 *   fields equal. Blocks with no seed content emit nothing.
 *
 * Seed content is not prepended to streamed deltas; a block that carries
 * both is reported from its deltas only.
 *
 * @param params.backend - Backend config forwarded to the dialect check.
 * @param params.providerId - Provider id forwarded to the dialect check.
 * @param params.parsed - One parsed JSONL record.
 * @param params.tracker - Mutable per-parser state; updated in place.
 * @param params.onThinkingDelta - Receiver for thinking events; when absent
 *   the call is a no-op and the tracker is left untouched.
 */
function dispatchClaudeCliStreamingThinkingEvent(params: {
  backend: CliBackendConfig;
  providerId: string;
  parsed: Record<string, unknown>;
  tracker: ThinkingTracker;
  onThinkingDelta?: (delta: CliThinkingDelta) => void;
}): void {
  const { onThinkingDelta, tracker } = params;
  if (!onThinkingDelta) {
    return;
  }
  if (!usesClaudeStreamJsonDialect(params)) {
    return;
  }
  if (params.parsed.type !== "stream_event" || !isRecord(params.parsed.event)) {
    return;
  }
  const event = params.parsed.event;

  if (event.type === "message_start") {
    // Content block indices restart with every message. Anything still
    // pending belongs to a block that never received its stop event and
    // must not be attributed to a block of the new message.
    tracker.pendingByIndex.clear();
    return;
  }

  if (event.type === "content_block_start" && typeof event.index === "number") {
    if (isRecord(event.content_block) && event.content_block.type === "thinking") {
      tracker.pendingByIndex.set(event.index, {
        accumulated: "",
        receivedDelta: false,
        seedContent: readThinkingSeedContent(event.content_block),
      });
    } else {
      // A different kind of block now owns this index; drop any stale
      // thinking entry so its stop event cannot replay old seed content.
      tracker.pendingByIndex.delete(event.index);
    }
    return;
  }

  if (
    event.type === "content_block_delta" &&
    typeof event.index === "number" &&
    isRecord(event.delta) &&
    event.delta.type === "thinking_delta" &&
    typeof event.delta.thinking === "string"
  ) {
    const pending = tracker.pendingByIndex.get(event.index);
    if (!pending) {
      return;
    }
    const chunk = event.delta.thinking;
    if (!chunk) {
      // Empty chunks carry nothing to render and must not flip
      // receivedDelta, or the single-block fallback would be suppressed.
      return;
    }
    pending.receivedDelta = true;
    pending.accumulated = `${pending.accumulated}${chunk}`;
    onThinkingDelta({ text: pending.accumulated, delta: chunk });
    return;
  }

  if (event.type === "content_block_stop" && typeof event.index === "number") {
    const pending = tracker.pendingByIndex.get(event.index);
    if (!pending) {
      return;
    }
    tracker.pendingByIndex.delete(event.index);
    if (pending.receivedDelta) {
      // Already reported chunk by chunk.
      return;
    }
    if (!pending.seedContent) {
      // Redacted thinking with no surfaced content (encrypted-only blob,
      // for example). Nothing useful to render; drop silently rather than
      // emit an empty event.
      return;
    }
    onThinkingDelta({ text: pending.seedContent, delta: pending.seedContent });
  }
}
724+
612725
export function createCliJsonlStreamingParser(params: {
613726
backend: CliBackendConfig;
614727
providerId: string;
615728
onAssistantDelta: (delta: CliStreamingDelta) => void;
616729
onToolUseStart?: (delta: CliToolUseStartDelta) => void;
617730
onToolResult?: (delta: CliToolResultDelta) => void;
731+
onThinkingDelta?: (delta: CliThinkingDelta) => void;
618732
}) {
619733
let lineBuffer = "";
620734
let assistantText = "";
@@ -623,6 +737,7 @@ export function createCliJsonlStreamingParser(params: {
623737
let output: CliOutput | null = null;
624738
const texts: string[] = [];
625739
const toolTracker = createToolUseTracker();
740+
const thinkingTracker = createThinkingTracker();
626741

627742
const handleParsedRecord = (parsed: Record<string, unknown>) => {
628743
sessionId = pickCliSessionId(parsed, params.backend) ?? sessionId;
@@ -671,6 +786,16 @@ export function createCliJsonlStreamingParser(params: {
671786
});
672787
}
673788

789+
if (params.onThinkingDelta) {
790+
dispatchClaudeCliStreamingThinkingEvent({
791+
backend: params.backend,
792+
providerId: params.providerId,
793+
parsed,
794+
tracker: thinkingTracker,
795+
onThinkingDelta: params.onThinkingDelta,
796+
});
797+
}
798+
674799
const delta = parseClaudeCliStreamingDelta({
675800
backend: params.backend,
676801
providerId: params.providerId,

0 commit comments

Comments
 (0)