Skip to content

Commit 46bba38

Browse files
committed
feat: promote provider tool call stream wrapper
1 parent 82bbcf6 commit 46bba38

9 files changed

Lines changed: 518 additions & 250 deletions

File tree

docs/plugins/sdk-provider-plugins.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ API key auth, and dynamic model resolution.
368368
Each family builder is composed from lower-level public helpers exported from the same package, which you can reach for when a provider needs to go off the common pattern:
369369

370370
- `openclaw/plugin-sdk/provider-model-shared` - `ProviderReplayFamily`, `buildProviderReplayFamilyHooks(...)`, and the raw replay builders (`buildOpenAICompatibleReplayPolicy`, `buildAnthropicReplayPolicyForModel`, `buildGoogleGeminiReplayPolicy`, `buildHybridAnthropicOrOpenAIReplayPolicy`). Also exports Gemini replay helpers (`sanitizeGoogleGeminiReplayHistory`, `resolveTaggedReasoningOutputMode`) and endpoint/model helpers (`resolveProviderEndpoint`, `normalizeProviderId`, `normalizeGooglePreviewModelId`).
371-
- `openclaw/plugin-sdk/provider-stream` - `ProviderStreamFamily`, `buildProviderStreamFamilyHooks(...)`, `composeProviderStreamWrappers(...)`, plus the shared OpenAI/Codex wrappers (`createOpenAIAttributionHeadersWrapper`, `createOpenAIFastModeWrapper`, `createOpenAIServiceTierWrapper`, `createOpenAIResponsesContextManagementWrapper`, `createCodexNativeWebSearchWrapper`), DeepSeek V4 OpenAI-compatible wrapper (`createDeepSeekV4OpenAICompatibleThinkingWrapper`), Anthropic Messages thinking prefill cleanup (`createAnthropicThinkingPrefillPayloadWrapper`), and shared proxy/provider wrappers (`createOpenRouterWrapper`, `createToolStreamWrapper`, `createMinimaxFastModeWrapper`).
371+
- `openclaw/plugin-sdk/provider-stream` - `ProviderStreamFamily`, `buildProviderStreamFamilyHooks(...)`, `composeProviderStreamWrappers(...)`, plus the shared OpenAI/Codex wrappers (`createOpenAIAttributionHeadersWrapper`, `createOpenAIFastModeWrapper`, `createOpenAIServiceTierWrapper`, `createOpenAIResponsesContextManagementWrapper`, `createCodexNativeWebSearchWrapper`), DeepSeek V4 OpenAI-compatible wrapper (`createDeepSeekV4OpenAICompatibleThinkingWrapper`), Anthropic Messages thinking prefill cleanup (`createAnthropicThinkingPrefillPayloadWrapper`), plain-text tool-call promotion (`createPlainTextToolCallPromotionWrapper`), and shared proxy/provider wrappers (`createOpenRouterWrapper`, `createToolStreamWrapper`, `createMinimaxFastModeWrapper`).
372372
- `openclaw/plugin-sdk/provider-tools` - `ProviderToolCompatFamily`, `buildProviderToolCompatFamilyHooks("deepseek" | "gemini" | "openai")`, and underlying provider schema helpers.
373373

374374
Some stream helpers stay provider-local on purpose. `@openclaw/anthropic-provider` keeps `wrapAnthropicProviderStream`, `resolveAnthropicBetas`, `resolveAnthropicFastMode`, `resolveAnthropicServiceTier`, and the lower-level Anthropic wrapper builders in its own public `api.ts` / `contract-api.ts` seam because they encode Claude OAuth beta handling and `context1m` gating. The xAI plugin similarly keeps native xAI Responses shaping in its own `wrapStreamFn` (`/fast` aliases, default `tool_stream`, unsupported strict-tool cleanup, xAI-specific reasoning-payload removal).

