Skip to content

Commit 912fdfb

Browse files
committed
fix(providers): stream ordinary tool-like prose promptly
1 parent 82bbcf6 commit 912fdfb

3 files changed

Lines changed: 341 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
1313
### Fixes
1414

1515
- Agents/media: preserve async-started media tool metadata so background generation starts no longer surface generic incomplete-turn warnings while replay stays unsafe. (#85933) Thanks @fuller-stack-dev.
16+
- xAI/LM Studio: avoid buffering ordinary bracketed or `final` prose until stream completion while watching for plain-text tool-call fallbacks.
1617
- Discord: suppress a bot's previous reply body and referenced media from prompt context when a user replies to that bot message, while keeping reply metadata for routing. (#86238) Thanks @fuller-stack-dev.
1718
- Tests: avoid rebuilding the Control UI twice during the installer Docker smoke now that `pnpm build` includes `ui:build`.
1819
- Install/update: bypass npm `min-release-age` policies with `--min-release-age=0` instead of `--before` so hosted installers keep working on npm versions that reject the combined config. (#84749) Thanks @TeodoroRodrigo.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import type { StreamFn } from "@earendil-works/pi-agent-core";
2+
import { createAssistantMessageEventStream } from "@earendil-works/pi-ai";
3+
import { describe, expect, it } from "vitest";
4+
import { createPlainTextToolCallPromotionWrapper } from "./provider-stream-runtime-internal.js";
5+
6+
/** Loose view of a stream event used by these tests: a string `type` tag plus arbitrary extra fields. */
type StreamEvent = { type: string } & Record<string, unknown>;
7+
8+
/**
 * Narrows an unknown value to a plain (non-array) object.
 *
 * @param value - Value to check.
 * @param label - Human-readable name used in the error message.
 * @returns The same value, typed as a record.
 * @throws Error when `value` is falsy, not an object, or an array.
 */
function requireRecord(value: unknown, label: string): Record<string, unknown> {
  const isPlainObject = value !== null && typeof value === "object" && !Array.isArray(value);
  if (isPlainObject) {
    return value as Record<string, unknown>;
  }
  throw new Error(`expected ${label} to be a record`);
}
14+
15+
/**
 * Builds a promotion-wrapped stream on top of a source event stream that the
 * test controls directly.
 *
 * The base stream function always returns the same `source`, so anything
 * pushed into `source` is what the wrapper under test gets to observe. The
 * context advertises a single tool named "read".
 *
 * @returns The controllable `source` and the wrapped `stream`.
 */
function createControlledWrappedStream() {
  const source = createAssistantMessageEventStream();
  const baseStream: StreamFn = () => source as ReturnType<StreamFn>;

  const readTool = { name: "read", description: "Read", parameters: { type: "object" } };
  const model = { provider: "test", api: "openai-completions", id: "test-model" } as never;
  const context = { messages: [], tools: [readTool] } as never;

  const stream = createPlainTextToolCallPromotionWrapper(baseStream)(model, context, {});
  return { source, stream };
}
29+
30+
/**
 * Normalizes a stream function's return value, which may be the stream
 * itself or a promise for it.
 *
 * @param stream - Direct or promised stream.
 * @returns The resolved stream.
 */
async function resolveStream(stream: ReturnType<StreamFn>) {
  if (stream instanceof Promise) {
    return await stream;
  }
  return stream;
}
33+
34+
/**
 * Pulls the next event from `iterator`, failing fast instead of hanging when
 * nothing arrives.
 *
 * The iterator read is raced against a 50 ms timer. The timer is always
 * cleared afterwards so a successful read does not leave a pending timeout
 * behind for the rest of the test run.
 *
 * @param iterator - Async iterator over stream events.
 * @param label - Description of the awaited event, used in the timeout error.
 * @returns The yielded value, viewed as a `StreamEvent`.
 * @throws Error when no event arrives within 50 ms.
 */
async function nextEvent(
  iterator: AsyncIterator<unknown>,
  label: string,
): Promise<StreamEvent> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<"timed out">((resolve) => {
    timer = setTimeout(() => resolve("timed out"), 50);
  });

  try {
    const result = await Promise.race([iterator.next(), timeout]);
    if (result === "timed out") {
      throw new Error(`timed out waiting for ${label}`);
    }
    // The stream must still be open; a finished iterator means the event never came.
    expect(result.done).toBe(false);
    return result.value as StreamEvent;
  } finally {
    // Whichever side won the race, do not leave the timer armed.
    clearTimeout(timer);
  }
}
48+
49+
// Exercises the promotion wrapper through a source stream the test drives by hand:
// tool-call-shaped text must be promoted, ordinary prose must be passed through
// without waiting for the "done" event.
describe("createPlainTextToolCallPromotionWrapper", () => {
  it("promotes standalone plain-text tool calls for result consumers", async () => {
    const rawToolText = '[tool:read] {"path":"src/index.ts"}';
    const { source, stream } = createControlledWrappedStream();
    const resolved = await resolveStream(stream);
    const resultPromise = resolved.result();

    const startEvent = { type: "start", partial: { content: [] } };
    const deltaEvent = { type: "text_delta", contentIndex: 0, delta: rawToolText };
    const doneEvent = {
      type: "done",
      reason: "stop",
      message: {
        role: "assistant",
        content: [{ type: "text", text: rawToolText }],
        stopReason: "stop",
      },
    };
    source.push(startEvent as never);
    source.push(deltaEvent as never);
    source.push(doneEvent as never);
    source.end();

    // The final message should carry a tool call instead of the raw text.
    const message = requireRecord(await resultPromise, "result message");
    expect(message.stopReason).toBe("toolUse");
    const content = message.content as unknown[];
    expect(requireRecord(content[0], "tool call")).toMatchObject({
      type: "toolCall",
      name: "read",
      arguments: { path: "src/index.ts" },
    });
  });

  it("does not buffer ordinary bracketed text until done", async () => {
    const { source, stream } = createControlledWrappedStream();
    const resolved = await resolveStream(stream);
    const iterator = resolved[Symbol.asyncIterator]();

    try {
      source.push({ type: "start", partial: { content: [] } } as never);
      const started = await nextEvent(iterator, "start");
      expect(started.type).toBe("start");

      const textStart = {
        type: "text_start",
        contentIndex: 0,
        partial: { content: [{ type: "text", text: "" }] },
      };
      const textDelta = { type: "text_delta", contentIndex: 0, delta: "[note] keep streaming" };
      source.push(textStart as never);
      source.push(textDelta as never);

      // No "done" has been pushed yet, so an event here means the text was released early.
      const released = await nextEvent(iterator, "ordinary bracketed text");
      expect(released.type).toBe("text_start");
    } finally {
      source.push({ type: "done", reason: "stop", message: {} } as never);
      source.end();
      await iterator.return?.();
    }
  });

  it("keeps CR-separated bracketed tool calls buffered for promotion", async () => {
    const crToolText = '[read]\r{"path":"src/index.ts"}\r[END_TOOL_REQUEST]';
    const { source, stream } = createControlledWrappedStream();
    const resolved = await resolveStream(stream);
    const iterator = resolved[Symbol.asyncIterator]();

    try {
      source.push({ type: "start", partial: { content: [] } } as never);
      const started = await nextEvent(iterator, "start");
      expect(started.type).toBe("start");

      const textDelta = { type: "text_delta", contentIndex: 0, delta: crToolText };
      const doneEvent = {
        type: "done",
        reason: "stop",
        message: {
          role: "assistant",
          content: [{ type: "text", text: crToolText }],
          stopReason: "stop",
        },
      };
      source.push(textDelta as never);
      source.push(doneEvent as never);

      // The first event after "start" must be the promoted tool call, not the raw text.
      const event = await nextEvent(iterator, "promoted CR tool call");
      expect(event.type).toBe("toolcall_start");
    } finally {
      source.end();
      await iterator.return?.();
    }
  });

  it("does not buffer normal final prose until done", async () => {
    const { source, stream } = createControlledWrappedStream();
    const resolved = await resolveStream(stream);
    const iterator = resolved[Symbol.asyncIterator]();

    try {
      source.push({ type: "start", partial: { content: [] } } as never);
      const started = await nextEvent(iterator, "start");
      expect(started.type).toBe("start");

      const textDelta = { type: "text_delta", contentIndex: 0, delta: "final answer starts here" };
      source.push(textDelta as never);

      // Prose that merely begins with "final" must come through before "done".
      const event = await nextEvent(iterator, "normal final prose");
      expect(event).toMatchObject({ type: "text_delta", delta: "final answer starts here" });
    } finally {
      source.push({ type: "done", reason: "stop", message: {} } as never);
      source.end();
      await iterator.return?.();
    }
  });
});

src/plugin-sdk/provider-stream-runtime-internal.ts

Lines changed: 177 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,191 @@ function resolveContextToolNames(context: Parameters<StreamFn>[1]): Set<string>
2121
return new Set(names);
2222
}
2323

24-
/**
 * Reports whether the text buffered so far could still turn out to be a
 * plain-text tool call, i.e. whether the caller should keep buffering.
 *
 * Leading whitespace is ignored. Empty (or whitespace-only) text is still
 * undecided, so it counts as a possible tool call. Otherwise the decision is
 * delegated to the bracketed and Harmony-style prefix checks.
 *
 * @param text - Text accumulated so far.
 * @param toolNames - Names of the tools available in the current context.
 * @returns `true` while the text remains a viable tool-call prefix.
 */
function couldStillBePlainTextToolCall(text: string, toolNames: Set<string>): boolean {
  // Give up on very large buffers rather than holding them back any longer.
  const maxBufferedLength = 256_000;
  if (text.length > maxBufferedLength) {
    return false;
  }

  const trimmed = text.trimStart();
  if (trimmed.length === 0) {
    return true;
  }
  return (
    couldStillBeBracketedToolCall(trimmed, toolNames) ||
    couldStillBeHarmonyToolCall(trimmed, toolNames)
  );
}
3835

36+
/**
 * Checks whether `text` and `literal` agree on their common leading part:
 * either `text` is a (possibly complete) prefix of `literal`, or `text`
 * already starts with the whole `literal`.
 *
 * @param text - Text received so far.
 * @param literal - Marker being looked for.
 * @returns `true` when one string is a prefix of the other.
 */
