Skip to content

Commit 0fc5a57

Browse files
openperfclawsweeper[bot]Takhoffman
authored
fix(anthropic-vertex): stop re-marking cache_control on transport-budgeted payloads (#92387)
Summary: - The PR removes the Anthropic Vertex adapter’s redundant cache-control payload-policy pass, forwards caller payload hooks unchanged, and adds regressions for preserving transport-budgeted payloads. - PR surface: Source -35, Tests -11. Total -46 across 2 files. - Reproducibility: yes. at source level. Current main reapplies cache policy to a finalized, fully budgeted pa ... ion logs show the corresponding five-marker rejection; this review did not run a live post-fix GCP request. Automerge notes: - No ClawSweeper repair was needed after automerge opt-in. Validation: - ClawSweeper review passed for head 6ef1960. - Required merge gates passed before the squash merge. Prepared head SHA: 6ef1960 Review: #92387 (comment) Co-authored-by: openperf <16864032@qq.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
1 parent 1bd04ac commit 0fc5a57

2 files changed

Lines changed: 61 additions & 107 deletions

File tree

extensions/anthropic-vertex/stream-runtime.test.ts

Lines changed: 56 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ import { createAssistantMessageEventStream, type Model } from "openclaw/plugin-s
33
import { beforeAll, describe, expect, it, vi } from "vitest";
44
import type { AnthropicVertexStreamDeps } from "./stream-runtime.js";
55

6-
const SYSTEM_PROMPT_CACHE_BOUNDARY = "\n<!-- OPENCLAW_CACHE_BOUNDARY -->\n";
7-
86
function createStreamDeps(): {
97
deps: AnthropicVertexStreamDeps;
108
streamAnthropicMock: ReturnType<typeof vi.fn>;
@@ -50,8 +48,6 @@ function makeModel(params: {
5048
} as Model<"anthropic-messages">;
5149
}
5250

53-
const CACHE_BOUNDARY_PROMPT = `Stable prefix${SYSTEM_PROMPT_CACHE_BOUNDARY}Dynamic suffix`;
54-
5551
type PayloadHook = (payload: unknown, payloadModel: unknown) => Promise<unknown>;
5652

5753
function streamAnthropicCall(streamAnthropicMock: ReturnType<typeof vi.fn>): unknown[] {
@@ -72,8 +68,8 @@ function streamTransportOptions(
7268
return options as Record<string, unknown>;
7369
}
7470

75-
function captureCacheBoundaryPayloadHook(
76-
onPayload: PayloadHook,
71+
function captureTransportPayloadHook(
72+
onPayload: PayloadHook | undefined,
7773
deps: AnthropicVertexStreamDeps,
7874
streamAnthropicMock: ReturnType<typeof vi.fn>,
7975
) {
@@ -82,41 +78,39 @@ function captureCacheBoundaryPayloadHook(
8278

8379
void streamFn(
8480
model,
85-
{
86-
systemPrompt: CACHE_BOUNDARY_PROMPT,
87-
messages: [{ role: "user", content: "Hello" }],
88-
} as never,
89-
{
90-
cacheRetention: "short",
91-
onPayload,
92-
} as never,
81+
{ messages: [{ role: "user", content: "Hello" }] } as never,
82+
{ cacheRetention: "short", ...(onPayload ? { onPayload } : {}) } as never,
9383
);
9484

9585
const transportOptions = streamTransportOptions(streamAnthropicMock);
9686

9787
return { model, onPayload: transportOptions.onPayload as PayloadHook | undefined };
9888
}
9989

100-
function buildExpectedCacheBoundaryPayload(messageText: string) {
90+
// Mirrors the shared anthropic-messages transport output: cache boundary already
91+
// split (uncached dynamic suffix) and all four cache_control markers allocated.
92+
function buildBudgetedTransportPayload() {
10193
return {
10294
system: [
103-
{
104-
type: "text",
105-
text: "Stable prefix",
106-
cache_control: { type: "ephemeral" },
107-
},
108-
{
109-
type: "text",
110-
text: "Dynamic suffix",
111-
},
95+
{ type: "text", text: "Stable prefix", cache_control: { type: "ephemeral" } },
96+
{ type: "text", text: "Dynamic suffix" },
97+
],
98+
tools: [
99+
{ name: "exec", input_schema: { type: "object" }, cache_control: { type: "ephemeral" } },
112100
],
113101
messages: [
102+
{
103+
role: "user",
104+
content: [{ type: "text", text: "Hello", cache_control: { type: "ephemeral" } }],
105+
},
106+
{ role: "assistant", content: [{ type: "tool_use", id: "t1", name: "exec", input: {} }] },
114107
{
115108
role: "user",
116109
content: [
117110
{
118-
type: "text",
119-
text: messageText,
111+
type: "tool_result",
112+
tool_use_id: "t1",
113+
content: [],
120114
cache_control: { type: "ephemeral" },
121115
},
122116
],
@@ -125,6 +119,29 @@ function buildExpectedCacheBoundaryPayload(messageText: string) {
125119
};
126120
}
127121

122+
function countCacheControlMarkers(payload: unknown): number {
123+
let count = 0;
124+
const visit = (value: unknown) => {
125+
if (Array.isArray(value)) {
126+
value.forEach(visit);
127+
return;
128+
}
129+
if (!value || typeof value !== "object") {
130+
return;
131+
}
132+
const record = value as Record<string, unknown>;
133+
if (record.cache_control !== undefined) {
134+
count += 1;
135+
}
136+
visit(record.content);
137+
};
138+
const record = payload as Record<string, unknown>;
139+
visit(record.system);
140+
visit(record.tools);
141+
visit(record.messages);
142+
return count;
143+
}
144+
128145
describe("createAnthropicVertexStreamFn", () => {
129146
beforeAll(async () => {
130147
({ createAnthropicVertexStreamFn, createAnthropicVertexStreamFnForModel } =
@@ -343,63 +360,35 @@ describe("createAnthropicVertexStreamFn", () => {
343360
expect(transportOptions).not.toHaveProperty("temperature");
344361
});
345362

346-
it("applies Anthropic cache-boundary shaping before forwarding payload hooks", async () => {
363+
it("keeps already-budgeted cache_control markers intact when forwarding payload hooks", async () => {
347364
const { deps, streamAnthropicMock } = createStreamDeps();
348365
const onPayload = vi.fn(async (payload: unknown) => payload);
349-
const { model, onPayload: transportPayloadHook } = captureCacheBoundaryPayloadHook(
366+
const { model, onPayload: transportPayloadHook } = captureTransportPayloadHook(
350367
onPayload,
351368
deps,
352369
streamAnthropicMock,
353370
);
354-
const payload = {
355-
system: [
356-
{
357-
type: "text",
358-
text: CACHE_BOUNDARY_PROMPT,
359-
cache_control: { type: "ephemeral" },
360-
},
361-
],
362-
messages: [{ role: "user", content: "Hello" }],
363-
};
371+
const payload = buildBudgetedTransportPayload();
364372

365373
const nextPayload = await transportPayloadHook?.(payload, model);
366374

367-
const expectedPayload = buildExpectedCacheBoundaryPayload("Hello");
368-
expect(onPayload).toHaveBeenCalledWith(expectedPayload, model);
369-
expect(nextPayload).toEqual(expectedPayload);
375+
expect(onPayload).toHaveBeenCalledWith(payload, model);
376+
expect(countCacheControlMarkers(nextPayload)).toBe(4);
377+
expect((nextPayload as ReturnType<typeof buildBudgetedTransportPayload>).system[1]).toEqual({
378+
type: "text",
379+
text: "Dynamic suffix",
380+
});
370381
});
371382

372-
it("reapplies Anthropic cache-boundary shaping when payload hooks return a fresh payload", async () => {
383+
it("omits the transport payload hook when the caller provides none", () => {
373384
const { deps, streamAnthropicMock } = createStreamDeps();
374-
const onPayload = vi.fn(async () => ({
375-
system: [
376-
{
377-
type: "text",
378-
text: CACHE_BOUNDARY_PROMPT,
379-
},
380-
],
381-
messages: [{ role: "user", content: "Hello again" }],
382-
}));
383-
const { model, onPayload: transportPayloadHook } = captureCacheBoundaryPayloadHook(
384-
onPayload,
385+
const { onPayload: transportPayloadHook } = captureTransportPayloadHook(
386+
undefined,
385387
deps,
386388
streamAnthropicMock,
387389
);
388390

389-
const nextPayload = await transportPayloadHook?.(
390-
{
391-
system: [
392-
{
393-
type: "text",
394-
text: CACHE_BOUNDARY_PROMPT,
395-
},
396-
],
397-
messages: [{ role: "user", content: "Hello" }],
398-
},
399-
model,
400-
);
401-
402-
expect(nextPayload).toEqual(buildExpectedCacheBoundaryPayload("Hello again"));
391+
expect(transportPayloadHook).toBeUndefined();
403392
});
404393

405394
it("omits maxTokens when neither the model nor request provide a finite limit", () => {

extensions/anthropic-vertex/stream-runtime.ts

Lines changed: 5 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* Anthropic Vertex stream runtime. It constructs Vertex SDK clients and adapts
3-
* OpenClaw stream options into Anthropic Messages payload policy.
3+
* OpenClaw stream options for the shared Anthropic Messages transport.
44
*/
55
import { AnthropicVertex as AnthropicVertexSdk } from "@anthropic-ai/vertex-sdk";
66
import type { StreamFn } from "openclaw/plugin-sdk/agent-core";
@@ -18,10 +18,6 @@ import {
1818
supportsClaudeNativeMaxEffort,
1919
supportsClaudeNativeXhighEffort,
2020
} from "openclaw/plugin-sdk/provider-model-shared";
21-
import {
22-
applyAnthropicPayloadPolicyToParams,
23-
resolveAnthropicPayloadPolicy,
24-
} from "openclaw/plugin-sdk/provider-stream-shared";
2521
import { resolveAnthropicVertexClientRegion, resolveAnthropicVertexProjectId } from "./region.js";
2622

2723
type AnthropicVertexTransportOptions = ProviderStreamOptions & {
@@ -120,36 +116,6 @@ function resolveAnthropicVertexMaxTokens(params: {
120116
return requested ?? modelMax;
121117
}
122118

123-
function createAnthropicVertexOnPayload(params: {
124-
model: { api: string; baseUrl?: string; provider: string };
125-
cacheRetention: ProviderStreamOptions["cacheRetention"] | undefined;
126-
onPayload: ProviderStreamOptions["onPayload"] | undefined;
127-
}): NonNullable<ProviderStreamOptions["onPayload"]> {
128-
const policy = resolveAnthropicPayloadPolicy({
129-
provider: params.model.provider,
130-
api: params.model.api,
131-
baseUrl: params.model.baseUrl,
132-
cacheRetention: params.cacheRetention,
133-
enableCacheControl: true,
134-
});
135-
136-
function applyPolicy(payload: unknown): unknown {
137-
if (payload && typeof payload === "object" && !Array.isArray(payload)) {
138-
applyAnthropicPayloadPolicyToParams(payload as Record<string, unknown>, policy);
139-
}
140-
return payload;
141-
}
142-
143-
return async (payload, model) => {
144-
const shapedPayload = applyPolicy(payload);
145-
const nextPayload = await params.onPayload?.(shapedPayload, model);
146-
if (nextPayload === undefined || nextPayload === shapedPayload) {
147-
return shapedPayload;
148-
}
149-
return applyPolicy(nextPayload);
150-
};
151-
}
152-
153119
/**
154120
* Create a StreamFn that routes through OpenClaw's generic model stream with an
155121
* injected `AnthropicVertex` client. All streaming, message conversion, and
@@ -200,11 +166,10 @@ export function createAnthropicVertexStreamFn(
200166
cacheRetention: options?.cacheRetention,
201167
sessionId: options?.sessionId,
202168
headers: options?.headers,
203-
onPayload: createAnthropicVertexOnPayload({
204-
model: transportModel,
205-
cacheRetention: options?.cacheRetention,
206-
onPayload: options?.onPayload,
207-
}),
169+
// The shared anthropic-messages transport already splits the system prompt
170+
// cache boundary and budgets all cache_control markers; re-applying the
171+
// payload policy here marked the uncached suffix and breached the 4-marker cap.
172+
onPayload: options?.onPayload,
208173
maxRetryDelayMs: options?.maxRetryDelayMs,
209174
metadata: options?.metadata,
210175
};

0 commit comments

Comments
 (0)