Skip to content

Commit 83b9b97

Browse files
fix(responsecache): split stream cache entries and validate SSE
1 parent d5728e2 commit 83b9b97

11 files changed

Lines changed: 611 additions & 83 deletions

internal/responsecache/handle_request_test.go

Lines changed: 168 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -331,7 +331,7 @@ func TestHandleRequest_CacheControlNoCacheBypassesAllLayers(t *testing.T) {
331331
}
332332
}
333333

334-
func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T) {
334+
func TestHandleRequest_StreamingMissPopulatesExactStreamingCacheOnly(t *testing.T) {
335335
store := cache.NewMapStore()
336336
defer store.Close()
337337

@@ -341,6 +341,11 @@ func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T)
341341

342342
streamBody := []byte(`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"cache-streaming-cross-mode"}]}`)
343343
jsonBody := []byte(`{"model":"gpt-4","messages":[{"role":"user","content":"cache-streaming-cross-mode"}]}`)
344+
rawStream := []byte(
345+
"data: {\"id\":\"chatcmpl-stream-cache\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4\",\"provider\":\"openai\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n" +
346+
"data: {\"id\":\"chatcmpl-stream-cache\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4\",\"provider\":\"openai\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\" world\"},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":11,\"completion_tokens\":2,\"total_tokens\":13}}\n\n" +
347+
"data: [DONE]\n\n",
348+
)
344349
e := echo.New()
345350
handlerCalls := 0
346351

@@ -360,12 +365,13 @@ func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T)
360365
c.SetRequest(req.WithContext(core.WithExecutionPlan(req.Context(), plan)))
361366
if err := m.HandleRequest(c, body, func() error {
362367
handlerCalls++
363-
c.Response().Header().Set("Content-Type", "text/event-stream")
364-
c.Response().WriteHeader(http.StatusOK)
365-
_, _ = c.Response().Write([]byte("data: {\"id\":\"chatcmpl-stream-cache\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4\",\"provider\":\"openai\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n"))
366-
_, _ = c.Response().Write([]byte("data: {\"id\":\"chatcmpl-stream-cache\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4\",\"provider\":\"openai\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\" world\"},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":11,\"completion_tokens\":2,\"total_tokens\":13}}\n\n"))
367-
_, _ = c.Response().Write([]byte("data: [DONE]\n\n"))
368-
return nil
368+
if isStreamingRequest(c.Request().URL.Path, body) {
369+
c.Response().Header().Set("Content-Type", "text/event-stream")
370+
c.Response().WriteHeader(http.StatusOK)
371+
_, _ = c.Response().Write(rawStream)
372+
return nil
373+
}
374+
return c.JSON(http.StatusOK, map[string]string{"mode": "json"})
369375
}); err != nil {
370376
t.Fatalf("HandleRequest: %v", err)
371377
}
@@ -382,36 +388,180 @@ func TestHandleRequest_StreamingMissPopulatesExactCacheAcrossModes(t *testing.T)
382388
if handlerCalls != 1 {
383389
t.Fatalf("expected 1 handler invocation after streaming miss, got %d", handlerCalls)
384390
}
391+
if !bytes.Equal(rec1.Body.Bytes(), rawStream) {
392+
t.Fatalf("streaming miss body = %q, want original SSE payload", rec1.Body.String())
393+
}
385394

386395
m.simple.wg.Wait()
387396

