Skip to content

Commit 58b999c

Browse files
fix(responsecache): tighten SSE validation
1 parent 83b9b97 commit 58b999c

3 files changed

Lines changed: 18 additions & 38 deletions

File tree

internal/responsecache/sse_validation.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ func validateCacheableSSE(raw []byte) bool {
2626
raw = raw[idx+sepLen:]
2727

2828
payload, hasData := sseEventPayload(event)
29+
if sawDone {
30+
return false
31+
}
2932
if !hasData {
3033
continue
3134
}
@@ -36,12 +39,7 @@ func validateCacheableSSE(raw []byte) bool {
3639
sawDone = true
3740
continue
3841
}
39-
if sawDone {
40-
return false
41-
}
42-
43-
var decoded map[string]any
44-
if err := json.Unmarshal(payload, &decoded); err != nil {
42+
if !json.Valid(payload) {
4543
return false
4644
}
4745
sawJSONPayload = true

internal/responsecache/sse_validation_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ func TestValidateCacheableSSE(t *testing.T) {
1010
raw []byte
1111
want bool
1212
}{
13+
{
14+
name: "rejects empty input",
15+
raw: []byte(""),
16+
want: false,
17+
},
1318
{
1419
name: "valid chat stream",
1520
raw: []byte(
@@ -52,6 +57,15 @@ func TestValidateCacheableSSE(t *testing.T) {
5257
),
5358
want: false,
5459
},
60+
{
61+
name: "rejects keepalive after done",
62+
raw: []byte(
63+
"data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}\n\n" +
64+
"data: [DONE]\n\n" +
65+
": keep-alive\n\n",
66+
),
67+
want: false,
68+
},
5569
{
5670
name: "rejects trailing partial event",
5771
raw: []byte(

internal/responsecache/stream_cache.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package responsecache
22

33
import (
44
"bytes"
5-
"context"
65
"encoding/json"
7-
"errors"
86
"net/http"
97
"sort"
108
"strings"
@@ -126,25 +124,6 @@ func cacheKeyRequestBody(path string, body []byte) []byte {
126124
}
127125
}
128126

129-
func streamResponseDefaultsFromContext(ctx context.Context) streamResponseDefaults {
130-
var defaults streamResponseDefaults
131-
132-
plan := core.GetExecutionPlan(ctx)
133-
if plan == nil {
134-
return defaults
135-
}
136-
137-
defaults.Provider = strings.TrimSpace(plan.ProviderType)
138-
if plan.Resolution != nil {
139-
if defaults.Provider == "" {
140-
defaults.Provider = strings.TrimSpace(plan.Resolution.ResolvedSelector.Provider)
141-
}
142-
defaults.Model = strings.TrimSpace(plan.Resolution.ResolvedSelector.Model)
143-
}
144-
145-
return defaults
146-
}
147-
148127
func isEventStreamContentType(contentType string) bool {
149128
if contentType == "" {
150129
return false
@@ -172,17 +151,6 @@ func writeCachedResponse(c *echo.Context, path string, requestBody, cached []byt
172151
return nil
173152
}
174153

175-
func renderCachedStream(path string, requestBody, cached []byte) ([]byte, error) {
176-
switch path {
177-
case "/v1/chat/completions":
178-
return renderCachedChatStream(requestBody, cached)
179-
case "/v1/responses":
180-
return renderCachedResponsesStream(requestBody, cached)
181-
default:
182-
return nil, errors.New("cached streaming replay is not supported for this path")
183-
}
184-
}
185-
186154
func renderCachedChatStream(requestBody, cached []byte) ([]byte, error) {
187155
var resp core.ChatResponse
188156
if err := json.Unmarshal(cached, &resp); err != nil {

0 commit comments

Comments
 (0)