fix(responsecache): cache streaming responses#200
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThe PR extends the response cache system to support streaming requests. Previously, streaming requests bypassed all cache layers. Now they are cached as full JSON responses and replayed as Server-Sent Events (SSE) on hits. Cache key generation now includes Changes
Sequence DiagramsequenceDiagram
participant Client
participant CacheMiddleware as Cache Middleware
participant Handler
participant StreamReconstructor as Stream Reconstructor
alt Cache Miss
Client->>CacheMiddleware: Request (stream=true/false)
CacheMiddleware->>Handler: Invoke handler
Handler-->>CacheMiddleware: Response (JSON or SSE)
CacheMiddleware->>CacheMiddleware: Reconstruct to JSON<br/>(if streaming)
CacheMiddleware->>CacheMiddleware: Store normalized JSON<br/>in cache
end
alt Cache Hit - Streaming Request
Client->>CacheMiddleware: Request (stream=true)
CacheMiddleware->>CacheMiddleware: Retrieve cached JSON
CacheMiddleware->>StreamReconstructor: Reconstruct SSE
StreamReconstructor->>StreamReconstructor: Parse JSON events<br/>Aggregate deltas<br/>Build chunks
StreamReconstructor-->>CacheMiddleware: Synthesized SSE stream
CacheMiddleware-->>Client: Content-Type: text/event-stream<br/>X-Cache: HIT
end
alt Cache Hit - Non-Streaming Request
Client->>CacheMiddleware: Request (stream=false)
CacheMiddleware->>CacheMiddleware: Retrieve cached JSON
CacheMiddleware-->>Client: Content-Type: application/json<br/>X-Cache: HIT
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~65 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/server/translated_inference_service.go (1)
173-180:⚠️ Potential issue | 🟠 MajorHonor
Cache-Control: no-cacheon this translated cache path.This branch now sends streaming requests through
ResponseCacheMiddleware.HandleRequest, but that code only short-circuitsno-store. Exact-cache hits/stores on translated endpoints will still happen forCache-Control: no-cache, which diverges fromsimpleCacheMiddleware.Middleware()and its existing no-cache coverage. Please reuse the same bypass semantics here or insideHandleRequestso callers can reliably force a miss.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/server/translated_inference_service.go` around lines 173 - 180, The translated cache path currently always calls s.responseCache.HandleRequest (via responseCache, marshalRequestBody and dispatch) which doesn’t respect "Cache-Control: no-cache"; before invoking s.responseCache.HandleRequest, detect if the incoming request contains a Cache-Control header with "no-cache" (same semantics used in simpleCacheMiddleware.Middleware) and if so bypass the cache by calling dispatch(c, req, plan) directly (or return its streaming result) instead of HandleRequest; alternatively, add the same no-cache check inside ResponseCacheMiddleware.HandleRequest so HandleRequest and the translated path share identical bypass behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/responsecache/stream_cache.go`:
- Around line 60-68: The cache builder (responsesOutputState and its use in
OnJSONEvent) currently only captures text and function-call arguments, so
streamed events with response.reasoning_text.delta are not persisted into the
cached /v1/responses body; update responsesOutputState to track reasoning_text
(e.g., add a strings.Builder and a HasReasoning bool) and extend OnJSONEvent to
handle the "response.reasoning_text.delta" event branch by appending delta
chunks to that builder and setting the flag, and ensure the final cached Item
includes the persisted reasoning_text field when HasReasoning is true so
replaying cached hits can emit the same reasoning deltas.
- Around line 180-230: The cached chat-stream builder and renderer omit
choice.logprobs causing payload-shape changes on cache hits; update the cache
creation to copy choiceMap["logprobs"] into the canonical cached JSON and update
renderCachedChatStream to include choice.Logprobs in the synthetic chunk's delta
(same place as "content"/"reasoning_content") so that when building chunk :=
{... "choices": []map[string]any{{"index": choice.Index, "delta": delta,
"finish_reason": choice.FinishReason}}, ...} you also inject the logprobs (if
non-nil) into either delta or the choice object as in the original spec; locate
the code that builds the canonical cached JSON (where choiceMap is constructed)
and the function renderCachedChatStream (and helpers like chatReasoningContent
or renderChatToolCalls) and ensure both sides serialize/deserialize
choice.logprobs consistently.
---
Outside diff comments:
In `@internal/server/translated_inference_service.go`:
- Around line 173-180: The translated cache path currently always calls
s.responseCache.HandleRequest (via responseCache, marshalRequestBody and
dispatch) which doesn’t respect "Cache-Control: no-cache"; before invoking
s.responseCache.HandleRequest, detect if the incoming request contains a
Cache-Control header with "no-cache" (same semantics used in
simpleCacheMiddleware.Middleware) and if so bypass the cache by calling
dispatch(c, req, plan) directly (or return its streaming result) instead of
HandleRequest; alternatively, add the same no-cache check inside
ResponseCacheMiddleware.HandleRequest so HandleRequest and the translated path
share identical bypass behavior.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: fb65a7ee-a5ad-4a9f-b5f4-5bc798a0ce05
📒 Files selected for processing (8)
internal/responsecache/handle_request_test.gointernal/responsecache/middleware_test.gointernal/responsecache/responsecache.gointernal/responsecache/semantic.gointernal/responsecache/semantic_test.gointernal/responsecache/simple.gointernal/responsecache/stream_cache.gointernal/server/translated_inference_service.go
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/responsecache/stream_cache.go`:
- Around line 701-725: When handling "response.output_text.delta" and
"response.reasoning_text.delta" events, preserve and honor locator fields
(item_id and output_index) instead of ignoring them: extract event["item_id"]
(string) and event["output_index"] (number) and use them to determine the target
output index (via b.lookupOutputIndex or by setting b.AssistantIndex/output
index explicitly) before calling b.output(index).AppendText or AppendReasoning;
when creating/replaying cached SSE events in responsesContentDeltaEvent(),
include the same item_id and output_index fields so cached deltas are attached
to the correct output item and match the live SSE contract. Ensure to update
code paths that call b.ensureReasoningOutputIndex to accept/expose an explicit
output_index when provided.
- Around line 935-950: The current block sets reasoning payloads into "content"
for type == "reasoning", which breaks canonical shape; change BuildItem logic so
that when s.HasReasoning is true and itemType is "reasoning" the reasoning data
is written under "summary" (not "content"). Concretely: adjust the targetField
selection around itemType (used in this diff) so that itemType == "reasoning"
forces targetField = "summary" (and keep the existing behavior of using
"summary" if item["summary"] is already an array for non-reasoning types);
ensure item["type"] is still set to "reasoning" when itemType was empty and
preserve writing the reasoning_text payload into item[targetField].
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1f3b3eea-557d-4076-bbeb-21241d503a9e
📒 Files selected for processing (4)
internal/responsecache/handle_request_test.gointernal/responsecache/semantic.gointernal/responsecache/semantic_test.gointernal/responsecache/stream_cache.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/responsecache/handle_request_test.go`:
- Around line 482-560: The test
TestReconstructStreamingResponse_PreservesResponsesReasoningText only verifies
content_index==0 and can miss regressions where multiple reasoning parts from
the same item are merged; update the test (and/or add a new test) to include SSE
events for response.reasoning_text.delta with the same item_id ("rs_1") but
content_index 0 and 1, then assert reconstructStreamingResponse preserves both
summary parts separately (two summary entries with type "reasoning_text" and
texts matching each delta) and that renderCachedResponsesStream replay emits two
response.reasoning_text.delta events with correct delta, item_id, output_index
and content_index values (use the existing helpers reconstructStreamingResponse,
renderCachedResponsesStream, parseSSEJSONEvents, and jsonNumberToInt to locate
and validate these entries).
In `@internal/responsecache/stream_cache.go`:
- Around line 62-72: The current responsesOutputState uses single Text and
Reasoning builders per item and the delta handlers ignore content_index, causing
distinct output_text/reasoning_text parts to be merged and potentially overwrite
content/summary; update the data model and handlers to preserve parts keyed by
content_index (e.g., change responsesOutputState to map[int]strings.Builder or
store slices/maps for Text and Reasoning keyed by content_index), update the
delta handlers that process "output_text" and "reasoning_text" to append to the
proper builder for the given content_index instead of the top-level
Text/Reasoning, and when finalizing the cached item (e.g., when building Item
content or summary) concatenate parts in content_index order so each output part
remains separate and cannot overwrite other fields like summary or content.
- Around line 240-255: The code currently inlines usage into the last choice
chunk inside the loop (see resp, resp.Choices and appendSSEJSONEvent), causing
inconsistent wire shapes; instead, stop adding usage into the last choice chunk
and always emit a standalone final SSE JSON chunk containing id, object
"chat.completion.chunk", model, "choices": [] and the usage (i.e., after the
loop always call appendSSEJSONEvent to send that final usage-only chunk using
the same structure used for the zero-choices case), so replay shape is
consistent regardless of choice count.
- Around line 152-156: When a cached payload fails to decode/render in
writeCachedResponse (e.g., calls to isStreamingRequest -> renderCachedStream and
the non-stream decode paths referenced around lines 188-190 and 273-275), treat
that as a cache miss instead of returning the raw error: catch the decode/render
error, log a warning with context (path, cacheType) and return nil so the caller
will fall through to the provider; if you must return an error to the client
from these code paths, wrap it as core.GatewayError rather than returning the
raw error. Ensure this change is applied to renderCachedStream failure handling
in writeCachedResponse and the equivalent decode branches so cache
corruption/schema drift doesn’t 500 the client.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 17b730bb-db0d-4e0e-aa76-3fc6e7c26182
📒 Files selected for processing (2)
internal/responsecache/handle_request_test.gointernal/responsecache/stream_cache.go
| func writeCachedResponse(c *echo.Context, path string, requestBody, cached []byte, cacheType string) error { | ||
| if isStreamingRequest(path, requestBody) { | ||
| streamBody, err := renderCachedStream(path, requestBody, cached) | ||
| if err != nil { | ||
| return err |
There was a problem hiding this comment.
Treat cache replay decode failures as cache misses, not client errors.
A bad cached payload currently turns an exact hit into a user-visible failure, because these raw decode/render errors escape straight out of the request path. Cache corruption or schema drift should invalidate/skip the entry and fall through to the provider instead of 500ing every matching request until eviction; if anything is still returned to the client, wrap it as core.GatewayError.
As per coding guidelines, **/*.go: All errors returned to clients must be instances of core.GatewayError.
Also applies to: 188-190, 273-275
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/responsecache/stream_cache.go` around lines 152 - 156, When a cached
payload fails to decode/render in writeCachedResponse (e.g., calls to
isStreamingRequest -> renderCachedStream and the non-stream decode paths
referenced around lines 188-190 and 273-275), treat that as a cache miss instead
of returning the raw error: catch the decode/render error, log a warning with
context (path, cacheType) and return nil so the caller will fall through to the
provider; if you must return an error to the client from these code paths, wrap
it as core.GatewayError rather than returning the raw error. Ensure this change
is applied to renderCachedStream failure handling in writeCachedResponse and the
equivalent decode branches so cache corruption/schema drift doesn’t 500 the
client.
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/responsecache/handle_request_test.go`:
- Around line 314-329: The test races the asynchronous cache store in
internal/responsecache/simple.go (stores are performed in background), so after
calling run("no-cache") you must wait for the background cache write to complete
before issuing rec3; modify the test (referring to run and handlerCalls) to
poll/wait for the async store to finish (for example by waiting until
handlerCalls increments to the expected value or by introducing a small
synchronization hook that the test can observe) before asserting rec3's
headers/body to avoid the race.
In `@internal/responsecache/stream_cache.go`:
- Around line 1133-1145: The synthetic response frame created in
responsesAddedItem leaves any final function-call "arguments" intact, causing
cache hits to re-expose full arguments in response.output_item.added and later
in response.function_call_arguments.delta/done; update responsesAddedItem to
clear that field on the cloned map (e.g., remove the "arguments" key or set it
to an empty value) after cloning and before returning so the synthetic added
frame contains no function-call arguments.
- Around line 330-335: The code always emits a "response.completed" SSE when
finishing caching, which loses original terminal events like "response.failed"
or "response.incomplete" and their payloads; update the logic that calls
appendSSEJSONEvent (and the value of the "type" field) to emit the actual
terminal event name (response.completed, response.failed, or
response.incomplete) and include the original terminal payload (errors/metadata)
instead of forcing completed, and also update the replay builder's switch (the
handler that currently only handles response.created, response.completed,
response.done) to add cases for response.failed and response.incomplete so those
events are consumed and rehydrated during replay.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8126a9ad-6338-49a0-bac3-0b07b851e8b7
📒 Files selected for processing (3)
docs/about/who-we-are.mdxinternal/responsecache/handle_request_test.gointernal/responsecache/stream_cache.go
💤 Files with no reviewable changes (1)
- docs/about/who-we-are.mdx
| rec2 := run("no-cache") | ||
| if got := rec2.Header().Get("X-Cache"); got != "" { | ||
| t.Fatalf("no-cache request should bypass cache, got X-Cache=%q", got) | ||
| } | ||
| if !bytes.Contains(rec2.Body.Bytes(), []byte(`"n":2`)) { | ||
| t.Fatalf("no-cache response body = %q, want fresh handler response", rec2.Body.String()) | ||
| } | ||
|
|
||
| rec3 := run("") | ||
| if got := rec3.Header().Get("X-Cache"); got != "HIT (exact)" { | ||
| t.Fatalf("follow-up request should still hit original cache entry, got X-Cache=%q", got) | ||
| } | ||
| if !bytes.Contains(rec3.Body.Bytes(), []byte(`"n":1`)) { | ||
| t.Fatalf("cached response body = %q, want original cached payload", rec3.Body.String()) | ||
| } | ||
| if handlerCalls != 2 { |
There was a problem hiding this comment.
Wait for async cache stores before the follow-up assertion.
internal/responsecache/simple.go:128-160 stores responses asynchronously. Without waiting after rec2 := run("no-cache"), rec3 can race an unintended background write and still read the original n=1, so this regression can false-pass.
Proposed fix
rec2 := run("no-cache")
if got := rec2.Header().Get("X-Cache"); got != "" {
t.Fatalf("no-cache request should bypass cache, got X-Cache=%q", got)
}
if !bytes.Contains(rec2.Body.Bytes(), []byte(`"n":2`)) {
t.Fatalf("no-cache response body = %q, want fresh handler response", rec2.Body.String())
}
+
+ m.simple.wg.Wait()
+ m.semantic.wg.Wait()
rec3 := run("")
if got := rec3.Header().Get("X-Cache"); got != "HIT (exact)" {
t.Fatalf("follow-up request should still hit original cache entry, got X-Cache=%q", got)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| rec2 := run("no-cache") | |
| if got := rec2.Header().Get("X-Cache"); got != "" { | |
| t.Fatalf("no-cache request should bypass cache, got X-Cache=%q", got) | |
| } | |
| if !bytes.Contains(rec2.Body.Bytes(), []byte(`"n":2`)) { | |
| t.Fatalf("no-cache response body = %q, want fresh handler response", rec2.Body.String()) | |
| } | |
| rec3 := run("") | |
| if got := rec3.Header().Get("X-Cache"); got != "HIT (exact)" { | |
| t.Fatalf("follow-up request should still hit original cache entry, got X-Cache=%q", got) | |
| } | |
| if !bytes.Contains(rec3.Body.Bytes(), []byte(`"n":1`)) { | |
| t.Fatalf("cached response body = %q, want original cached payload", rec3.Body.String()) | |
| } | |
| if handlerCalls != 2 { | |
| rec2 := run("no-cache") | |
| if got := rec2.Header().Get("X-Cache"); got != "" { | |
| t.Fatalf("no-cache request should bypass cache, got X-Cache=%q", got) | |
| } | |
| if !bytes.Contains(rec2.Body.Bytes(), []byte(`"n":2`)) { | |
| t.Fatalf("no-cache response body = %q, want fresh handler response", rec2.Body.String()) | |
| } | |
| m.simple.wg.Wait() | |
| m.semantic.wg.Wait() | |
| rec3 := run("") | |
| if got := rec3.Header().Get("X-Cache"); got != "HIT (exact)" { | |
| t.Fatalf("follow-up request should still hit original cache entry, got X-Cache=%q", got) | |
| } | |
| if !bytes.Contains(rec3.Body.Bytes(), []byte(`"n":1`)) { | |
| t.Fatalf("cached response body = %q, want original cached payload", rec3.Body.String()) | |
| } | |
| if handlerCalls != 2 { |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/responsecache/handle_request_test.go` around lines 314 - 329, The
test races the asynchronous cache store in internal/responsecache/simple.go
(stores are performed in background), so after calling run("no-cache") you must
wait for the background cache write to complete before issuing rec3; modify the
test (referring to run and handlerCalls) to poll/wait for the async store to
finish (for example by waiting until handlerCalls increments to the expected
value or by introducing a small synchronization hook that the test can observe)
before asserting rec3's headers/body to avoid the race.
There was a problem hiding this comment.
♻️ Duplicate comments (1)
internal/responsecache/handle_request_test.go (1)
314-329:⚠️ Potential issue | 🟡 MinorRace condition: async cache store not awaited before follow-up assertion.
After
run("no-cache")at line 314, no synchronization occurs beforerec3 := run("")at line 322. The background cache write fromrec2could race withrec3, causing the test to pass or fail nondeterministically.🔧 Proposed fix
if !bytes.Contains(rec2.Body.Bytes(), []byte(`"n":2`)) { t.Fatalf("no-cache response body = %q, want fresh handler response", rec2.Body.String()) } + + m.simple.wg.Wait() + m.semantic.wg.Wait() rec3 := run("")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/responsecache/handle_request_test.go` around lines 314 - 329, The test has a race where the async cache store started by rec2 (from run("no-cache")) may not finish before rec3 := run("") is invoked; modify the test to wait for the background cache write to complete after rec2 and before rec3. Concretely, add synchronization (e.g., a channel, a test-only flush/wait helper, or a returned done signal from run) to await the cache-store completion triggered by run("no-cache") (reference run, rec2, rec3 and handlerCalls) and only then call run("") and assert X-Cache and handlerCalls == 2.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@internal/responsecache/handle_request_test.go`:
- Around line 314-329: The test has a race where the async cache store started
by rec2 (from run("no-cache")) may not finish before rec3 := run("") is invoked;
modify the test to wait for the background cache write to complete after rec2
and before rec3. Concretely, add synchronization (e.g., a channel, a test-only
flush/wait helper, or a returned done signal from run) to await the cache-store
completion triggered by run("no-cache") (reference run, rec2, rec3 and
handlerCalls) and only then call run("") and assert X-Cache and handlerCalls ==
2.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b42dca85-f37c-42ac-a6e7-1495d0c1ce95
📒 Files selected for processing (2)
internal/responsecache/handle_request_test.gointernal/responsecache/stream_cache.go
Summary
Testing
Summary by CodeRabbit
New Features
Bug Fixes
Cache-Control: no-cacheheader now properly bypasses cache layers (previously onlyno-storedid).Improvements
stream_options.include_usageparameter variations, ensuring correct cache segregation.