388397
rec2 := run(jsonBody)
389-
if got := rec2.Header().Get("X-Cache"); got != "HIT (exact)" {
390-
t.Fatalf("non-streaming follow-up should hit exact cache, got X-Cache=%q", got)
398+
if got := rec2.Header().Get("X-Cache"); got != "" {
399+
t.Fatalf("non-streaming follow-up should miss exact cache because stream mode is keyed separately, got X-Cache=%q", got)
391400
}
392401
if got := rec2.Header().Get("Content-Type"); got != "application/json" {
393-
t.Fatalf("non-streaming hit Content-Type = %q, want application/json", got)
402+
t.Fatalf("non-streaming miss Content-Type = %q, want application/json", got)
394403
}
395-
if !bytes.Contains(rec2.Body.Bytes(), []byte(`"content":"Hello world"`)) {
396-
t.Fatalf("non-streaming cache hit body = %q, want reconstructed JSON response", rec2.Body.String())
404+
if !bytes.Contains(rec2.Body.Bytes(), []byte(`"mode":"json"`)) {
405+
t.Fatalf("non-streaming miss body = %q, want JSON response", rec2.Body.String())
397406
}
398-
if handlerCalls != 1 {
399-
t.Fatalf("non-streaming exact hit should not call handler again, got %d calls", handlerCalls)
407+
if handlerCalls != 2 {
408+
t.Fatalf("non-streaming miss should call handler again, got %d calls", handlerCalls)
400409
}
401410

411+
m.simple.wg.Wait()
412+
402413
rec3 := run(streamBody)
403414
if got := rec3.Header().Get("X-Cache"); got != "HIT (exact)" {
404-
t.Fatalf("streaming follow-up should hit exact cache, got X-Cache=%q", got)
415+
t.Fatalf("streaming follow-up should hit its own exact cache entry, got X-Cache=%q", got)
405416
}
406417
if got := rec3.Header().Get("Content-Type"); got != "text/event-stream" {
407418
t.Fatalf("streaming hit Content-Type = %q, want text/event-stream", got)
408419
}
409-
if !bytes.Contains(rec3.Body.Bytes(), []byte("Hello world")) || !bytes.Contains(rec3.Body.Bytes(), []byte("[DONE]")) {
410-
t.Fatalf("streaming cache hit body = %q, want synthesized SSE with content and [DONE]", rec3.Body.String())
420+
if !bytes.Equal(rec3.Body.Bytes(), rawStream) {
421+
t.Fatalf("streaming cache hit body = %q, want verbatim SSE replay", rec3.Body.String())
411422
}
412-
if handlerCalls != 1 {
423+
if handlerCalls != 2 {
413424
t.Fatalf("streaming exact hit should not call handler again, got %d calls", handlerCalls)
414425
}
426+
427+
rec4 := run(jsonBody)
428+
if got := rec4.Header().Get("X-Cache"); got != "HIT (exact)" {
429+
t.Fatalf("non-streaming follow-up should hit its own exact cache entry, got X-Cache=%q", got)
430+
}
431+
if got := rec4.Header().Get("Content-Type"); got != "application/json" {
432+
t.Fatalf("non-streaming hit Content-Type = %q, want application/json", got)
433+
}
434+
if !bytes.Contains(rec4.Body.Bytes(), []byte(`"mode":"json"`)) {
435+
t.Fatalf("non-streaming cache hit body = %q, want cached JSON response", rec4.Body.String())
436+
}
437+
if handlerCalls != 2 {
438+
t.Fatalf("non-streaming exact hit should not call handler again, got %d calls", handlerCalls)
439+
}
440+
}
441+
442+
func TestHandleRequest_StreamingExactHitWritesSyntheticUsageEntry(t *testing.T) {
443+
store := cache.NewMapStore()
444+
defer store.Close()
445+
446+
logger := &recordingUsageLogger{}
447+
m := &ResponseCacheMiddleware{
448+
simple: newSimpleCacheMiddleware(store, time.Hour, newUsageHitRecorder(logger, nil)),
449+
}
450+
451+
body := []byte(`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"cache-stream-usage-hit"}]}`)
452+
rawStream := []byte(
453+
"data: {\"id\":\"chatcmpl-cache-hit\",\"object\":\"chat.completion.chunk\",\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hello\"},\"finish_reason\":null}]}\n\n" +
454+
"data: {\"id\":\"chatcmpl-cache-hit\",\"object\":\"chat.completion.chunk\",\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":11,\"completion_tokens\":5,\"total_tokens\":16}}\n\n" +
455+
"data: [DONE]\n\n",
456+
)
457+
e := echo.New()
458+
459+
run := func() *httptest.ResponseRecorder {
460+
t.Helper()
461+
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
462+
req.Header.Set("Content-Type", "application/json")
463+
rec := httptest.NewRecorder()
464+
c := e.NewContext(req, rec)
465+
plan := &core.ExecutionPlan{
466+
Mode: core.ExecutionModeTranslated,
467+
ProviderType: "openai",
468+
Resolution: &core.RequestModelResolution{
469+
ResolvedSelector: core.ModelSelector{Provider: "openai", Model: "gpt-4"},
470+
},
471+
}
472+
c.SetRequest(req.WithContext(core.WithExecutionPlan(req.Context(), plan)))
473+
if err := m.HandleRequest(c, body, func() error {
474+
c.Response().Header().Set("Content-Type", "text/event-stream")
475+
c.Response().WriteHeader(http.StatusOK)
476+
_, _ = c.Response().Write(rawStream)
477+
return nil
478+
}); err != nil {
479+
t.Fatalf("HandleRequest: %v", err)
480+
}
481+
return rec
482+
}
483+
484+
rec1 := run()
485+
if got := rec1.Header().Get("X-Cache"); got != "" {
486+
t.Fatalf("first request should miss exact cache, got X-Cache=%q", got)
487+
}
488+
489+
m.simple.wg.Wait()
490+
491+
rec2 := run()
492+
if got := rec2.Header().Get("X-Cache"); got != "HIT (exact)" {
493+
t.Fatalf("second request should be exact hit, got X-Cache=%q", got)
494+
}
495+
if len(logger.entries) != 1 {
496+
t.Fatalf("expected 1 synthetic usage entry, got %d", len(logger.entries))
497+
}
498+
entry := logger.entries[0]
499+
if entry.CacheType != usage.CacheTypeExact {
500+
t.Fatalf("CacheType = %q, want %q", entry.CacheType, usage.CacheTypeExact)
501+
}
502+
if entry.InputTokens != 11 || entry.OutputTokens != 5 || entry.TotalTokens != 16 {
503+
t.Fatalf("unexpected tokens: %+v", entry)
504+
}
505+
if entry.ProviderID != "chatcmpl-cache-hit" {
506+
t.Fatalf("ProviderID = %q, want chatcmpl-cache-hit", entry.ProviderID)
507+
}
508+
}
509+
510+
func TestHandleRequest_InvalidStreamingBodySkipsExactCacheWrite(t *testing.T) {
511+
store := cache.NewMapStore()
512+
defer store.Close()
513+
514+
m := &ResponseCacheMiddleware{
515+
simple: newSimpleCacheMiddleware(store, time.Hour, nil),
516+
}
517+
518+
body := []byte(`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"invalid-stream-cache"}]}`)
519+
invalidStream := []byte(
520+
"data: {\"id\":\"chatcmpl-invalid\",\"object\":\"chat.completion.chunk\",\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"partial\"},\"finish_reason\":null}]}\n\n",
521+
)
522+
e := echo.New()
523+
handlerCalls := 0
524+
525+
run := func() *httptest.ResponseRecorder {
526+
t.Helper()
527+
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(body))
528+
req.Header.Set("Content-Type", "application/json")
529+
rec := httptest.NewRecorder()
530+
c := e.NewContext(req, rec)
531+
plan := &core.ExecutionPlan{
532+
Mode: core.ExecutionModeTranslated,
533+
ProviderType: "openai",
534+
Resolution: &core.RequestModelResolution{
535+
ResolvedSelector: core.ModelSelector{Provider: "openai", Model: "gpt-4"},
536+
},
537+
}
538+
c.SetRequest(req.WithContext(core.WithExecutionPlan(req.Context(), plan)))
539+
if err := m.HandleRequest(c, body, func() error {
540+
handlerCalls++
541+
c.Response().Header().Set("Content-Type", "text/event-stream")
542+
c.Response().WriteHeader(http.StatusOK)
543+
_, _ = c.Response().Write(invalidStream)
544+
return nil
545+
}); err != nil {
546+
t.Fatalf("HandleRequest: %v", err)
547+
}
548+
return rec
549+
}
550+
551+
rec1 := run()
552+
if got := rec1.Header().Get("X-Cache"); got != "" {
553+
t.Fatalf("first request should miss cache, got X-Cache=%q", got)
554+
}
555+
556+
m.simple.wg.Wait()
557+
558+
rec2 := run()
559+
if got := rec2.Header().Get("X-Cache"); got != "" {
560+
t.Fatalf("invalid streaming body should not be cached, got X-Cache=%q", got)
561+
}
562+
if handlerCalls != 2 {
563+
t.Fatalf("expected invalid stream to bypass cache on follow-up, got %d calls", handlerCalls)
564+
}
415565
}
416566

417567
func TestReconstructStreamingResponse_PreservesChatReasoningContent(t *testing.T) {

internal/responsecache/middleware_test.go

Lines changed: 81 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -219,38 +219,54 @@ func TestHashRequest_StreamIncludeUsageChangesKey(t *testing.T) {
219219
}
220220
}
221221

222-
func TestSimpleCacheMiddleware_SharesCacheAcrossStreamingAndNonStreaming(t *testing.T) {
222+
func TestHashRequest_StreamModeChangesKey(t *testing.T) {
223+
base := []byte(`{"model":"gpt-4","messages":[{"role":"user","content":"hi"}]}`)
224+
streaming := []byte(`{"model":"gpt-4","stream":true,"messages":[{"role":"user","content":"hi"}]}`)
225+
plan := &core.ExecutionPlan{
226+
Mode: core.ExecutionModeTranslated,
227+
ProviderType: "openai",
228+
Resolution: &core.RequestModelResolution{
229+
ResolvedSelector: core.ModelSelector{Provider: "openai", Model: "gpt-4"},
230+
},
231+
}
232+
233+
first := hashRequest("/v1/chat/completions", base, plan)
234+
second := hashRequest("/v1/chat/completions", streaming, plan)
235+
236+
if first == second {
237+
t.Fatal("stream mode should affect the exact cache key")
238+
}
239+
}
240+
241+
func TestSimpleCacheMiddleware_SeparatesStreamingAndNonStreamingEntries(t *testing.T) {
223242
store := cache.NewMapStore()
224243
defer store.Close()
225244
mw := NewResponseCacheMiddlewareWithStore(store, time.Hour)
226245
e := echo.New()
227246
installResolvedExecutionPlan(e, "openai", "gpt-4")
228247
e.Use(mw.Middleware())
229248
callCount := 0
249+
rawStream := []byte(
250+
"data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"streamed\"},\"finish_reason\":null}]}\n\n" +
251+
"data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"created\":1234567890,\"model\":\"gpt-4\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":9,\"completion_tokens\":1,\"total_tokens\":10}}\n\n" +
252+
"data: [DONE]\n\n",
253+
)
230254
e.POST("/v1/chat/completions", func(c *echo.Context) error {
231255
callCount++
232-
return c.JSON(http.StatusOK, &core.ChatResponse{
233-
ID: "chatcmpl-shared-cache",
234-
Object: "chat.completion",
235-
Model: "gpt-4",
236-
Provider: "openai",
237-
Created: 1234567890,
238-
Choices: []core.Choice{
239-
{
240-
Index: 0,
241-
Message: core.ResponseMessage{
242-
Role: "assistant",
243-
Content: "shared cached response",
244-
},
245-
FinishReason: "stop",
246-
},
247-
},
248-
Usage: core.Usage{
249-
PromptTokens: 9,
250-
CompletionTokens: 3,
251-
TotalTokens: 12,
252-
},
253-
})
256+
body, cacheable, err := requestBodyForCache(c.Request())
257+
if err != nil {
258+
t.Fatalf("requestBodyForCache: %v", err)
259+
}
260+
if !cacheable {
261+
t.Fatal("expected request to be cacheable")
262+
}
263+
if isStreamingRequest(c.Request().URL.Path, body) {
264+
c.Response().Header().Set("Content-Type", "text/event-stream")
265+
c.Response().WriteHeader(http.StatusOK)
266+
_, _ = c.Response().Write(rawStream)
267+
return nil
268+
}
269+
return c.JSON(http.StatusOK, map[string]string{"result": "json cached response"})
254270
})
255271

256272
nonStreamingBody := []byte(`{"model":"gpt-4","messages":[{"role":"user","content":"hi"}]}`)
@@ -270,17 +286,53 @@ func TestSimpleCacheMiddleware_SharesCacheAcrossStreamingAndNonStreaming(t *test
270286
req2.Header.Set("Content-Type", "application/json")
271287
rec2 := httptest.NewRecorder()
272288
e.ServeHTTP(rec2, req2)
273-
if got := rec2.Header().Get("X-Cache"); got != "HIT (exact)" {
274-
t.Fatalf("streaming request should reuse cached full response, got X-Cache=%q", got)
289+
if got := rec2.Header().Get("X-Cache"); got != "" {
290+
t.Fatalf("streaming request should miss exact cache because stream mode is keyed separately, got X-Cache=%q", got)
275291
}
276292
if got := rec2.Header().Get("Content-Type"); got != "text/event-stream" {
293+
t.Fatalf("streaming miss Content-Type = %q, want text/event-stream", got)
294+
}
295+
if !bytes.Equal(rec2.Body.Bytes(), rawStream) {
296+
t.Fatalf("streaming miss body = %q, want original SSE payload", rec2.Body.String())
297+
}
298+
if callCount != 2 {
299+
t.Fatalf("expected separate stream miss to call handler again, got %d calls", callCount)
300+
}
301+
302+
mw.simple.wg.Wait()
303+
304+
req3 := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(streamingBody))
305+
req3.Header.Set("Content-Type", "application/json")
306+
rec3 := httptest.NewRecorder()
307+
e.ServeHTTP(rec3, req3)
308+
if got := rec3.Header().Get("X-Cache"); got != "HIT (exact)" {
309+
t.Fatalf("streaming follow-up should hit its own exact cache entry, got X-Cache=%q", got)
310+
}
311+
if got := rec3.Header().Get("Content-Type"); got != "text/event-stream" {
277312
t.Fatalf("streaming cache hit Content-Type = %q, want text/event-stream", got)
278313
}
279-
if !bytes.Contains(rec2.Body.Bytes(), []byte("shared cached response")) || !bytes.Contains(rec2.Body.Bytes(), []byte("[DONE]")) {
280-
t.Fatalf("streaming cache hit body = %q, want synthesized SSE", rec2.Body.String())
314+
if !bytes.Equal(rec3.Body.Bytes(), rawStream) {
315+
t.Fatalf("streaming cache hit body = %q, want verbatim SSE replay", rec3.Body.String())
281316
}
282-
if callCount != 1 {
283-
t.Fatalf("expected streaming replay to avoid second handler call, got %d calls", callCount)
317+
if callCount != 2 {
318+
t.Fatalf("expected streaming replay to avoid a third handler call, got %d calls", callCount)
319+
}
320+
321+
req4 := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", bytes.NewReader(nonStreamingBody))
322+
req4.Header.Set("Content-Type", "application/json")
323+
rec4 := httptest.NewRecorder()
324+
e.ServeHTTP(rec4, req4)
325+
if got := rec4.Header().Get("X-Cache"); got != "HIT (exact)" {
326+
t.Fatalf("non-streaming follow-up should hit its own exact cache entry, got X-Cache=%q", got)
327+
}
328+
if got := rec4.Header().Get("Content-Type"); got != "application/json" {
329+
t.Fatalf("non-streaming cache hit Content-Type = %q, want application/json", got)
330+
}
331+
if !bytes.Contains(rec4.Body.Bytes(), []byte("json cached response")) {
332+
t.Fatalf("non-streaming cache hit body = %q, want cached JSON response", rec4.Body.String())
333+
}
334+
if callCount != 2 {
335+
t.Fatalf("non-streaming exact hit should not call handler again, got %d calls", callCount)
284336
}
285337
}
286338

internal/responsecache/responsecache.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,8 +94,8 @@ func (m *ResponseCacheMiddleware) Middleware() echo.MiddlewareFunc {
9494
// HandleRequest runs the full dual-layer cache check (exact then semantic) for a
9595
// translated inference request that has already been guardrail-patched.
9696
// body is the final patched request bytes; next is the real LLM call.
97-
// Streaming misses are reconstructed into full JSON before storage; streaming
98-
// hits replay that stored JSON as synthetic SSE.
97+
// Streaming and non-streaming requests are cached independently. Streaming
98+
// misses persist raw SSE bytes and replay them verbatim on cache hits.
9999
func (m *ResponseCacheMiddleware) HandleRequest(c *echo.Context, body []byte, next func() error) error {
100100
if m == nil {
101101
return next()

0 commit comments

Comments
 (0)