@@ -3,8 +3,6 @@ import { createAssistantMessageEventStream, type Model } from "openclaw/plugin-s
33import { beforeAll , describe , expect , it , vi } from "vitest" ;
44import type { AnthropicVertexStreamDeps } from "./stream-runtime.js" ;
55
6- const SYSTEM_PROMPT_CACHE_BOUNDARY = "\n<!-- OPENCLAW_CACHE_BOUNDARY -->\n" ;
7-
86function createStreamDeps ( ) : {
97 deps : AnthropicVertexStreamDeps ;
108 streamAnthropicMock : ReturnType < typeof vi . fn > ;
@@ -50,8 +48,6 @@ function makeModel(params: {
5048 } as Model < "anthropic-messages" > ;
5149}
5250
53- const CACHE_BOUNDARY_PROMPT = `Stable prefix${ SYSTEM_PROMPT_CACHE_BOUNDARY } Dynamic suffix` ;
54-
5551type PayloadHook = ( payload : unknown , payloadModel : unknown ) => Promise < unknown > ;
5652
5753function streamAnthropicCall ( streamAnthropicMock : ReturnType < typeof vi . fn > ) : unknown [ ] {
@@ -72,8 +68,8 @@ function streamTransportOptions(
7268 return options as Record < string , unknown > ;
7369}
7470
75- function captureCacheBoundaryPayloadHook (
76- onPayload : PayloadHook ,
71+ function captureTransportPayloadHook (
72+ onPayload : PayloadHook | undefined ,
7773 deps : AnthropicVertexStreamDeps ,
7874 streamAnthropicMock : ReturnType < typeof vi . fn > ,
7975) {
@@ -82,41 +78,39 @@ function captureCacheBoundaryPayloadHook(
8278
8379 void streamFn (
8480 model ,
85- {
86- systemPrompt : CACHE_BOUNDARY_PROMPT ,
87- messages : [ { role : "user" , content : "Hello" } ] ,
88- } as never ,
89- {
90- cacheRetention : "short" ,
91- onPayload,
92- } as never ,
81+ { messages : [ { role : "user" , content : "Hello" } ] } as never ,
82+ { cacheRetention : "short" , ...( onPayload ? { onPayload } : { } ) } as never ,
9383 ) ;
9484
9585 const transportOptions = streamTransportOptions ( streamAnthropicMock ) ;
9686
9787 return { model, onPayload : transportOptions . onPayload as PayloadHook | undefined } ;
9888}
9989
100- function buildExpectedCacheBoundaryPayload ( messageText : string ) {
90+ // Mirrors the shared anthropic-messages transport output: cache boundary already
91+ // split (uncached dynamic suffix) and all four cache_control markers allocated.
92+ function buildBudgetedTransportPayload ( ) {
10193 return {
10294 system : [
103- {
104- type : "text" ,
105- text : "Stable prefix" ,
106- cache_control : { type : "ephemeral" } ,
107- } ,
108- {
109- type : "text" ,
110- text : "Dynamic suffix" ,
111- } ,
95+ { type : "text" , text : "Stable prefix" , cache_control : { type : "ephemeral" } } ,
96+ { type : "text" , text : "Dynamic suffix" } ,
97+ ] ,
98+ tools : [
99+ { name : "exec" , input_schema : { type : "object" } , cache_control : { type : "ephemeral" } } ,
112100 ] ,
113101 messages : [
102+ {
103+ role : "user" ,
104+ content : [ { type : "text" , text : "Hello" , cache_control : { type : "ephemeral" } } ] ,
105+ } ,
106+ { role : "assistant" , content : [ { type : "tool_use" , id : "t1" , name : "exec" , input : { } } ] } ,
114107 {
115108 role : "user" ,
116109 content : [
117110 {
118- type : "text" ,
119- text : messageText ,
111+ type : "tool_result" ,
112+ tool_use_id : "t1" ,
113+ content : [ ] ,
120114 cache_control : { type : "ephemeral" } ,
121115 } ,
122116 ] ,
@@ -125,6 +119,29 @@ function buildExpectedCacheBoundaryPayload(messageText: string) {
125119 } ;
126120}
127121
122+ function countCacheControlMarkers ( payload : unknown ) : number {
123+ let count = 0 ;
124+ const visit = ( value : unknown ) => {
125+ if ( Array . isArray ( value ) ) {
126+ value . forEach ( visit ) ;
127+ return ;
128+ }
129+ if ( ! value || typeof value !== "object" ) {
130+ return ;
131+ }
132+ const record = value as Record < string , unknown > ;
133+ if ( record . cache_control !== undefined ) {
134+ count += 1 ;
135+ }
136+ visit ( record . content ) ;
137+ } ;
138+ const record = payload as Record < string , unknown > ;
139+ visit ( record . system ) ;
140+ visit ( record . tools ) ;
141+ visit ( record . messages ) ;
142+ return count ;
143+ }
144+
128145describe ( "createAnthropicVertexStreamFn" , ( ) => {
129146 beforeAll ( async ( ) => {
130147 ( { createAnthropicVertexStreamFn, createAnthropicVertexStreamFnForModel } =
@@ -343,63 +360,35 @@ describe("createAnthropicVertexStreamFn", () => {
343360 expect ( transportOptions ) . not . toHaveProperty ( "temperature" ) ;
344361 } ) ;
345362
346- it ( "applies Anthropic cache-boundary shaping before forwarding payload hooks" , async ( ) => {
363+ it ( "keeps already-budgeted cache_control markers intact when forwarding payload hooks" , async ( ) => {
347364 const { deps, streamAnthropicMock } = createStreamDeps ( ) ;
348365 const onPayload = vi . fn ( async ( payload : unknown ) => payload ) ;
349- const { model, onPayload : transportPayloadHook } = captureCacheBoundaryPayloadHook (
366+ const { model, onPayload : transportPayloadHook } = captureTransportPayloadHook (
350367 onPayload ,
351368 deps ,
352369 streamAnthropicMock ,
353370 ) ;
354- const payload = {
355- system : [
356- {
357- type : "text" ,
358- text : CACHE_BOUNDARY_PROMPT ,
359- cache_control : { type : "ephemeral" } ,
360- } ,
361- ] ,
362- messages : [ { role : "user" , content : "Hello" } ] ,
363- } ;
371+ const payload = buildBudgetedTransportPayload ( ) ;
364372
365373 const nextPayload = await transportPayloadHook ?.( payload , model ) ;
366374
367- const expectedPayload = buildExpectedCacheBoundaryPayload ( "Hello" ) ;
368- expect ( onPayload ) . toHaveBeenCalledWith ( expectedPayload , model ) ;
369- expect ( nextPayload ) . toEqual ( expectedPayload ) ;
375+ expect ( onPayload ) . toHaveBeenCalledWith ( payload , model ) ;
376+ expect ( countCacheControlMarkers ( nextPayload ) ) . toBe ( 4 ) ;
377+ expect ( ( nextPayload as ReturnType < typeof buildBudgetedTransportPayload > ) . system [ 1 ] ) . toEqual ( {
378+ type : "text" ,
379+ text : "Dynamic suffix" ,
380+ } ) ;
370381 } ) ;
371382
372- it ( "reapplies Anthropic cache-boundary shaping when payload hooks return a fresh payload" , async ( ) => {
383+ it ( "omits the transport payload hook when the caller provides none" , ( ) => {
373384 const { deps, streamAnthropicMock } = createStreamDeps ( ) ;
374- const onPayload = vi . fn ( async ( ) => ( {
375- system : [
376- {
377- type : "text" ,
378- text : CACHE_BOUNDARY_PROMPT ,
379- } ,
380- ] ,
381- messages : [ { role : "user" , content : "Hello again" } ] ,
382- } ) ) ;
383- const { model, onPayload : transportPayloadHook } = captureCacheBoundaryPayloadHook (
384- onPayload ,
385+ const { onPayload : transportPayloadHook } = captureTransportPayloadHook (
386+ undefined ,
385387 deps ,
386388 streamAnthropicMock ,
387389 ) ;
388390
389- const nextPayload = await transportPayloadHook ?.(
390- {
391- system : [
392- {
393- type : "text" ,
394- text : CACHE_BOUNDARY_PROMPT ,
395- } ,
396- ] ,
397- messages : [ { role : "user" , content : "Hello" } ] ,
398- } ,
399- model ,
400- ) ;
401-
402- expect ( nextPayload ) . toEqual ( buildExpectedCacheBoundaryPayload ( "Hello again" ) ) ;
391+ expect ( transportPayloadHook ) . toBeUndefined ( ) ;
403392 } ) ;
404393
405394 it ( "omits maxTokens when neither the model nor request provide a finite limit" , ( ) => {
0 commit comments