@@ -331,7 +331,7 @@ func TestHandleRequest_CacheControlNoCacheBypassesAllLayers(t *testing.T) {
331331 }
332332}
333333
334- func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes (t * testing.T ) {
334+ func TestHandleRequest_StreamingMissPopulatesExactStreamingCacheOnly (t * testing.T ) {
335335 store := cache .NewMapStore ()
336336 defer store .Close ()
337337
@@ -341,6 +341,11 @@ func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T)
341341
342342 streamBody := []byte (`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"cache-streaming-cross-mode"}]}` )
343343 jsonBody := []byte (`{"model":"gpt-4","messages":[{"role":"user","content":"cache-streaming-cross-mode"}]}` )
344+ rawStream := []byte (
345+ "data: {\" id\" :\" chatcmpl-stream-cache\" ,\" object\" :\" chat.completion.chunk\" ,\" created\" :1234567890,\" model\" :\" gpt-4\" ,\" provider\" :\" openai\" ,\" choices\" :[{\" index\" :0,\" delta\" :{\" role\" :\" assistant\" ,\" content\" :\" Hello\" },\" finish_reason\" :null}]}\n \n " +
346+ "data: {\" id\" :\" chatcmpl-stream-cache\" ,\" object\" :\" chat.completion.chunk\" ,\" created\" :1234567890,\" model\" :\" gpt-4\" ,\" provider\" :\" openai\" ,\" choices\" :[{\" index\" :0,\" delta\" :{\" content\" :\" world\" },\" finish_reason\" :\" stop\" }],\" usage\" :{\" prompt_tokens\" :11,\" completion_tokens\" :2,\" total_tokens\" :13}}\n \n " +
347+ "data: [DONE]\n \n " ,
348+ )
344349 e := echo .New ()
345350 handlerCalls := 0
346351
@@ -360,12 +365,13 @@ func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T)
360365 c .SetRequest (req .WithContext (core .WithExecutionPlan (req .Context (), plan )))
361366 if err := m .HandleRequest (c , body , func () error {
362367 handlerCalls ++
363- c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
364- c .Response ().WriteHeader (http .StatusOK )
365- _ , _ = c .Response ().Write ([]byte ("data: {\" id\" :\" chatcmpl-stream-cache\" ,\" object\" :\" chat.completion.chunk\" ,\" created\" :1234567890,\" model\" :\" gpt-4\" ,\" provider\" :\" openai\" ,\" choices\" :[{\" index\" :0,\" delta\" :{\" role\" :\" assistant\" ,\" content\" :\" Hello\" },\" finish_reason\" :null}]}\n \n " ))
366- _ , _ = c .Response ().Write ([]byte ("data: {\" id\" :\" chatcmpl-stream-cache\" ,\" object\" :\" chat.completion.chunk\" ,\" created\" :1234567890,\" model\" :\" gpt-4\" ,\" provider\" :\" openai\" ,\" choices\" :[{\" index\" :0,\" delta\" :{\" content\" :\" world\" },\" finish_reason\" :\" stop\" }],\" usage\" :{\" prompt_tokens\" :11,\" completion_tokens\" :2,\" total_tokens\" :13}}\n \n " ))
367- _ , _ = c .Response ().Write ([]byte ("data: [DONE]\n \n " ))
368- return nil
368+ if isStreamingRequest (c .Request ().URL .Path , body ) {
369+ c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
370+ c .Response ().WriteHeader (http .StatusOK )
371+ _ , _ = c .Response ().Write (rawStream )
372+ return nil
373+ }
374+ return c .JSON (http .StatusOK , map [string ]string {"mode" : "json" })
369375 }); err != nil {
370376 t .Fatalf ("HandleRequest: %v" , err )
371377 }
@@ -382,36 +388,180 @@ func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T)
382388 if handlerCalls != 1 {
383389 t .Fatalf ("expected 1 handler invocation after streaming miss, got %d" , handlerCalls )
384390 }
391+ if ! bytes .Equal (rec1 .Body .Bytes (), rawStream ) {
392+ t .Fatalf ("streaming miss body = %q, want original SSE payload" , rec1 .Body .String ())
393+ }
385394
386395 m .simple .wg .Wait ()
387396
388397 rec2 := run (jsonBody )
389- if got := rec2 .Header ().Get ("X-Cache" ); got != "HIT (exact) " {
390- t .Fatalf ("non-streaming follow-up should hit exact cache, got X-Cache=%q" , got )
398+ if got := rec2 .Header ().Get ("X-Cache" ); got != "" {
399+ t .Fatalf ("non-streaming follow-up should miss exact cache because stream mode is keyed separately , got X-Cache=%q" , got )
391400 }
392401 if got := rec2 .Header ().Get ("Content-Type" ); got != "application/json" {
393- t .Fatalf ("non-streaming hit Content-Type = %q, want application/json" , got )
402+ t .Fatalf ("non-streaming miss Content-Type = %q, want application/json" , got )
394403 }
395- if ! bytes .Contains (rec2 .Body .Bytes (), []byte (`"content ":"Hello world "` )) {
396- t .Fatalf ("non-streaming cache hit body = %q, want reconstructed JSON response" , rec2 .Body .String ())
404+ if ! bytes .Contains (rec2 .Body .Bytes (), []byte (`"mode ":"json "` )) {
405+ t .Fatalf ("non-streaming miss body = %q, want JSON response" , rec2 .Body .String ())
397406 }
398- if handlerCalls != 1 {
399- t .Fatalf ("non-streaming exact hit should not call handler again, got %d calls" , handlerCalls )
407+ if handlerCalls != 2 {
408+ t .Fatalf ("non-streaming miss should call handler again, got %d calls" , handlerCalls )
400409 }
401410
411+ m .simple .wg .Wait ()
412+
402413 rec3 := run (streamBody )
403414 if got := rec3 .Header ().Get ("X-Cache" ); got != "HIT (exact)" {
404- t .Fatalf ("streaming follow-up should hit exact cache, got X-Cache=%q" , got )
415+ t .Fatalf ("streaming follow-up should hit its own exact cache entry , got X-Cache=%q" , got )
405416 }
406417 if got := rec3 .Header ().Get ("Content-Type" ); got != "text/event-stream" {
407418 t .Fatalf ("streaming hit Content-Type = %q, want text/event-stream" , got )
408419 }
409- if ! bytes .Contains (rec3 .Body .Bytes (), [] byte ( "Hello world" )) || ! bytes . Contains ( rec3 . Body . Bytes (), [] byte ( "[DONE]" ) ) {
410- t .Fatalf ("streaming cache hit body = %q, want synthesized SSE with content and [DONE] " , rec3 .Body .String ())
420+ if ! bytes .Equal (rec3 .Body .Bytes (), rawStream ) {
421+ t .Fatalf ("streaming cache hit body = %q, want verbatim SSE replay " , rec3 .Body .String ())
411422 }
412- if handlerCalls != 1 {
423+ if handlerCalls != 2 {
413424 t .Fatalf ("streaming exact hit should not call handler again, got %d calls" , handlerCalls )
414425 }
426+
427+ rec4 := run (jsonBody )
428+ if got := rec4 .Header ().Get ("X-Cache" ); got != "HIT (exact)" {
429+ t .Fatalf ("non-streaming follow-up should hit its own exact cache entry, got X-Cache=%q" , got )
430+ }
431+ if got := rec4 .Header ().Get ("Content-Type" ); got != "application/json" {
432+ t .Fatalf ("non-streaming hit Content-Type = %q, want application/json" , got )
433+ }
434+ if ! bytes .Contains (rec4 .Body .Bytes (), []byte (`"mode":"json"` )) {
435+ t .Fatalf ("non-streaming cache hit body = %q, want cached JSON response" , rec4 .Body .String ())
436+ }
437+ if handlerCalls != 2 {
438+ t .Fatalf ("non-streaming exact hit should not call handler again, got %d calls" , handlerCalls )
439+ }
440+ }
441+
442+ func TestHandleRequest_StreamingExactHitWritesSyntheticUsageEntry (t * testing.T ) {
443+ store := cache .NewMapStore ()
444+ defer store .Close ()
445+
446+ logger := & recordingUsageLogger {}
447+ m := & ResponseCacheMiddleware {
448+ simple : newSimpleCacheMiddleware (store , time .Hour , newUsageHitRecorder (logger , nil )),
449+ }
450+
451+ body := []byte (`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"cache-stream-usage-hit"}]}` )
452+ rawStream := []byte (
453+ "data: {\" id\" :\" chatcmpl-cache-hit\" ,\" object\" :\" chat.completion.chunk\" ,\" model\" :\" gpt-4\" ,\" choices\" :[{\" index\" :0,\" delta\" :{\" content\" :\" Hello\" },\" finish_reason\" :null}]}\n \n " +
454+ "data: {\" id\" :\" chatcmpl-cache-hit\" ,\" object\" :\" chat.completion.chunk\" ,\" model\" :\" gpt-4\" ,\" choices\" :[{\" index\" :0,\" delta\" :{},\" finish_reason\" :\" stop\" }],\" usage\" :{\" prompt_tokens\" :11,\" completion_tokens\" :5,\" total_tokens\" :16}}\n \n " +
455+ "data: [DONE]\n \n " ,
456+ )
457+ e := echo .New ()
458+
459+ run := func () * httptest.ResponseRecorder {
460+ t .Helper ()
461+ req := httptest .NewRequest (http .MethodPost , "/v1/chat/completions" , bytes .NewReader (body ))
462+ req .Header .Set ("Content-Type" , "application/json" )
463+ rec := httptest .NewRecorder ()
464+ c := e .NewContext (req , rec )
465+ plan := & core.ExecutionPlan {
466+ Mode : core .ExecutionModeTranslated ,
467+ ProviderType : "openai" ,
468+ Resolution : & core.RequestModelResolution {
469+ ResolvedSelector : core.ModelSelector {Provider : "openai" , Model : "gpt-4" },
470+ },
471+ }
472+ c .SetRequest (req .WithContext (core .WithExecutionPlan (req .Context (), plan )))
473+ if err := m .HandleRequest (c , body , func () error {
474+ c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
475+ c .Response ().WriteHeader (http .StatusOK )
476+ _ , _ = c .Response ().Write (rawStream )
477+ return nil
478+ }); err != nil {
479+ t .Fatalf ("HandleRequest: %v" , err )
480+ }
481+ return rec
482+ }
483+
484+ rec1 := run ()
485+ if got := rec1 .Header ().Get ("X-Cache" ); got != "" {
486+ t .Fatalf ("first request should miss exact cache, got X-Cache=%q" , got )
487+ }
488+
489+ m .simple .wg .Wait ()
490+
491+ rec2 := run ()
492+ if got := rec2 .Header ().Get ("X-Cache" ); got != "HIT (exact)" {
493+ t .Fatalf ("second request should be exact hit, got X-Cache=%q" , got )
494+ }
495+ if len (logger .entries ) != 1 {
496+ t .Fatalf ("expected 1 synthetic usage entry, got %d" , len (logger .entries ))
497+ }
498+ entry := logger .entries [0 ]
499+ if entry .CacheType != usage .CacheTypeExact {
500+ t .Fatalf ("CacheType = %q, want %q" , entry .CacheType , usage .CacheTypeExact )
501+ }
502+ if entry .InputTokens != 11 || entry .OutputTokens != 5 || entry .TotalTokens != 16 {
503+ t .Fatalf ("unexpected tokens: %+v" , entry )
504+ }
505+ if entry .ProviderID != "chatcmpl-cache-hit" {
506+ t .Fatalf ("ProviderID = %q, want chatcmpl-cache-hit" , entry .ProviderID )
507+ }
508+ }
509+
510+ func TestHandleRequest_InvalidStreamingBodySkipsExactCacheWrite (t * testing.T ) {
511+ store := cache .NewMapStore ()
512+ defer store .Close ()
513+
514+ m := & ResponseCacheMiddleware {
515+ simple : newSimpleCacheMiddleware (store , time .Hour , nil ),
516+ }
517+
518+ body := []byte (`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"invalid-stream-cache"}]}` )
519+ invalidStream := []byte (
520+ "data: {\" id\" :\" chatcmpl-invalid\" ,\" object\" :\" chat.completion.chunk\" ,\" model\" :\" gpt-4\" ,\" choices\" :[{\" index\" :0,\" delta\" :{\" content\" :\" partial\" },\" finish_reason\" :null}]}\n \n " ,
521+ )
522+ e := echo .New ()
523+ handlerCalls := 0
524+
525+ run := func () * httptest.ResponseRecorder {
526+ t .Helper ()
527+ req := httptest .NewRequest (http .MethodPost , "/v1/chat/completions" , bytes .NewReader (body ))
528+ req .Header .Set ("Content-Type" , "application/json" )
529+ rec := httptest .NewRecorder ()
530+ c := e .NewContext (req , rec )
531+ plan := & core.ExecutionPlan {
532+ Mode : core .ExecutionModeTranslated ,
533+ ProviderType : "openai" ,
534+ Resolution : & core.RequestModelResolution {
535+ ResolvedSelector : core.ModelSelector {Provider : "openai" , Model : "gpt-4" },
536+ },
537+ }
538+ c .SetRequest (req .WithContext (core .WithExecutionPlan (req .Context (), plan )))
539+ if err := m .HandleRequest (c , body , func () error {
540+ handlerCalls ++
541+ c .Response ().Header ().Set ("Content-Type" , "text/event-stream" )
542+ c .Response ().WriteHeader (http .StatusOK )
543+ _ , _ = c .Response ().Write (invalidStream )
544+ return nil
545+ }); err != nil {
546+ t .Fatalf ("HandleRequest: %v" , err )
547+ }
548+ return rec
549+ }
550+
551+ rec1 := run ()
552+ if got := rec1 .Header ().Get ("X-Cache" ); got != "" {
553+ t .Fatalf ("first request should miss cache, got X-Cache=%q" , got )
554+ }
555+
556+ m .simple .wg .Wait ()
557+
558+ rec2 := run ()
559+ if got := rec2 .Header ().Get ("X-Cache" ); got != "" {
560+ t .Fatalf ("invalid streaming body should not be cached, got X-Cache=%q" , got )
561+ }
562+ if handlerCalls != 2 {
563+ t .Fatalf ("expected invalid stream to bypass cache on follow-up, got %d calls" , handlerCalls )
564+ }
415565}
416566
417567func TestReconstructStreamingResponse_PreservesChatReasoningContent (t * testing.T ) {
0 commit comments