Skip to content

Commit 1a13c34

Browse files
committed
fix(agents): close cache boundary transport gaps
1 parent 58a56d9 commit 1a13c34

7 files changed

Lines changed: 257 additions & 5 deletions

src/agents/anthropic-vertex-stream.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { Model } from "@mariozechner/pi-ai";
22
import { beforeEach, describe, expect, it, vi } from "vitest";
3+
import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js";
34

45
const hoisted = vi.hoisted(() => {
56
const streamAnthropicMock = vi.fn<(model: unknown, context: unknown, options: unknown) => symbol>(
@@ -145,6 +146,94 @@ describe("createAnthropicVertexStreamFn", () => {
145146
);
146147
});
147148

149+
it("applies Anthropic cache-boundary shaping before forwarding payload hooks", async () => {
150+
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
151+
const model = makeModel({ id: "claude-sonnet-4-6", maxTokens: 64000 });
152+
const onPayload = vi.fn(async (payload: unknown) => payload);
153+
154+
void streamFn(
155+
model,
156+
{
157+
systemPrompt: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`,
158+
messages: [{ role: "user", content: "Hello" }],
159+
} as never,
160+
{
161+
cacheRetention: "short",
162+
onPayload,
163+
} as never,
164+
);
165+
166+
const transportOptions = hoisted.streamAnthropicMock.mock.calls[0]?.[2] as {
167+
onPayload?: (payload: unknown, payloadModel: unknown) => Promise<unknown>;
168+
};
169+
const payload = {
170+
system: [
171+
{
172+
type: "text",
173+
text: `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`,
174+
cache_control: { type: "ephemeral" },
175+
},
176+
],
177+
messages: [{ role: "user", content: "Hello" }],
178+
};
179+
180+
const nextPayload = await transportOptions.onPayload?.(payload, model);
181+
182+
expect(onPayload).toHaveBeenCalledWith(
183+
{
184+
system: [
185+
{
186+
type: "text",
187+
text: "Stable prefix",
188+
cache_control: { type: "ephemeral" },
189+
},
190+
{
191+
type: "text",
192+
text: "Dynamic suffix",
193+
},
194+
],
195+
messages: [
196+
{
197+
role: "user",
198+
content: [
199+
{
200+
type: "text",
201+
text: "Hello",
202+
cache_control: { type: "ephemeral" },
203+
},
204+
],
205+
},
206+
],
207+
},
208+
model,
209+
);
210+
expect(nextPayload).toEqual({
211+
system: [
212+
{
213+
type: "text",
214+
text: "Stable prefix",
215+
cache_control: { type: "ephemeral" },
216+
},
217+
{
218+
type: "text",
219+
text: "Dynamic suffix",
220+
},
221+
],
222+
messages: [
223+
{
224+
role: "user",
225+
content: [
226+
{
227+
type: "text",
228+
text: "Hello",
229+
cache_control: { type: "ephemeral" },
230+
},
231+
],
232+
},
233+
],
234+
});
235+
});
236+
148237
it("omits maxTokens when neither the model nor request provide a finite limit", () => {
149238
const streamFn = createAnthropicVertexStreamFn("vertex-project", "us-east5");
150239
const model = makeModel({ id: "claude-sonnet-4-6" });

src/agents/anthropic-vertex-stream.ts

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import {
55
resolveAnthropicVertexClientRegion,
66
resolveAnthropicVertexProjectId,
77
} from "../plugin-sdk/anthropic-vertex.js";
8+
import {
9+
applyAnthropicPayloadPolicyToParams,
10+
resolveAnthropicPayloadPolicy,
11+
} from "./anthropic-payload-policy.js";
812

913
type AnthropicVertexEffort = NonNullable<AnthropicOptions["effort"]>;
1014

@@ -31,6 +35,28 @@ function resolveAnthropicVertexMaxTokens(params: {
3135
return requested ?? modelMax;
3236
}
3337

38+
function createAnthropicVertexOnPayload(params: {
39+
model: { api: string; baseUrl?: string; provider: string };
40+
cacheRetention: AnthropicOptions["cacheRetention"] | undefined;
41+
onPayload: AnthropicOptions["onPayload"] | undefined;
42+
}): NonNullable<AnthropicOptions["onPayload"]> {
43+
const policy = resolveAnthropicPayloadPolicy({
44+
provider: params.model.provider,
45+
api: params.model.api,
46+
baseUrl: params.model.baseUrl,
47+
cacheRetention: params.cacheRetention,
48+
enableCacheControl: true,
49+
});
50+
51+
return async (payload, model) => {
52+
if (payload && typeof payload === "object" && !Array.isArray(payload)) {
53+
applyAnthropicPayloadPolicyToParams(payload as Record<string, unknown>, policy);
54+
}
55+
const nextPayload = await params.onPayload?.(payload, model);
56+
return nextPayload ?? payload;
57+
};
58+
}
59+
3460
/**
3561
* Create a StreamFn that routes through pi-ai's `streamAnthropic` with an
3662
* injected `AnthropicVertex` client. All streaming, message conversion, and
@@ -49,8 +75,13 @@ export function createAnthropicVertexStreamFn(
4975
});
5076

5177
return (model, context, options) => {
78+
const transportModel = model as Model<"anthropic-messages"> & {
79+
api: string;
80+
baseUrl?: string;
81+
provider: string;
82+
};
5283
const maxTokens = resolveAnthropicVertexMaxTokens({
53-
modelMaxTokens: model.maxTokens,
84+
modelMaxTokens: transportModel.maxTokens,
5485
requestedMaxTokens: options?.maxTokens,
5586
});
5687
const opts: AnthropicOptions = {
@@ -61,7 +92,11 @@ export function createAnthropicVertexStreamFn(
6192
cacheRetention: options?.cacheRetention,
6293
sessionId: options?.sessionId,
6394
headers: options?.headers,
64-
onPayload: options?.onPayload,
95+
onPayload: createAnthropicVertexOnPayload({
96+
model: transportModel,
97+
cacheRetention: options?.cacheRetention,
98+
onPayload: options?.onPayload,
99+
}),
65100
maxRetryDelayMs: options?.maxRetryDelayMs,
66101
metadata: options?.metadata,
67102
};
@@ -95,7 +130,7 @@ export function createAnthropicVertexStreamFn(
95130
opts.thinkingEnabled = false;
96131
}
97132

98-
return streamAnthropic(model as Model<"anthropic-messages">, context, opts);
133+
return streamAnthropic(transportModel, context, opts);
99134
};
100135
}
101136

src/agents/cache-trace.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,49 @@ describe("createCacheTrace", () => {
6969
expect(event.system).toBe("");
7070
});
7171

72+
it("records stream context from systemPrompt when wrapping stream functions", () => {
73+
const lines: string[] = [];
74+
const trace = createCacheTrace({
75+
cfg: {
76+
diagnostics: {
77+
cacheTrace: {
78+
enabled: true,
79+
includeSystem: true,
80+
},
81+
},
82+
},
83+
env: {},
84+
writer: {
85+
filePath: "memory",
86+
write: (line) => lines.push(line),
87+
},
88+
});
89+
90+
const wrapped = trace?.wrapStreamFn(((model, context, options) => ({
91+
model,
92+
context,
93+
options,
94+
})) as never);
95+
96+
wrapped?.(
97+
{
98+
id: "gpt-5.4",
99+
provider: "openai",
100+
api: "openai-responses",
101+
} as never,
102+
{
103+
systemPrompt: "system prompt text",
104+
messages: [],
105+
} as never,
106+
{},
107+
);
108+
109+
const event = JSON.parse(lines[0]?.trim() ?? "{}") as Record<string, unknown>;
110+
expect(event.stage).toBe("stream:context");
111+
expect(event.system).toBe("system prompt text");
112+
expect(event.systemDigest).toBeTypeOf("string");
113+
});
114+
72115
it("respects env overrides for enablement", () => {
73116
const lines: string[] = [];
74117
const trace = createCacheTrace({

src/agents/cache-trace.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,14 +238,19 @@ export function createCacheTrace(params: CacheTraceInit): CacheTrace | null {
238238

239239
const wrapStreamFn: CacheTrace["wrapStreamFn"] = (streamFn) => {
240240
const wrapped: StreamFn = (model, context, options) => {
241+
const traceContext = context as {
242+
messages?: AgentMessage[];
243+
system?: unknown;
244+
systemPrompt?: unknown;
245+
};
241246
recordStage("stream:context", {
242247
model: {
243248
id: model?.id,
244249
provider: model?.provider,
245250
api: model?.api,
246251
},
247-
system: (context as { system?: unknown }).system,
248-
messages: (context as { messages?: AgentMessage[] }).messages ?? [],
252+
system: traceContext.systemPrompt ?? traceContext.system,
253+
messages: traceContext.messages ?? [],
249254
options: (options ?? {}) as Record<string, unknown>,
250255
});
251256
return streamFn(model, context, options);

src/agents/openai-transport-stream.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"
2020
describe("openai transport stream", () => {
2121
it("reports the supported transport-aware APIs", () => {
2222
expect(isTransportAwareApiSupported("openai-responses")).toBe(true);
23+
expect(isTransportAwareApiSupported("openai-codex-responses")).toBe(true);
2324
expect(isTransportAwareApiSupported("openai-completions")).toBe(true);
2425
expect(isTransportAwareApiSupported("azure-openai-responses")).toBe(true);
2526
expect(isTransportAwareApiSupported("anthropic-messages")).toBe(true);
@@ -41,6 +42,20 @@ describe("openai transport stream", () => {
4142
maxTokens: 8192,
4243
} satisfies Model<"openai-responses">),
4344
).toBeTypeOf("function");
45+
expect(
46+
createBoundaryAwareStreamFnForModel({
47+
id: "codex-mini-latest",
48+
name: "Codex Mini Latest",
49+
api: "openai-codex-responses",
50+
provider: "openai-codex",
51+
baseUrl: "https://api.openai.com/v1",
52+
reasoning: true,
53+
input: ["text"],
54+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
55+
contextWindow: 200000,
56+
maxTokens: 8192,
57+
} satisfies Model<"openai-codex-responses">),
58+
).toBeTypeOf("function");
4459
expect(
4560
createBoundaryAwareStreamFnForModel({
4661
id: "claude-sonnet-4-6",
@@ -104,6 +119,39 @@ describe("openai transport stream", () => {
104119
expect(buildTransportAwareSimpleStreamFn(model)).toBeTypeOf("function");
105120
});
106121

122+
it("prepares a Codex Responses simple-completion api alias when transport overrides are attached", () => {
123+
const model = attachModelProviderRequestTransport(
124+
{
125+
id: "codex-mini-latest",
126+
name: "Codex Mini Latest",
127+
api: "openai-codex-responses",
128+
provider: "openai-codex",
129+
baseUrl: "https://api.openai.com/v1",
130+
reasoning: true,
131+
input: ["text"],
132+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
133+
contextWindow: 200000,
134+
maxTokens: 8192,
135+
} satisfies Model<"openai-codex-responses">,
136+
{
137+
proxy: {
138+
mode: "explicit-proxy",
139+
url: "http://proxy.internal:8443",
140+
},
141+
},
142+
);
143+
144+
const prepared = prepareTransportAwareSimpleModel(model);
145+
146+
expect(resolveTransportAwareSimpleApi(model.api)).toBe("openclaw-openai-responses-transport");
147+
expect(prepared).toMatchObject({
148+
api: "openclaw-openai-responses-transport",
149+
provider: "openai-codex",
150+
id: "codex-mini-latest",
151+
});
152+
expect(buildTransportAwareSimpleStreamFn(model)).toBeTypeOf("function");
153+
});
154+
107155
it("prepares an Anthropic simple-completion api alias when transport overrides are attached", () => {
108156
const model = attachModelProviderRequestTransport(
109157
{

src/agents/pi-embedded-runner/stream-resolution.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,20 @@ describe("describeEmbeddedAgentStreamStrategy", () => {
3535
).toBe("boundary-aware:openai-responses");
3636
});
3737

38+
it("describes default Codex fallback shaping", () => {
39+
expect(
40+
describeEmbeddedAgentStreamStrategy({
41+
currentStreamFn: undefined,
42+
shouldUseWebSocketTransport: false,
43+
model: {
44+
api: "openai-codex-responses",
45+
provider: "openai-codex",
46+
id: "codex-mini-latest",
47+
} as never,
48+
}),
49+
).toBe("boundary-aware:openai-codex-responses");
50+
});
51+
3852
it("keeps custom session streams labeled as custom", () => {
3953
expect(
4054
describeEmbeddedAgentStreamStrategy({
@@ -65,4 +79,19 @@ describe("resolveEmbeddedAgentStreamFn", () => {
6579

6680
expect(streamFn).not.toBe(streamSimple);
6781
});
82+
83+
it("routes Codex responses fallbacks through boundary-aware transports", () => {
84+
const streamFn = resolveEmbeddedAgentStreamFn({
85+
currentStreamFn: undefined,
86+
shouldUseWebSocketTransport: false,
87+
sessionId: "session-1",
88+
model: {
89+
api: "openai-codex-responses",
90+
provider: "openai-codex",
91+
id: "codex-mini-latest",
92+
} as never,
93+
});
94+
95+
expect(streamFn).not.toBe(streamSimple);
96+
});
6897
});

src/agents/provider-transport-stream.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { getModelProviderRequestTransport } from "./provider-request-config.js";
1111

1212
const SUPPORTED_TRANSPORT_APIS = new Set<Api>([
1313
"openai-responses",
14+
"openai-codex-responses",
1415
"openai-completions",
1516
"azure-openai-responses",
1617
"anthropic-messages",
@@ -19,6 +20,7 @@ const SUPPORTED_TRANSPORT_APIS = new Set<Api>([
1920

2021
const SIMPLE_TRANSPORT_API_ALIAS: Record<string, Api> = {
2122
"openai-responses": "openclaw-openai-responses-transport",
23+
"openai-codex-responses": "openclaw-openai-responses-transport",
2224
"openai-completions": "openclaw-openai-completions-transport",
2325
"azure-openai-responses": "openclaw-azure-openai-responses-transport",
2426
"anthropic-messages": "openclaw-anthropic-messages-transport",
@@ -28,6 +30,7 @@ const SIMPLE_TRANSPORT_API_ALIAS: Record<string, Api> = {
2830
function createSupportedTransportStreamFn(api: Api): StreamFn | undefined {
2931
switch (api) {
3032
case "openai-responses":
33+
case "openai-codex-responses":
3134
return createOpenAIResponsesTransportStreamFn();
3235
case "openai-completions":
3336
return createOpenAICompletionsTransportStreamFn();

0 commit comments

Comments
 (0)