Fix: SSE flushing and responses done markers #117
Conversation
📝 Walkthrough

Adds a streaming-normalization wrapper EnsureResponsesDone that guarantees a trailing `data: [DONE]` marker on streamed responses.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant Handler as Server Handler
participant Provider
participant Wrapper as EnsureResponsesDone (io.ReadCloser)
participant Upstream as Upstream Stream
Client->>Handler: Request streaming response
Handler->>Provider: StreamChatCompletion(ctx, req)
Provider->>Upstream: open upstream SSE stream
Provider->>Wrapper: EnsureResponsesDone(upstream)
Wrapper-->>Provider: wrapped stream
Provider-->>Handler: wrapped stream
Handler->>Handler: flushStream(responseWriter, wrappedStream)
loop per-chunk
Handler->>Wrapper: Read(buf)
Wrapper->>Upstream: Read(buf)
Upstream-->>Wrapper: chunk
Wrapper->>Wrapper: detect/queue DONE marker if needed
Wrapper-->>Handler: chunk (may include appended `data: [DONE]`)
Handler->>Client: Write chunk + Flush
end
Handler->>Wrapper: Close()
Wrapper->>Upstream: Close()
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/providers/openai/openai.go`:
- Around line 190-201: The exported doc comment for StreamResponses is outdated
because the function no longer returns the provider's raw body verbatim; update
the comment on the StreamResponses function to explicitly state that the
returned io.ReadCloser is a normalized stream (produced by
providers.EnsureResponsesDone) which may synthesize a terminal "data: [DONE]"
marker and should not be assumed to contain raw upstream bytes, and mention that
callers must still close the returned ReadCloser.
In `@internal/providers/responses_done_wrapper.go`:
- Around line 50-70: The current EOF handling in responsesDoneWrapper appends
the full responsesDoneMarker when a partial prefix exists, causing duplicated
fragments; implement a donePrefixOverlap() helper (as suggested) that checks
overlap between w.tail and the start of responsesDoneMarker, then on EOF when
n>0 append only the missing suffix responsesDoneMarker[overlap:] to w.pending
(instead of the whole marker), and when n==0 copy only the missing suffix into p
or set w.pending to the remaining suffix; update logic around w.pending, w.tail,
w.sawDone and w.emitted to use the overlap value so we complete only the missing
bytes and preserve correct SSE framing.
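The overlap computation the comment asks for can be sketched as follows. This is a minimal illustration, not the wrapper's actual code: the helper name `donePrefixOverlap` comes from the review suggestion, and `tail` is assumed to hold the last bytes already emitted downstream.

```go
package main

import "fmt"

var responsesDoneMarker = []byte("data: [DONE]\n\n")

// donePrefixOverlap returns the length of the longest suffix of tail
// that is also a prefix of responsesDoneMarker, so the wrapper can emit
// only the missing bytes of the marker after a truncated upstream.
func donePrefixOverlap(tail []byte) int {
	max := len(responsesDoneMarker)
	if len(tail) < max {
		max = len(tail)
	}
	for n := max; n > 0; n-- {
		if string(tail[len(tail)-n:]) == string(responsesDoneMarker[:n]) {
			return n
		}
	}
	return 0
}

func main() {
	tail := []byte("...chunk\n\ndata: [DO") // upstream ended mid-marker
	overlap := donePrefixOverlap(tail)
	fmt.Printf("overlap=%d missing=%q\n", overlap, responsesDoneMarker[overlap:])
}
```

On EOF the wrapper would then append `responsesDoneMarker[overlap:]` to its pending buffer rather than the full marker, which avoids the duplicated-fragment bug described above.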
In `@internal/providers/xai/xai.go`:
- Around line 130-141: Update the exported doc comment for StreamResponses to
reflect the new stream semantics: state that StreamResponses does not return the
provider's raw response bytes but a normalized stream (via
providers.EnsureResponsesDone) which may synthesize a terminal "data: [DONE]"
marker and that callers must not assume verbatim upstream bytes and remain
responsible for closing the returned io.ReadCloser; reference the
StreamResponses function and providers.EnsureResponsesDone in the comment.
In `@internal/server/handlers.go`:
- Around line 109-135: flushStream currently swallows all stream/read/write
errors; change its signature to return an error and propagate read/write
failures (return non-nil for non-EOF read errors and for any write errors) while
treating io.EOF as nil so callers can tell normal completion from abnormal
termination; update the caller handleStreamingResponse (the place that invokes
flushStream with wrappedStream and c.Response().Writer) to check the returned
error and record/log/audit abnormal terminations so the missing terminal marker
is detected and surfaced.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 30395d03-1451-4b7d-9ade-1a43482d1d78
📒 Files selected for processing (9)
- GETTING_STARTED.md
- internal/providers/openai/openai.go
- internal/providers/openai/openai_test.go
- internal/providers/responses_done_wrapper.go
- internal/providers/responses_done_wrapper_test.go
- internal/providers/xai/xai.go
- internal/providers/xai/xai_test.go
- internal/server/handlers.go
- internal/server/handlers_test.go
💤 Files with no reviewable changes (1)
- internal/providers/xai/xai_test.go
Actionable comments posted: 1
♻️ Duplicate comments (1)
internal/providers/responses_done_wrapper.go (1)
59-82: ⚠️ Potential issue | 🟠 Major

Complete only the missing `[DONE]` suffix after a partial marker.

Line 75 / Line 79 still emits the full `data: [DONE]\n\n` marker whenever `w.sawCompleted` is true and `w.sawDone` is false. If the upstream already ended with a partial prefix like `data: [DO`, the client receives `...data: [DOdata: [DONE]\n\n`, which corrupts SSE framing instead of repairing it. Use the tracked tail overlap against `responsesDoneMarker` and append/copy only the missing suffix, then add the EOF-after-partial regression test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/providers/responses_done_wrapper.go` around lines 59 - 82, When w.sawCompleted is true but w.sawDone is false you must not always write the full responsesDoneMarker; instead compute how many bytes of the marker are already present at the tail (overlap) and only append/copy the missing suffix into p or w.pending. Update the branch that currently writes the full responsesDoneMarker (the block using responsesDoneMarker, w.pending, copy(p, ...), and references to w.sawCompleted/w.sawDone) to: calculate overlap = longest suffix of the already-emitted data that matches a prefix of responsesDoneMarker, then write responsesDoneMarker[overlap:] into p (or store the remainder in w.pending) and return the correct n. Add a regression test that feeds an upstream ending in a partial marker (e.g., "data: [DO") and asserts the client only receives the missing suffix to form "data: [DONE]\n\n" without duplicating the partial prefix.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/providers/responses_done_wrapper.go`:
- Around line 97-100: The current bytes.Contains check on combined
(append([]byte(nil), w.tail..., data...)) can falsely match JSON payloads;
instead parse combined into SSE lines and set w.sawDone only when you encounter
an SSE line whose content equals responsesDoneLine (i.e., the full "data:
[DONE]" line) on a line boundary. Implement this by splitting combined on
newline (or scanning lines), trimming trailing CR/LF, and checking each line for
exact match with responsesDoneLine (or "data: "+responsesDoneLine body) before
setting w.sawDone and updating w.tail accordingly; replace the bytes.Contains
usage with this line-based check in the code that reads w.tail and data.
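The line-based check described above can be sketched like this. It is a simplified, self-contained illustration: the real wrapper must also carry `w.tail` across Read calls so a `[DONE]` line split between chunks is still detected, which this one-shot helper does not handle.

```go
package main

import (
	"bytes"
	"fmt"
)

const responsesDoneLine = "data: [DONE]"

// containsDoneLine reports whether data contains the SSE [DONE] event on
// an actual line boundary. A raw bytes.Contains check could falsely
// match the same substring embedded inside a JSON payload.
func containsDoneLine(data []byte) bool {
	for _, line := range bytes.Split(data, []byte("\n")) {
		line = bytes.TrimRight(line, "\r")
		if string(line) == responsesDoneLine {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(containsDoneLine([]byte("data: [DONE]\n\n")))                     // true
	fmt.Println(containsDoneLine([]byte(`data: {"text":"data: [DONE]"}` + "\n"))) // false: inside JSON
}
```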
---
Duplicate comments:
In `@internal/providers/responses_done_wrapper.go`:
- Around line 59-82: When w.sawCompleted is true but w.sawDone is false you must
not always write the full responsesDoneMarker; instead compute how many bytes of
the marker are already present at the tail (overlap) and only append/copy the
missing suffix into p or w.pending. Update the branch that currently writes the
full responsesDoneMarker (the block using responsesDoneMarker, w.pending,
copy(p, ...), and references to w.sawCompleted/w.sawDone) to: calculate overlap
= longest suffix of the already-emitted data that matches a prefix of
responsesDoneMarker, then write responsesDoneMarker[overlap:] into p (or store
the remainder in w.pending) and return the correct n. Add a regression test that
feeds an upstream ending in a partial marker (e.g., "data: [DO") and asserts the
client only receives the missing suffix to form "data: [DONE]\n\n" without
duplicating the partial prefix.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 93c704cc-b32b-4cfe-8dec-9f71afd604dc
📒 Files selected for processing (2)
- internal/providers/responses_done_wrapper.go
- internal/providers/responses_done_wrapper_test.go
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@.github/workflows/test.yml`:
- Around line 34-35: The workflow change sets verify: false for the
golangci-lint binary download which weakens supply-chain security; either
restore verification by providing the correct checksum (re-enable verify: true
and supply the expected checksum) or, if this is a temporary workaround, add an
inline comment next to verify: false explaining the why, the associated issue/PR
tracking re-enablement, and an expected re-enable date so the disablement is
auditable and can be reversed later.
In `@internal/server/handlers_test.go`:
- Around line 634-679: The test
TestChatCompletionStreaming_FlushesBeforeNextChunkArrives is flaky because it
asserts elapsed < secondChunkDelay/2; change it to use an explicit
synchronization signal from the test double instead of wall-clock timing: update
delayedChunkReadCloser (and/or streamingProviderWithCustomReader) to expose a
channel or notify function that is triggered when the first Read/chunk is
flushed, then in the test wait on that signal (with a reasonable timeout) to
confirm the first chunk arrived before proceeding to observe the delayed second
chunk; remove the elapsed-based assertion and rely on the channel notification
while keeping the rest of the handler invocation
(NewHandler(...).ChatCompletion) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b7b8fc52-0c3b-4274-8392-c505c4ca5cfa
📒 Files selected for processing (9)
- .github/workflows/test.yml
- internal/providers/openai/openai.go
- internal/providers/responses_done_wrapper.go
- internal/providers/responses_done_wrapper_test.go
- internal/providers/xai/xai.go
- internal/server/handlers.go
- internal/server/handlers_test.go
- tests/contract/testdata/golden/openai/responses_stream.golden.json
- tests/contract/testdata/golden/xai/responses_stream.golden.json
Bug fix: `[DONE]` is lost before delivery (C-06). Users see frozen output followed by bursts. This is the defining failure mode for an AI gateway.

Summary by CodeRabbit
Bug Fixes
Documentation
Tests