docs/plugins/sdk-subpaths.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ focused channel/runtime subpaths, `config-contracts`, `string-coerce-runtime`,
179179
| `plugin-sdk/embedding-providers` | General embedding provider types and read helpers, including `EmbeddingProviderAdapter`, `getEmbeddingProvider(...)`, and `listEmbeddingProviders(...)`; plugins register providers through `api.registerEmbeddingProvider(...)` so manifest ownership is enforced |
180180
| `plugin-sdk/provider-tools` | `ProviderToolCompatFamily`, `buildProviderToolCompatFamilyHooks`, and DeepSeek/Gemini/OpenAI schema cleanup + diagnostics |
181181
| `plugin-sdk/provider-usage` | `fetchClaudeUsage` and similar |
182-
| `plugin-sdk/provider-stream` | `ProviderStreamFamily`, `buildProviderStreamFamilyHooks`, `composeProviderStreamWrappers`, stream wrapper types, and shared Anthropic/Bedrock/DeepSeek V4/Google/Kilocode/Moonshot/OpenAI/OpenRouter/Z.A.I/MiniMax/Copilot wrapper helpers |
182+
| `plugin-sdk/provider-stream` | `ProviderStreamFamily`, `buildProviderStreamFamilyHooks`, `composeProviderStreamWrappers`, stream wrapper types, plain-text tool-call promotion, and shared Anthropic/Bedrock/DeepSeek V4/Google/Kilocode/Moonshot/OpenAI/OpenRouter/Z.A.I/MiniMax/Copilot wrapper helpers |
183+
| `plugin-sdk/provider-stream-shared` | Public shared provider stream wrapper helpers including `composeProviderStreamWrappers`, `createPlainTextToolCallPromotionWrapper`, `createPayloadPatchStreamWrapper`, `createToolStreamWrapper`, and Anthropic/DeepSeek/OpenAI-compatible stream utilities |
183184
| `plugin-sdk/provider-transport-runtime` | Native provider transport helpers such as guarded fetch, transport message transforms, and writable transport event streams |
184185
| `plugin-sdk/provider-onboard` | Onboarding config patch helpers |
185186
| `plugin-sdk/global-singleton` | Process-local singleton/map/cache helpers |

extensions/lmstudio/src/stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { StreamFn } from "@earendil-works/pi-agent-core";
22
import { streamSimple } from "@earendil-works/pi-ai";
33
import { createSubsystemLogger } from "openclaw/plugin-sdk/logging-core";
44
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
5-
import { createPlainTextToolCallPromotionWrapper } from "openclaw/plugin-sdk/provider-stream-runtime-internal";
5+
import { createPlainTextToolCallPromotionWrapper } from "openclaw/plugin-sdk/provider-stream-shared";
66
import { ssrfPolicyFromHttpBaseUrlAllowedHostname } from "openclaw/plugin-sdk/ssrf-runtime";
77
import { LMSTUDIO_PROVIDER_ID } from "./defaults.js";
88
import { ensureLmstudioModelLoaded } from "./models.fetch.js";

extensions/xai/stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import type { StreamFn } from "@earendil-works/pi-agent-core";
22
import { streamSimple } from "@earendil-works/pi-ai";
33
import type { ProviderWrapStreamFnContext } from "openclaw/plugin-sdk/plugin-entry";
4-
import { createPlainTextToolCallPromotionWrapper } from "openclaw/plugin-sdk/provider-stream-runtime-internal";
54
import {
65
composeProviderStreamWrappers,
6+
createPlainTextToolCallPromotionWrapper,
77
createToolStreamWrapper,
88
} from "openclaw/plugin-sdk/provider-stream-shared";
99

Lines changed: 3 additions & 245 deletions
Original file line numberDiff line numberDiff line change
@@ -1,247 +1,5 @@
1-
import { randomUUID } from "node:crypto";
2-
import type { StreamFn } from "@earendil-works/pi-agent-core";
3-
import { createAssistantMessageEventStream, streamSimple } from "@earendil-works/pi-ai";
4-
import { parseStandalonePlainTextToolCallBlocks } from "./tool-payload.js";
5-
6-
function toRecord(value: unknown): Record<string, unknown> | undefined {
7-
return value && typeof value === "object" ? (value as Record<string, unknown>) : undefined;
8-
}
9-
10-
function resolveContextToolNames(context: Parameters<StreamFn>[1]): Set<string> {
11-
const tools = (context as { tools?: unknown }).tools;
12-
if (!Array.isArray(tools)) {
13-
return new Set();
14-
}
15-
const names = tools
16-
.map((tool) => {
17-
const record = toRecord(tool);
18-
return typeof record?.name === "string" && record.name.trim() ? record.name : undefined;
19-
})
20-
.filter((name): name is string => Boolean(name));
21-
return new Set(names);
22-
}
23-
24-
function couldStillBePlainTextToolCall(text: string): boolean {
25-
if (text.length > 256_000) {
26-
return false;
27-
}
28-
const trimmed = text.trimStart();
29-
return (
30-
trimmed.length === 0 ||
31-
trimmed.startsWith("[") ||
32-
trimmed.startsWith("<|channel|>") ||
33-
trimmed.startsWith("commentary") ||
34-
trimmed.startsWith("analysis") ||
35-
trimmed.startsWith("final")
36-
);
37-
}
38-
39-
function createSyntheticToolCallId(): string {
40-
return `call_${randomUUID().replace(/-/g, "").slice(0, 24)}`;
41-
}
42-
43-
function createPlainTextToolCallBlock(parsed: {
44-
arguments: Record<string, unknown>;
45-
name: string;
46-
}): Record<string, unknown> {
47-
return {
48-
type: "toolCall",
49-
id: createSyntheticToolCallId(),
50-
name: parsed.name,
51-
arguments: parsed.arguments,
52-
partialArgs: JSON.stringify(parsed.arguments),
53-
};
54-
}
55-
56-
function promotePlainTextToolCalls(
57-
message: unknown,
58-
toolNames: Set<string>,
59-
): Record<string, unknown> | undefined {
60-
const messageRecord = toRecord(message);
61-
if (!messageRecord) {
62-
return undefined;
63-
}
64-
if (!Array.isArray(messageRecord.content)) {
65-
if (typeof messageRecord.content !== "string" || !messageRecord.content.trim()) {
66-
return undefined;
67-
}
68-
const parsed = parseStandalonePlainTextToolCallBlocks(messageRecord.content, {
69-
allowedToolNames: toolNames,
70-
});
71-
if (!parsed) {
72-
return undefined;
73-
}
74-
return {
75-
...messageRecord,
76-
content: parsed.map(createPlainTextToolCallBlock),
77-
stopReason: "toolUse",
78-
};
79-
}
80-
if (
81-
messageRecord.content.some((block) => toRecord(block)?.type === "toolCall") ||
82-
messageRecord.content.length === 0
83-
) {
84-
return undefined;
85-
}
86-
87-
let promoted = false;
88-
const nextContent: Array<Record<string, unknown>> = [];
89-
for (const block of messageRecord.content) {
90-
const blockRecord = toRecord(block);
91-
if (!blockRecord) {
92-
return undefined;
93-
}
94-
if (blockRecord.type !== "text") {
95-
nextContent.push(blockRecord);
96-
continue;
97-
}
98-
const text = typeof blockRecord.text === "string" ? blockRecord.text : "";
99-
if (!text.trim()) {
100-
continue;
101-
}
102-
const parsed = parseStandalonePlainTextToolCallBlocks(text, {
103-
allowedToolNames: toolNames,
104-
});
105-
if (!parsed) {
106-
return undefined;
107-
}
108-
nextContent.push(...parsed.map(createPlainTextToolCallBlock));
109-
promoted = true;
110-
}
111-
112-
if (!promoted) {
113-
return undefined;
114-
}
115-
return {
116-
...messageRecord,
117-
content: nextContent,
118-
stopReason: "toolUse",
119-
};
120-
}
121-
122-
function emitPromotedToolCallEvents(
123-
stream: { push(event: unknown): void },
124-
message: Record<string, unknown>,
125-
): void {
126-
const content = Array.isArray(message.content) ? message.content : [];
127-
content.forEach((block, contentIndex) => {
128-
const record = toRecord(block);
129-
if (record?.type !== "toolCall") {
130-
return;
131-
}
132-
stream.push({ type: "toolcall_start", contentIndex, partial: message });
133-
stream.push({
134-
type: "toolcall_delta",
135-
contentIndex,
136-
delta: typeof record.partialArgs === "string" ? record.partialArgs : "{}",
137-
partial: message,
138-
});
139-
});
140-
}
141-
142-
function wrapPlainTextToolCallStream(
143-
source: ReturnType<StreamFn>,
144-
context: Parameters<StreamFn>[1],
145-
): ReturnType<StreamFn> {
146-
const toolNames = resolveContextToolNames(context);
147-
if (toolNames.size === 0) {
148-
return source;
149-
}
150-
const output = createAssistantMessageEventStream();
151-
const stream = output as unknown as { push(event: unknown): void; end(): void };
152-
153-
void (async () => {
154-
const bufferedTextEvents: unknown[] = [];
155-
let bufferedText = "";
156-
let ended = false;
157-
const endStream = () => {
158-
if (!ended) {
159-
ended = true;
160-
stream.end();
161-
}
162-
};
163-
const flushBufferedTextEvents = () => {
164-
for (const event of bufferedTextEvents.splice(0)) {
165-
stream.push(event);
166-
}
167-
bufferedText = "";
168-
};
169-
170-
try {
171-
for await (const event of source as AsyncIterable<unknown>) {
172-
const record = toRecord(event);
173-
const type = typeof record?.type === "string" ? record.type : "";
174-
175-
if (type === "text_start" || type === "text_delta" || type === "text_end") {
176-
bufferedTextEvents.push(event);
177-
if (typeof record?.delta === "string") {
178-
bufferedText += record.delta;
179-
} else if (typeof record?.content === "string" && !bufferedText) {
180-
bufferedText = record.content;
181-
}
182-
if (!couldStillBePlainTextToolCall(bufferedText)) {
183-
flushBufferedTextEvents();
184-
}
185-
continue;
186-
}
187-
188-
if (type === "done") {
189-
const promotedMessage = promotePlainTextToolCalls(record?.message, toolNames);
190-
if (promotedMessage) {
191-
bufferedTextEvents.splice(0);
192-
bufferedText = "";
193-
emitPromotedToolCallEvents(stream, promotedMessage);
194-
stream.push({ ...record, reason: "toolUse", message: promotedMessage });
195-
} else {
196-
flushBufferedTextEvents();
197-
stream.push(event);
198-
}
199-
endStream();
200-
return;
201-
}
202-
203-
flushBufferedTextEvents();
204-
stream.push(event);
205-
if (type === "error") {
206-
endStream();
207-
return;
208-
}
209-
}
210-
flushBufferedTextEvents();
211-
} catch (error) {
212-
stream.push({
213-
type: "error",
214-
reason: "error",
215-
error: {
216-
role: "assistant",
217-
content: [],
218-
stopReason: "error",
219-
errorMessage: error instanceof Error ? error.message : String(error),
220-
},
221-
});
222-
} finally {
223-
endStream();
224-
}
225-
})();
226-
227-
return output as ReturnType<StreamFn>;
228-
}
229-
2301
/**
231-
* Bundled-provider runtime hygiene for providers that can leak tool-use syntax
232-
* as assistant text even when native tool calling is enabled.
2+
* @deprecated Use `createPlainTextToolCallPromotionWrapper` from
3+
* `openclaw/plugin-sdk/provider-stream-shared`.
2334
*/
234-
export function createPlainTextToolCallPromotionWrapper(
235-
baseStreamFn: StreamFn | undefined,
236-
): StreamFn {
237-
const underlying = baseStreamFn ?? streamSimple;
238-
return (model, context, options) => {
239-
const maybeStream = underlying(model, context, options);
240-
if (maybeStream && typeof maybeStream === "object" && "then" in maybeStream) {
241-
return Promise.resolve(maybeStream).then((stream) =>
242-
wrapPlainTextToolCallStream(stream, context),
243-
) as ReturnType<StreamFn>;
244-
}
245-
return wrapPlainTextToolCallStream(maybeStream, context);
246-
};
247-
}
5+
export { createPlainTextToolCallPromotionWrapper } from "./provider-stream-shared.js";

src/plugin-sdk/provider-stream-shared.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,28 @@
11
import type { StreamFn } from "@earendil-works/pi-agent-core";
2+
import { createAssistantMessageEventStream } from "@earendil-works/pi-ai";
23
import { describe, expect, it } from "vitest";
34
import {
45
createDeepSeekV4OpenAICompatibleThinkingWrapper,
56
createAnthropicThinkingPrefillPayloadWrapper,
67
createPayloadPatchStreamWrapper,
8+
createPlainTextToolCallPromotionWrapper,
79
defaultToolStreamExtraParams,
810
isOpenAICompatibleThinkingEnabled,
911
stripTrailingAnthropicAssistantPrefillWhenThinking,
1012
} from "./provider-stream-shared.js";
1113

14+
function createEventStream(events: unknown[]): ReturnType<StreamFn> {
15+
const output = createAssistantMessageEventStream();
16+
const stream = output as unknown as { push(event: unknown): void; end(): void };
17+
queueMicrotask(() => {
18+
for (const event of events) {
19+
stream.push(event);
20+
}
21+
stream.end();
22+
});
23+
return output as ReturnType<StreamFn>;
24+
}
25+
1226
describe("defaultToolStreamExtraParams", () => {
1327
it("defaults tool_stream on when absent", () => {
1428
expect(defaultToolStreamExtraParams()).toEqual({ tool_stream: true });
@@ -139,6 +153,80 @@ describe("createPayloadPatchStreamWrapper", () => {
139153
});
140154
});
141155

156+
describe("createPlainTextToolCallPromotionWrapper", () => {
157+
it("promotes standalone text tool calls into tool-call stream events", async () => {
158+
const baseStreamFn: StreamFn = () =>
159+
createEventStream([
160+
{ type: "text_start", content: "" },
161+
{ type: "text_delta", delta: '[tool:read] {"path":"/tmp/file.txt"}' },
162+
{ type: "text_end" },
163+
{
164+
type: "done",
165+
reason: "stop",
166+
message: {
167+
role: "assistant",
168+
content: '[tool:read] {"path":"/tmp/file.txt"}',
169+
},
170+
},
171+
]);
172+
const wrapped = createPlainTextToolCallPromotionWrapper(baseStreamFn);
173+
const events: unknown[] = [];
174+
175+
for await (const event of wrapped(
176+
{} as never,
177+
{ tools: [{ name: "read" }] } as never,
178+
{},
179+
) as AsyncIterable<unknown>) {
180+
events.push(event);
181+
}
182+
183+
expect(events.map((event) => (event as { type?: string }).type)).toEqual([
184+
"toolcall_start",
185+
"toolcall_delta",
186+
"done",
187+
]);
188+
const done = events.at(-1) as { message?: { content?: unknown; stopReason?: unknown } };
189+
expect(done.message?.stopReason).toBe("toolUse");
190+
expect(done.message?.content).toEqual([
191+
expect.objectContaining({
192+
type: "toolCall",
193+
name: "read",
194+
arguments: { path: "/tmp/file.txt" },
195+
}),
196+
]);
197+
});
198+
199+
it("passes through bracketed text when no configured tool names match", async () => {
200+
const baseStreamFn: StreamFn = () =>
201+
createEventStream([
202+
{ type: "text_delta", delta: "[note] keep streaming" },
203+
{
204+
type: "done",
205+
reason: "stop",
206+
message: {
207+
role: "assistant",
208+
content: "[note] keep streaming",
209+
},
210+
},
211+
]);
212+
const wrapped = createPlainTextToolCallPromotionWrapper(baseStreamFn);
213+
const events: unknown[] = [];
214+
215+
for await (const event of wrapped(
216+
{} as never,
217+
{ tools: [{ name: "read" }] } as never,
218+
{},
219+
) as AsyncIterable<unknown>) {
220+
events.push(event);
221+
}
222+
223+
expect(events.map((event) => (event as { type?: string }).type)).toEqual([
224+
"text_delta",
225+
"done",
226+
]);
227+
});
228+
});
229+
142230
describe("stripTrailingAnthropicAssistantPrefillWhenThinking", () => {
143231
it("removes trailing assistant text turns when Anthropic thinking is enabled", () => {
144232
const payload = {

0 commit comments

Comments
 (0)