Skip to content

Fix: SSE flushing and responses done markers#117

Merged
SantiagoDePolonia merged 8 commits intomainfrom
fix/sse-flush-and-done
Mar 6, 2026
Merged

Fix: SSE flushing and responses done markers#117
SantiagoDePolonia merged 8 commits intomainfrom
fix/sse-flush-and-done

Conversation

@SantiagoDePolonia
Copy link
Copy Markdown
Contributor

@SantiagoDePolonia SantiagoDePolonia commented Mar 6, 2026

Bug fix:

  • Streaming is broken. SSE tokens buffer at 4KB instead of flushing incrementally (C-04). The stream termination signal [DONE] is lost before delivery (C-06). Users see frozen output followed by bursts. This is the defining failure mode for an AI gateway.

Summary by CodeRabbit

  • Bug Fixes

    • Streamed responses now reliably end with the terminal data: [DONE] marker and better handle upstream EOFs and streaming errors to avoid truncated outputs.
    • Improved incremental delivery to clients with buffered flush behavior for smoother real-time streaming.
  • Documentation

    • Clarified streaming semantics in Getting Started and Tips: SSE chunks flush incrementally and streams terminate with data: [DONE].
  • Tests

    • Added tests for marker handling, chunked/split markers, flushing, timing, and streaming error scenarios.

@SantiagoDePolonia SantiagoDePolonia self-assigned this Mar 6, 2026
@SantiagoDePolonia SantiagoDePolonia marked this pull request as ready for review March 6, 2026 14:09
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 6, 2026

Warning

Rate limit exceeded

@SantiagoDePolonia has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 8 minutes and 0 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: b66d79b2-ec57-45ea-ac67-0e2cec3c60cc

📥 Commits

Reviewing files that changed from the base of the PR and between 12e8aad and 1480f9b.

📒 Files selected for processing (2)
  • .github/workflows/test.yml
  • internal/server/handlers_test.go
📝 Walkthrough

Walkthrough

Adds a streaming-normalization wrapper EnsureResponsesDone that guarantees a trailing data: [DONE] marker for Responses-style SSE streams, updates provider streaming paths to use the wrapper, replaces direct io.Copy with a buffered flushStream in handlers, and adds tests and docs reflecting explicit streaming semantics.

Changes