function matchesLiteralPrefix(text: string, literal: string): boolean {
  if (text.length <= literal.length) {
    // Text is still (at most) as long as the marker: it must be a prefix of it.
    return literal.startsWith(text);
  }
  return text.startsWith(literal);
}
39+
40+
/**
 * Advances past any run of spaces and tabs beginning at `start`.
 *
 * @param text - Text being scanned.
 * @param start - Index to start scanning from.
 * @returns Index of the first character that is neither a space nor a tab
 *   (which may be `text.length` or beyond if `start` was already past the end).
 */
function skipHorizontalWhitespace(text: string, start: number): number {
  let index = start;
  for (;;) {
    const char = text[index];
    if (char !== " " && char !== "\t") {
      return index;
    }
    index += 1;
  }
}
47+
48+
/**
 * Tells whether `char` can appear in a tool name: an ASCII letter, a digit,
 * an underscore, or a hyphen.
 *
 * @param char - Character to test; `undefined` (e.g. an out-of-range index) is allowed.
 * @returns `false` for `undefined` or an empty string, otherwise the result of the character-class test.
 */
function isToolNameChar(char: string | undefined): boolean {
  if (!char) {
    return false;
  }
  return /[A-Za-z0-9_-]/.test(char);
}
51+
52+
/**
 * Checks whether at least one known tool name begins with `prefix`.
 *
 * @param toolNames - Names of the available tools.
 * @param prefix - Partial (or complete) tool name seen so far.
 * @returns `true` when some tool name starts with `prefix`.
 */
function hasToolNamePrefix(toolNames: Set<string>, prefix: string): boolean {
  return Array.from(toolNames).some((candidate) => candidate.startsWith(prefix));
}
60+
61+
/**
 * Checks whether the text from `start` onward could still be the beginning of
 * a JSON object payload: only whitespace so far, or whitespace followed by `{`.
 *
 * @param text - Text being scanned.
 * @param start - Index where the payload would begin.
 * @returns `true` when nothing but whitespace remains or the first
 *   non-whitespace character is `{`.
 */
function couldStillBeJsonPayload(text: string, start: number): boolean {
  let index = start;
  for (; index < text.length; index += 1) {
    if (!/\s/.test(text.charAt(index))) {
      break;
    }
  }
  if (index >= text.length) {
    // Ran out of input: the payload may simply not have arrived yet.
    return true;
  }
  return text.charAt(index) === "{";
}
68+
69+
/**
 * Reports whether `text` could still be the start of a bracketed plain-text
 * tool call, so the caller knows whether to keep buffering.
 *
 * Two shapes are recognized:
 * - `[tool:NAME]` followed by optional whitespace and a `{` payload;
 * - `[NAME]`, optional spaces/tabs, a line break (`\n`, `\r`, or `\r\n`),
 *   then optional whitespace and a `{` payload.
 *
 * While NAME is still being received it only has to be a prefix of some known
 * tool name. Once the closing `]` is seen the name is complete and must be an
 * exact tool name; a name such as `re` (with tool `read`) can no longer become
 * a tool call, so the text is released instead of being held back.
 *
 * @param text - Text accumulated so far, with leading whitespace already removed.
 * @param toolNames - Names of the tools available in the current context.
 * @returns `true` while the text remains a viable bracketed tool-call prefix.
 */
function couldStillBeBracketedToolCall(text: string, toolNames: Set<string>): boolean {
  if (!text.startsWith("[")) {
    return false;
  }

  const toolPrefix = "[tool:";
  const usesToolPrefix = matchesLiteralPrefix(text, toolPrefix);
  if (usesToolPrefix && text.length <= toolPrefix.length) {
    // Still inside (or exactly at the end of) the "[tool:" marker itself.
    return true;
  }

  // Both shapes share the same name scan; only the starting offset differs.
  const nameStart = usesToolPrefix ? toolPrefix.length : 1;
  let cursor = nameStart;
  while (isToolNameChar(text[cursor])) {
    cursor += 1;
  }
  const name = text.slice(nameStart, cursor);
  if (!name || !hasToolNamePrefix(toolNames, name)) {
    return false;
  }
  if (cursor >= text.length) {
    // The name may still be growing.
    return true;
  }
  // The name is terminated: it must be closed by "]" and be a real tool.
  if (text[cursor] !== "]" || !toolNames.has(name)) {
    return false;
  }

  if (usesToolPrefix) {
    return couldStillBeJsonPayload(text, cursor + 1);
  }

  // Bare "[NAME]" form: the payload must start on a following line.
  cursor = skipHorizontalWhitespace(text, cursor + 1);
  if (cursor >= text.length) {
    return true;
  }
  const lineBreak = text[cursor];
  if (lineBreak === "\r") {
    if (cursor + 1 >= text.length) {
      return true;
    }
    // Treat "\r\n" as a single line break.
    return couldStillBeJsonPayload(text, text[cursor + 1] === "\n" ? cursor + 2 : cursor + 1);
  }
  if (lineBreak !== "\n") {
    return false;
  }
  return couldStillBeJsonPayload(text, cursor + 1);
}
126+
127+
/**
 * Reports whether `text` could still be the start of a Harmony-style
 * plain-text tool call, so the caller knows whether to keep buffering.
 *
 * The accepted shape, read left to right, is:
 *   optional `<|channel|>` marker,
 *   a channel word (`commentary`, `analysis`, or `final`),
 *   optional spaces/tabs, `to=` and a tool name,
 *   optional spaces/tabs, the word `code`,
 *   optional whitespace, then either `<|message|>` or `{`.
 *
 * At every step, running out of input counts as "still possible". A partial
 * tool name only has to be a prefix of a known tool; once more text follows
 * the name (after spaces/tabs) it must be an exact tool name.
 *
 * @param text - Text accumulated so far, with leading whitespace already removed.
 * @param toolNames - Names of the tools available in the current context.
 * @returns `true` while the text remains a viable Harmony tool-call prefix.
 */
function couldStillBeHarmonyToolCall(text: string, toolNames: Set<string>): boolean {
  const channelMarker = "<|channel|>";
  const toMarker = "to=";
  const codeMarker = "code";
  const messageMarker = "<|message|>";

  // Optional leading channel marker.
  let pos = 0;
  if (matchesLiteralPrefix(text, channelMarker)) {
    if (text.length <= channelMarker.length) {
      return true;
    }
    pos = channelMarker.length;
  }

  // Channel word.
  const afterMarker = text.slice(pos);
  let channelName: string | undefined;
  for (const candidate of ["commentary", "analysis", "final"]) {
    if (matchesLiteralPrefix(afterMarker, candidate)) {
      channelName = candidate;
      break;
    }
  }
  if (channelName === undefined) {
    return false;
  }
  if (afterMarker.length <= channelName.length) {
    return true;
  }

  // Recipient: "to=" followed by the tool name.
  pos = skipHorizontalWhitespace(text, pos + channelName.length);
  if (pos >= text.length) {
    return true;
  }
  const recipient = text.slice(pos);
  if (!matchesLiteralPrefix(recipient, toMarker)) {
    return false;
  }
  if (recipient.length <= toMarker.length) {
    return true;
  }

  const nameBegin = pos + toMarker.length;
  let nameEnd = nameBegin;
  while (isToolNameChar(text[nameEnd])) {
    nameEnd += 1;
  }
  const toolName = text.slice(nameBegin, nameEnd);
  if (toolName === "" || !hasToolNamePrefix(toolNames, toolName)) {
    return false;
  }
  if (nameEnd >= text.length) {
    return true;
  }

  pos = skipHorizontalWhitespace(text, nameEnd);
  if (pos >= text.length) {
    return true;
  }
  // More text follows, so the name is final and has to be a real tool.
  if (!toolNames.has(toolName)) {
    return false;
  }

  // "code" keyword.
  const afterName = text.slice(pos);
  if (!matchesLiteralPrefix(afterName, codeMarker)) {
    return false;
  }
  if (afterName.length <= codeMarker.length) {
    return true;
  }

  // Payload opener: "<|message|>" (possibly partial) or a JSON object.
  pos += codeMarker.length;
  while (pos < text.length && /\s/.test(text[pos] ?? "")) {
    pos += 1;
  }
  if (pos >= text.length) {
    return true;
  }
  return matchesLiteralPrefix(text.slice(pos), messageMarker) || text[pos] === "{";
}
208+
39209
/**
 * Generates an identifier for a synthesized tool call.
 *
 * @returns `call_` followed by the first 24 characters of a random UUID with
 *   its hyphens removed.
 */
function createSyntheticToolCallId(): string {
  const compactUuid = randomUUID().replace(/-/g, "");
  const suffix = compactUuid.slice(0, 24);
  return `call_${suffix}`;
}
@@ -179,7 +349,7 @@ function wrapPlainTextToolCallStream(
179349
} else if (typeof record?.content === "string" && !bufferedText) {
180350
bufferedText = record.content;
181351
}
182-
if (!couldStillBePlainTextToolCall(bufferedText)) {
352+
if (!couldStillBePlainTextToolCall(bufferedText, toolNames)) {
183353
flushBufferedTextEvents();
184354
}
185355
continue;

0 commit comments

Comments
 (0)