Cohort / File(s) Summary
Docs
GETTING_STARTED.md
Clarifies streaming semantics: SSE chunks are flushed incrementally and streams terminate with data: [DONE].
Responses wrapper & tests
internal/providers/responses_done_wrapper.go, internal/providers/responses_done_wrapper_test.go
New EnsureResponsesDone(io.ReadCloser) that detects/ensures a terminal data: [DONE] across chunk boundaries; tests cover missing, present, and split markers.
Provider integrations
internal/providers/openai/openai.go, internal/providers/xai/xai.go
StreamResponses now returns streams wrapped with providers.EnsureResponsesDone and surfaces DoStream errors before wrapping.
Server handlers & tests
internal/server/handlers.go, internal/server/handlers_test.go
Replaces direct io.Copy with private flushStream that reads 32KB chunks and flushes after each write; adds streaming test helpers and tests for flush/error/audit behavior.
Provider tests & golden fixtures
internal/providers/openai/openai_test.go, internal/providers/xai/xai_test.go, tests/contract/testdata/golden/.../responses_stream.golden.json
Minor formatting test tweaks; xai test expectation updated to rely on wrapper for final data: [DONE]; contract golden fixtures add explicit terminal Done event.
CI lint tweak
.github/workflows/test.yml
Set verify: false on golangci-lint-action step.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Handler as Server Handler
    participant Provider
    participant Wrapper as EnsureResponsesDone (io.ReadCloser)
    participant Upstream as Upstream Stream

    Client->>Handler: Request streaming response
    Handler->>Provider: StreamChatCompletion(ctx, req)
    Provider->>Upstream: open upstream SSE stream
    Provider->>Wrapper: EnsureResponsesDone(upstream)
    Wrapper-->>Provider: wrapped stream
    Provider-->>Handler: wrapped stream

    Handler->>Handler: flushStream(responseWriter, wrappedStream)
    loop per-chunk
        Handler->>Wrapper: Read(buf)
        Wrapper->>Upstream: Read(buf)
        Upstream-->>Wrapper: chunk
        Wrapper->>Wrapper: detect/queue DONE marker if needed
        Wrapper-->>Handler: chunk (may include appended `data: [DONE]`)
        Handler->>Client: Write chunk + Flush
    end

    Handler->>Wrapper: Close()
    Wrapper->>Upstream: Close()
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐇
I nibble bytes from chunk to chunk,
I stitch the stream when edges are frayed,
One tidy marker, tidy and done,
Hops of flush and tests well-played,
Happy rabbit, streaming made!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 4.35% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Fix: SSE flushing and responses done markers' directly and clearly summarizes the main changes: implementing incremental SSE flushing and ensuring [DONE] termination markers are delivered to clients.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/sse-flush-and-done

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/providers/openai/openai.go`:
- Around line 190-201: The exported doc comment for StreamResponses is outdated
because the function no longer returns the provider's raw body verbatim; update
the comment on the StreamResponses function to explicitly state that the
returned io.ReadCloser is a normalized stream (produced by
providers.EnsureResponsesDone) which may synthesize a terminal "data: [DONE]"
marker and should not be assumed to contain raw upstream bytes, and mention that
callers must still close the returned ReadCloser.

In `@internal/providers/responses_done_wrapper.go`:
- Around line 50-70: The current EOF handling in responsesDoneWrapper appends
the full responsesDoneMarker when a partial prefix exists, causing duplicated
fragments; implement a donePrefixOverlap() helper (as suggested) that checks
overlap between w.tail and the start of responsesDoneMarker, then on EOF when
n>0 append only the missing suffix responsesDoneMarker[overlap:] to w.pending
(instead of the whole marker), and when n==0 copy only the missing suffix into p
or set w.pending to the remaining suffix; update logic around w.pending, w.tail,
w.sawDone and w.emitted to use the overlap value so we complete only the missing
bytes and preserve correct SSE framing.

In `@internal/providers/xai/xai.go`:
- Around line 130-141: Update the exported doc comment for StreamResponses to
reflect the new stream semantics: state that StreamResponses does not return the
provider's raw response bytes but a normalized stream (via
providers.EnsureResponsesDone) which may synthesize a terminal "data: [DONE]"
marker and that callers must not assume verbatim upstream bytes and remain
responsible for closing the returned io.ReadCloser; reference the
StreamResponses function and providers.EnsureResponsesDone in the comment.

In `@internal/server/handlers.go`:
- Around line 109-135: flushStream currently swallows all stream/read/write
errors; change its signature to return an error and propagate read/write
failures (return non-nil for non-EOF read errors and for any write errors) while
treating io.EOF as nil so callers can tell normal completion from abnormal
termination; update the caller handleStreamingResponse (the place that invokes
flushStream with wrappedStream and c.Response().Writer) to check the returned
error and record/log/audit abnormal terminations so the missing terminal marker
is detected and surfaced.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 30395d03-1451-4b7d-9ade-1a43482d1d78

📥 Commits

Reviewing files that changed from the base of the PR and between 46e3f34 and 654fe6c.

📒 Files selected for processing (9)
  • GETTING_STARTED.md
  • internal/providers/openai/openai.go
  • internal/providers/openai/openai_test.go
  • internal/providers/responses_done_wrapper.go
  • internal/providers/responses_done_wrapper_test.go
  • internal/providers/xai/xai.go
  • internal/providers/xai/xai_test.go
  • internal/server/handlers.go
  • internal/server/handlers_test.go
💤 Files with no reviewable changes (1)
  • internal/providers/xai/xai_test.go

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
internal/providers/responses_done_wrapper.go (1)

59-82: ⚠️ Potential issue | 🟠 Major

Complete only the missing [DONE] suffix after a partial marker.

Line 75 / Line 79 still emits the full data: [DONE]\n\n marker whenever w.sawCompleted is true and w.sawDone is false. If the upstream already ended with a partial prefix like data: [DO, the client receives ...data: [DOdata: [DONE]\n\n, which corrupts SSE framing instead of repairing it. Use the tracked tail overlap against responsesDoneMarker and append/copy only the missing suffix, then add the EOF-after-partial regression test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/providers/responses_done_wrapper.go` around lines 59 - 82, When
w.sawCompleted is true but w.sawDone is false you must not always write the full
responsesDoneMarker; instead compute how many bytes of the marker are already
present at the tail (overlap) and only append/copy the missing suffix into p or
w.pending. Update the branch that currently writes the full responsesDoneMarker
(the block using responsesDoneMarker, w.pending, copy(p, ...), and references to
w.sawCompleted/w.sawDone) to: calculate overlap = longest suffix of the
already-emitted data that matches a prefix of responsesDoneMarker, then write
responsesDoneMarker[overlap:] into p (or store the remainder in w.pending) and
return the correct n. Add a regression test that feeds an upstream ending in a
partial marker (e.g., "data: [DO") and asserts the client only receives the
missing suffix to form "data: [DONE]\n\n" without duplicating the partial
prefix.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@internal/providers/responses_done_wrapper.go`:
- Around line 97-100: The current bytes.Contains check on combined
(append([]byte(nil), w.tail..., data...)) can falsely match JSON payloads;
instead parse combined into SSE lines and set w.sawDone only when you encounter
an SSE line whose content equals responsesDoneLine (i.e., the full "data:
[DONE]" line) on a line boundary. Implement this by splitting combined on
newline (or scanning lines), trimming trailing CR/LF, and checking each line for
exact match with responsesDoneLine (or "data: "+responsesDoneLine body) before
setting w.sawDone and updating w.tail accordingly; replace the bytes.Contains
usage with this line-based check in the code that reads w.tail and data.

---

Duplicate comments:
In `@internal/providers/responses_done_wrapper.go`:
- Around line 59-82: When w.sawCompleted is true but w.sawDone is false you must
not always write the full responsesDoneMarker; instead compute how many bytes of
the marker are already present at the tail (overlap) and only append/copy the
missing suffix into p or w.pending. Update the branch that currently writes the
full responsesDoneMarker (the block using responsesDoneMarker, w.pending,
copy(p, ...), and references to w.sawCompleted/w.sawDone) to: calculate overlap
= longest suffix of the already-emitted data that matches a prefix of
responsesDoneMarker, then write responsesDoneMarker[overlap:] into p (or store
the remainder in w.pending) and return the correct n. Add a regression test that
feeds an upstream ending in a partial marker (e.g., "data: [DO") and asserts the
client only receives the missing suffix to form "data: [DONE]\n\n" without
duplicating the partial prefix.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 93c704cc-b32b-4cfe-8dec-9f71afd604dc

📥 Commits

Reviewing files that changed from the base of the PR and between 654fe6c and 366f8d1.

📒 Files selected for processing (2)
  • internal/providers/responses_done_wrapper.go
  • internal/providers/responses_done_wrapper_test.go

@SantiagoDePolonia
Copy link
Copy Markdown
Contributor Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Mar 6, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/test.yml:
- Around line 34-35: The workflow change sets verify: false for the
golangci-lint binary download which weakens supply-chain security; either
restore verification by providing the correct checksum (re-enable verify: true
and supply the expected checksum) or, if this is a temporary workaround, add an
inline comment next to verify: false explaining the why, the associated issue/PR
tracking re-enablement, and an expected re-enable date so the disablement is
auditable and can be reversed later.

In `@internal/server/handlers_test.go`:
- Around line 634-679: The test
TestChatCompletionStreaming_FlushesBeforeNextChunkArrives is flaky because it
asserts elapsed < secondChunkDelay/2; change it to use an explicit
synchronization signal from the test double instead of wall-clock timing: update
delayedChunkReadCloser (and/or streamingProviderWithCustomReader) to expose a
channel or notify function that is triggered when the first Read/chunk is
flushed, then in the test wait on that signal (with a reasonable timeout) to
confirm the first chunk arrived before proceeding to observe the delayed second
chunk; remove the elapsed-based assertion and rely on the channel notification
while keeping the rest of the handler invocation
(NewHandler(...).ChatCompletion) unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: b7b8fc52-0c3b-4274-8392-c505c4ca5cfa

📥 Commits

Reviewing files that changed from the base of the PR and between 366f8d1 and 12e8aad.

📒 Files selected for processing (9)
  • .github/workflows/test.yml
  • internal/providers/openai/openai.go
  • internal/providers/responses_done_wrapper.go
  • internal/providers/responses_done_wrapper_test.go
  • internal/providers/xai/xai.go
  • internal/server/handlers.go
  • internal/server/handlers_test.go
  • tests/contract/testdata/golden/openai/responses_stream.golden.json
  • tests/contract/testdata/golden/xai/responses_stream.golden.json

@SantiagoDePolonia SantiagoDePolonia merged commit 8149ab0 into main Mar 6, 2026
13 checks passed
@SantiagoDePolonia SantiagoDePolonia deleted the fix/sse-flush-and-done branch March 22, 2026 14:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant