refactor: share SSE parsing across audit and usage observers #158

SantiagoDePolonia merged 11 commits into main
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough
Replaces inline SSE stream wrappers with an observer-based SSE stream.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Observed as ObservedSSEStream
    participant Underlying as IO.ReadCloser
    participant Audit as StreamLogObserver
    participant Usage as StreamUsageObserver
    Client->>Observed: Read()
    Observed->>Underlying: Read(chunk)
    Underlying-->>Observed: chunk (SSE lines / partial)
    Observed->>Observed: buffer + parse complete events
    Observed->>Audit: OnJSONEvent(payload)
    Observed->>Usage: OnJSONEvent(payload)
    Note right of Observed: repeat for each chunk/event
    Client->>Observed: Close()
    Observed->>Observed: flush pending buffer, parse final events
    Observed->>Audit: OnStreamClose()
    Observed->>Usage: OnStreamClose()
    Observed->>Underlying: Close()
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/auditlog/stream_observer.go`:
- Around line 88-89: The code takes created from event as a float64 and casts to
int64 which can lose precision for very large timestamps; switch JSON decoding
to use json.Decoder.UseNumber (or ensure the unmarshalling produces json.Number)
and here detect json.Number in event["created"], calling .Int64() (or
strconv.ParseInt on its String()) to safely convert to int64 before assigning to
o.builder.Created; alternatively, if event may contain a string timestamp,
handle that case by parsing with strconv.ParseInt — update the conversion logic
around the created variable and the assignment to o.builder.Created accordingly.
In `@internal/streaming/observed_sse_stream_test.go`:
- Around line 1-86: Add three unit tests in
internal/streaming/observed_sse_stream_test.go: (1) a test that writes an SSE
stream containing a malformed JSON data line plus valid lines and asserts the
malformed line is skipped (no panic, eventCount excludes malformed), referencing
NewObservedSSEStream and trackingObserver to observe behavior; (2) a test that
calls NewObservedSSEStream with zero observers and asserts the returned
io.ReadCloser behaves like the original stream (ReadAll returns identical data
and Close forwards without altering data); and (3) a test that verifies the
"[DONE]" sentinel is handled correctly (treated as end-of-stream and does not
count as a JSON event) by creating a stream with a final "data: [DONE]" line and
checking trackingObserver.eventCount and lastID accordingly.
In `@internal/streaming/observed_sse_stream.go`:
- Around line 110-116: The code currently unmarshals JSON into a single payload
map and passes the same map to every observer via observer.OnJSONEvent, which
risks observers mutating shared state; update observed_sse_stream.go to either
(A) explicitly defensive-copy the payload before calling each observer (clone
the map and any nested maps/slices) so each observer receives an independent
map, or (B) strengthen the read-only contract by adding a clear comment near the
payload/unmarshal block and in the OnJSONEvent interface/documentation stating
that implementations must not mutate the payload; choose defensive copying if
you want safety regardless of future observers, and reference the payload
variable, the OnJSONEvent call site and s.observers when making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8a85921b-879a-40cb-8a6a-9467a91e312c
📒 Files selected for processing (8)
- internal/auditlog/stream_observer.go
- internal/auditlog/stream_wrapper.go
- internal/server/translated_inference_service.go
- internal/streaming/observed_sse_stream.go
- internal/streaming/observed_sse_stream_test.go
- internal/usage/stream_observer.go
- internal/usage/stream_wrapper.go
- tests/perf/hotpath_test.go
```go
if created, ok := event["created"].(float64); ok {
	o.builder.Created = int64(created)
```
🧹 Nitpick | 🔵 Trivial
Consider precision loss for large Unix timestamps.
JSON numbers unmarshal as float64, which can represent integers exactly only up to 2^53. While current Unix timestamps (~1.7×10^9) are safe, the cast int64(created) would lose precision for nanosecond timestamps or far-future dates. This is likely fine for the created field (seconds since epoch), but worth noting if this pattern is reused elsewhere.
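The `json.Number` approach the prompt describes can be sketched as follows; `decodeEvent` and `createdAsInt64` are hypothetical helper names for illustration, not the PR's actual code:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// decodeEvent parses an SSE JSON payload with UseNumber so integer
// fields like "created" survive as json.Number instead of float64.
func decodeEvent(data []byte) (map[string]interface{}, error) {
	dec := json.NewDecoder(bytes.NewReader(data))
	dec.UseNumber()
	var event map[string]interface{}
	if err := dec.Decode(&event); err != nil {
		return nil, err
	}
	return event, nil
}

// createdAsInt64 converts the "created" field to int64 without the
// float64 round-trip that loses precision above 2^53.
func createdAsInt64(event map[string]interface{}) (int64, bool) {
	n, ok := event["created"].(json.Number)
	if !ok {
		return 0, false
	}
	v, err := n.Int64()
	if err != nil {
		return 0, false
	}
	return v, true
}

func main() {
	event, _ := decodeEvent([]byte(`{"created":9007199254740993}`)) // 2^53 + 1
	v, ok := createdAsInt64(event)
	fmt.Println(v, ok) // prints "9007199254740993 true"
}
```

A plain `float64` cast would round 2^53+1 down to 2^53; `json.Number.Int64` parses the original digits exactly.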
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/auditlog/stream_observer.go` around lines 88 - 89, The code takes
created from event as a float64 and casts to int64 which can lose precision for
very large timestamps; switch JSON decoding to use json.Decoder.UseNumber (or
ensure the unmarshalling produces json.Number) and here detect json.Number in
event["created"], calling .Int64() (or strconv.ParseInt on its String()) to
safely convert to int64 before assigning to o.builder.Created; alternatively, if
event may contain a string timestamp, handle that case by parsing with
strconv.ParseInt — update the conversion logic around the created variable and
the assignment to o.builder.Created accordingly.
```go
package streaming

import (
	"io"
	"strings"
	"testing"
)

type trackingObserver struct {
	eventCount int
	lastID     string
	closed     bool
}

func (o *trackingObserver) OnJSONEvent(payload map[string]interface{}) {
	o.eventCount++
	if id, _ := payload["id"].(string); id != "" {
		o.lastID = id
	}
}

func (o *trackingObserver) OnStreamClose() {
	o.closed = true
}

func TestObservedSSEStream_PassesThroughAndFansOut(t *testing.T) {
	streamData := `data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"hi"}}]}

data: {"id":"chatcmpl-2","usage":{"total_tokens":3}}

data: [DONE]

`
	first := &trackingObserver{}
	second := &trackingObserver{}
	stream := NewObservedSSEStream(io.NopCloser(strings.NewReader(streamData)), first, second)

	data, err := io.ReadAll(stream)
	if err != nil {
		t.Fatalf("ReadAll error: %v", err)
	}
	if string(data) != streamData {
		t.Fatalf("stream passthrough mismatch")
	}
	if err := stream.Close(); err != nil {
		t.Fatalf("Close error: %v", err)
	}

	for i, observer := range []*trackingObserver{first, second} {
		if observer.eventCount != 2 {
			t.Fatalf("observer %d eventCount = %d, want 2", i, observer.eventCount)
		}
		if observer.lastID != "chatcmpl-2" {
			t.Fatalf("observer %d lastID = %q, want chatcmpl-2", i, observer.lastID)
		}
		if !observer.closed {
			t.Fatalf("observer %d was not closed", i)
		}
	}
}

func TestObservedSSEStream_ParsesFragmentedFinalEventOnClose(t *testing.T) {
	streamData := `data: {"id":"chatcmpl-frag","usage":{"total_tokens":8}}`
	observer := &trackingObserver{}
	stream := NewObservedSSEStream(io.NopCloser(strings.NewReader(streamData)), observer)

	data, err := io.ReadAll(stream)
	if err != nil {
		t.Fatalf("ReadAll error: %v", err)
	}
	if string(data) != streamData {
		t.Fatalf("stream passthrough mismatch")
	}
	if err := stream.Close(); err != nil {
		t.Fatalf("Close error: %v", err)
	}
	if observer.eventCount != 1 {
		t.Fatalf("eventCount = %d, want 1", observer.eventCount)
	}
	if observer.lastID != "chatcmpl-frag" {
		t.Fatalf("lastID = %q, want chatcmpl-frag", observer.lastID)
	}
	if !observer.closed {
		t.Fatal("observer was not closed")
	}
}
```
🧹 Nitpick | 🔵 Trivial
Consider adding negative test cases.
The current tests cover happy paths well. Consider adding tests for:
- Malformed JSON data lines (should be silently skipped per implementation)
- Empty observer list (should return original stream)
- `[DONE]` sentinel handling
These would strengthen confidence in the error handling paths.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/streaming/observed_sse_stream_test.go` around lines 1 - 86, Add
three unit tests in internal/streaming/observed_sse_stream_test.go: (1) a test
that writes an SSE stream containing a malformed JSON data line plus valid lines
and asserts the malformed line is skipped (no panic, eventCount excludes
malformed), referencing NewObservedSSEStream and trackingObserver to observe
behavior; (2) a test that calls NewObservedSSEStream with zero observers and
asserts the returned io.ReadCloser behaves like the original stream (ReadAll
returns identical data and Close forwards without altering data); and (3) a test
that verifies the "[DONE]" sentinel is handled correctly (treated as
end-of-stream and does not count as a JSON event) by creating a stream with a
final "data: [DONE]" line and checking trackingObserver.eventCount and lastID
accordingly.
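As an illustration of what the suggested negative tests would assert, here is a standalone sketch of the skip-malformed-and-`[DONE]` behavior; `countJSONEvents` is invented for this example and is not part of the PR:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// countJSONEvents is a stand-in for the observer fan-out: it counts
// "data:" lines that parse as JSON, silently skipping malformed
// payloads and the "[DONE]" sentinel — the behaviors the proposed
// negative tests would pin down against ObservedSSEStream.
func countJSONEvents(stream string) int {
	count := 0
	for _, line := range strings.Split(stream, "\n") {
		payload, ok := strings.CutPrefix(line, "data: ")
		if !ok || payload == "[DONE]" {
			continue
		}
		var event map[string]interface{}
		if err := json.Unmarshal([]byte(payload), &event); err != nil {
			continue // malformed JSON is skipped, not fatal
		}
		count++
	}
	return count
}

func main() {
	stream := "data: {\"id\":\"ok\"}\n\ndata: {not json}\n\ndata: [DONE]\n"
	fmt.Println(countJSONEvents(stream)) // prints "1"
}
```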
```go
var payload map[string]interface{}
if err := json.Unmarshal(jsonData, &payload); err != nil {
	continue
}
for _, observer := range s.observers {
	observer.OnJSONEvent(payload)
}
```
🧹 Nitpick | 🔵 Trivial
Shared payload map across observers—document the read-only contract.
The same payload map is passed to all observers (line 115). While the comment on line 12 states "Implementations must treat the payload as read-only," there's no enforcement. If an observer mutates the map, subsequent observers will see the mutation.
Current implementations (StreamLogObserver, StreamUsageObserver) only read from the payload, so this is safe today. Consider defensive copying only if mutation becomes a risk in future observers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/streaming/observed_sse_stream.go` around lines 110 - 116, The code
currently unmarshals JSON into a single payload map and passes the same map to
every observer via observer.OnJSONEvent, which risks observers mutating shared
state; update observed_sse_stream.go to either (A) explicitly defensive-copy the
payload before calling each observer (clone the map and any nested maps/slices)
so each observer receives an independent map, or (B) strengthen the read-only
contract by adding a clear comment near the payload/unmarshal block and in the
OnJSONEvent interface/documentation stating that implementations must not mutate
the payload; choose defensive copying if you want safety regardless of future
observers, and reference the payload variable, the OnJSONEvent call site and
s.observers when making the change.
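Option (A), defensive copying, could look like this minimal sketch; `deepCopyJSON` is a hypothetical name, not code from the PR:

```go
package main

import "fmt"

// deepCopyJSON clones a decoded JSON value (nested maps, slices,
// scalars) so each observer can receive an independent payload.
func deepCopyJSON(v interface{}) interface{} {
	switch t := v.(type) {
	case map[string]interface{}:
		out := make(map[string]interface{}, len(t))
		for k, val := range t {
			out[k] = deepCopyJSON(val)
		}
		return out
	case []interface{}:
		out := make([]interface{}, len(t))
		for i, val := range t {
			out[i] = deepCopyJSON(val)
		}
		return out
	default:
		return t // scalars (string, float64, bool, nil) are immutable
	}
}

func main() {
	payload := map[string]interface{}{
		"usage": map[string]interface{}{"total_tokens": 3.0},
	}
	clone := deepCopyJSON(payload).(map[string]interface{})
	clone["usage"].(map[string]interface{})["total_tokens"] = 99.0
	// The original payload is unaffected by the clone's mutation.
	fmt.Println(payload["usage"].(map[string]interface{})["total_tokens"]) // prints "3"
}
```

The cost is one allocation pass per event, which is why the comment above suggests documenting the read-only contract instead while all observers are known to be well-behaved.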
Force-pushed 5f17ccf to 49ef852
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/auditlog/stream_observer.go`:
- Around line 141-143: The code drops extra chunks once o.builder.contentLen >=
MaxContentCapture but doesn't set o.builder.truncated, causing
ResponseBodyTooBigToHandle to be incorrect; update the early-return branches
that check "if o.builder == nil || o.builder.truncated || o.builder.contentLen
>= MaxContentCapture" (and the similar branch in the 145-152 region) to set
o.builder.truncated = true before returning when contentLen >= MaxContentCapture
(and ensure you check builder != nil first), so the truncated flag accurately
reflects that content was dropped.
- Around line 81-91: The code currently only assigns o.builder.Model and
o.builder.Created when o.builder.ID == "" which can leave Model/Created unset if
ID was present first; change the logic so extracting "model" and "created" from
the event map is done independently of the ID check: keep the existing ID
assignment guarded by if o.builder.ID == "" and the id type assertion, but move
the model and created parsing outside that block (or into their own if checks)
so that if model (string) or created (float64) exist in event they are assigned
to o.builder.Model and o.builder.Created regardless of o.builder.ID.
In `@internal/streaming/observed_sse_stream.go`:
- Around line 120-127: savePending currently trims the start of an unterminated
frame when s.pending+data would exceed maxPendingEventBytes, silently corrupting
events; change savePending to avoid silent truncation by detecting when
len(s.pending)+len(data) > maxPendingEventBytes and instead drop the entire
in-flight event (clear s.pending) and record/mark the drop (e.g. set a
pendingTruncated flag or increment a metric) and emit a warning via your stream
logger so the dropped oversized event is visible; keep references to
savePending, ObservedSSEStream, maxPendingEventBytes, and s.pending when making
this change.
- Around line 78-106: The parser currently only looks for LF-only boundaries and
"data: " (with a space), causing CRLF-separated SSE and "data:"-without-space
frames to be ignored; update the boundary-detection and data-line handling in
the main read loop, processBufferedEvents, and processEvent to accept both
"\n\n" and "\r\n\r\n" (returning the correct separator length) and to accept
"data:" with or without a trailing space by trimming a single optional space
after the "data:" prefix before interpreting the payload (still treating the
payload "[DONE]" specially). Implement a helper like nextEventBoundary used by
the outer loop to find either LF or CRLF event separators and adjust slicing by
the separator length, and change processBufferedEvents to split on both boundary
forms or use the same helper to iterate events.
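A sketch of the suggested `nextEventBoundary` helper, assuming the signature proposed in the comment (the real code may differ):

```go
package main

import (
	"bytes"
	"fmt"
)

// nextEventBoundary reports the index of the next SSE event separator
// in data and that separator's length, accepting both LF-only ("\n\n")
// and CRLF ("\r\n\r\n") framing. idx == -1 means no complete event yet.
func nextEventBoundary(data []byte) (idx, sepLen int) {
	lf := bytes.Index(data, []byte("\n\n"))
	crlf := bytes.Index(data, []byte("\r\n\r\n"))
	switch {
	case crlf != -1 && (lf == -1 || crlf < lf):
		return crlf, 4
	case lf != -1:
		return lf, 2
	default:
		return -1, 0
	}
}

func main() {
	idx, sep := nextEventBoundary([]byte("data: {\"id\":1}\r\n\r\ndata: ..."))
	fmt.Println(idx, sep) // prints "14 4"
}
```

The caller slices off `data[:idx]` as the event and advances by `idx+sepLen`, so LF and CRLF streams are handled by the same loop.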
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 2c72f1a4-00f8-4c69-b642-9866293ee49d
📒 Files selected for processing (10)
- internal/auditlog/stream_observer.go
- internal/auditlog/stream_wrapper.go
- internal/server/translated_inference_service.go
- internal/streaming/observed_sse_stream.go
- internal/streaming/observed_sse_stream_test.go
- internal/usage/constants.go
- internal/usage/stream_observer.go
- internal/usage/stream_observer_test.go
- internal/usage/stream_wrapper.go
- tests/perf/hotpath_test.go
💤 Files with no reviewable changes (1)
- internal/usage/stream_wrapper.go
```go
if o.builder.ID == "" {
	if id, ok := event["id"].(string); ok {
		o.builder.ID = id
	}
	if model, ok := event["model"].(string); ok {
		o.builder.Model = model
	}
	if created, ok := event["created"].(float64); ok {
		o.builder.Created = int64(created)
	}
}
```
Decouple model/created extraction from id initialization.
At Line 81, model and created are only parsed while ID is empty. If ID arrives before those fields, they can remain unset.
💡 Proposed fix

```diff
-	if o.builder.ID == "" {
-		if id, ok := event["id"].(string); ok {
-			o.builder.ID = id
-		}
-		if model, ok := event["model"].(string); ok {
-			o.builder.Model = model
-		}
-		if created, ok := event["created"].(float64); ok {
-			o.builder.Created = int64(created)
-		}
-	}
+	if o.builder.ID == "" {
+		if id, ok := event["id"].(string); ok {
+			o.builder.ID = id
+		}
+	}
+	if o.builder.Model == "" {
+		if model, ok := event["model"].(string); ok {
+			o.builder.Model = model
+		}
+	}
+	if o.builder.Created == 0 {
+		if created, ok := event["created"].(float64); ok {
+			o.builder.Created = int64(created)
+		}
+	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/auditlog/stream_observer.go` around lines 81 - 91, The code
currently only assigns o.builder.Model and o.builder.Created when o.builder.ID
== "" which can leave Model/Created unset if ID was present first; change the
logic so extracting "model" and "created" from the event map is done
independently of the ID check: keep the existing ID assignment guarded by if
o.builder.ID == "" and the id type assertion, but move the model and created
parsing outside that block (or into their own if checks) so that if model
(string) or created (float64) exist in event they are assigned to
o.builder.Model and o.builder.Created regardless of o.builder.ID.
```go
if o.builder == nil || o.builder.truncated || o.builder.contentLen >= MaxContentCapture {
	return
}
```
Set truncation flag when content is dropped at capacity.
At Line 141, extra chunks are dropped once capacity is reached, but truncated may remain false. That can make ResponseBodyTooBigToHandle incorrect on close.
💡 Proposed fix

```diff
 func (o *StreamLogObserver) appendContent(content string) {
-	if o.builder == nil || o.builder.truncated || o.builder.contentLen >= MaxContentCapture {
+	if o.builder == nil || o.builder.truncated || content == "" {
+		return
+	}
+	if o.builder.contentLen >= MaxContentCapture {
+		o.builder.truncated = true
 		return
 	}
 	remaining := MaxContentCapture - o.builder.contentLen
 	if len(content) > remaining {
 		content = content[:remaining]
 		o.builder.truncated = true
 	}
 	o.builder.Content.WriteString(content)
 	o.builder.contentLen += len(content)
 }
```

Also applies to: 145-152
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/auditlog/stream_observer.go` around lines 141 - 143, The code drops
extra chunks once o.builder.contentLen >= MaxContentCapture but doesn't set
o.builder.truncated, causing ResponseBodyTooBigToHandle to be incorrect; update
the early-return branches that check "if o.builder == nil || o.builder.truncated
|| o.builder.contentLen >= MaxContentCapture" (and the similar branch in the
145-152 region) to set o.builder.truncated = true before returning when
contentLen >= MaxContentCapture (and ensure you check builder != nil first), so
the truncated flag accurately reflects that content was dropped.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/auditlog/auditlog_test.go (1)
709-761: ⚠️ Potential issue | 🟡 Minor
Make the observer test flush deterministically.
This test still waits on a fixed sleep before asserting the async write. That makes it timing-sensitive on slower CI. Flush the logger explicitly before the assertion, or poll with `require.Eventually`, instead of depending on 200ms being enough.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/auditlog/auditlog_test.go` around lines 709 - 761, The test TestStreamLogObserver is using a fixed sleep to wait for the async write; make it deterministic by replacing the time.Sleep with an explicit flush or a bounded poll: after observedStream.Close() call logger.Flush() (or logger.Close() if Flush() isn't available) to force the buffered write, or use require.Eventually to poll store.getEntries() until len(...) == 1; update the code paths around NewStreamLogObserver/observedStream.Close() and the assertion that inspects store.getEntries() so the test no longer relies on a fixed 200ms sleep.
♻️ Duplicate comments (1)
internal/streaming/observed_sse_stream.go (1)
77-95: ⚠️ Potential issue | 🟠 Major
Drop oversized in-flight frames explicitly instead of keeping only the suffix.
The truncation in `processChunk`/`savePending` discards the beginning of an unfinished event. That guarantees the eventual JSON is malformed, and the pre-combine trim can also skip complete events that were at the head of a large read. Once that happens, both audit and usage observers silently miss data. Prefer entering a discard-until-next-boundary state for oversized frames instead of tail-capping the buffer.
Also applies to: 174-181
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/streaming/observed_sse_stream.go` around lines 77 - 95, The current truncation in savePending/processChunk that takes only the suffix (using pending[pendingLen-maxPendingEventBytes:] and similar for data) corrupts JSON and drops whole events; change this to enter a discard-until-next-boundary mode instead: when len(s.pending) or len(data) exceeds maxPendingEventBytes, set s.pending = nil and set a new boolean flag (e.g., s.discarding = true) to indicate we should drop bytes until we detect an event boundary, and ensure processChunk clears s.discarding when a complete event boundary is seen; apply the same change in both places where suffix-trimming occurs (the block around pending/data truncation at the shown code and the similar logic at lines ~174-181) and remove the suffix-copy logic so oversized frames are explicitly dropped rather than tail-capped.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/server/passthrough_support.go`:
- Around line 253-257: The SSE passthrough currently only attaches the audit
observer; update the observer assembly so it mirrors
translated_inference_service.go by also creating and appending the usage
observer before constructing streaming.NewObservedSSEStream: keep the existing
streaming.Observer slice and auditlog.NewStreamLogObserver(s.logger,
streamEntry, passthroughAuditPath(...)), then call the usage observer factory
(e.g., usage.NewStreamUsageObserver or the equivalent usage observer constructor
used in translated_inference_service.go with the same streamEntry and the
passthrough usage path) and append it when non-nil so both observers are passed
into streaming.NewObservedSSEStream.
In `@internal/streaming/observed_sse_stream.go`:
- Around line 124-142: processEvent currently treats each data line separately
so multi-line SSE payloads are not reassembled; update processEvent in
ObservedSSEStream to collect consecutive parseDataLine results for a single
event (join them with "\n"), skip donePayload as before, then json.Unmarshal the
reassembled JSON and deliver to each observer via observer.OnJSONEvent; also add
a regression test that sends a JSON object split across two data: lines and
asserts the observer receives the reconstructed object. Ensure you reference
parseDataLine, donePayload, processEvent, ObservedSSEStream and the observers
slice when making changes.
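The multi-line reassembly this prompt asks for can be sketched as follows; `reassembleEvent` is a hypothetical stand-in for the suggested `processEvent` change, not the PR's actual code:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// reassembleEvent joins the payloads of consecutive "data:" lines
// with "\n" before unmarshalling, as the SSE format prescribes for
// multi-line events, and skips the "[DONE]" sentinel.
func reassembleEvent(rawEvent string) (map[string]interface{}, bool) {
	var parts []string
	for _, line := range strings.Split(rawEvent, "\n") {
		if payload, ok := strings.CutPrefix(line, "data:"); ok {
			// Accept "data:" with or without a single trailing space.
			parts = append(parts, strings.TrimPrefix(payload, " "))
		}
	}
	if len(parts) == 0 {
		return nil, false
	}
	joined := strings.Join(parts, "\n")
	if joined == "[DONE]" {
		return nil, false
	}
	var event map[string]interface{}
	if err := json.Unmarshal([]byte(joined), &event); err != nil {
		return nil, false
	}
	return event, true
}

func main() {
	// One JSON object split across two data: lines of a single event.
	event, ok := reassembleEvent("data: {\"id\":\ndata: \"chatcmpl-1\"}")
	fmt.Println(ok, event["id"]) // prints "true chatcmpl-1"
}
```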
---
Outside diff comments:
In `@internal/auditlog/auditlog_test.go`:
- Around line 709-761: The test TestStreamLogObserver is using a fixed sleep to
wait for the async write; make it deterministic by replacing the time.Sleep with
an explicit flush or a bounded poll: after observedStream.Close() call
logger.Flush() (or logger.Close() if Flush() isn't available) to force the
buffered write, or use require.Eventually to poll store.getEntries() until
len(...) == 1; update the code paths around
NewStreamLogObserver/observedStream.Close() and the assertion that inspects
store.getEntries() so the test no longer relies on a fixed 200ms sleep.
---
Duplicate comments:
In `@internal/streaming/observed_sse_stream.go`:
- Around line 77-95: The current truncation in savePending/processChunk that
takes only the suffix (using pending[pendingLen-maxPendingEventBytes:] and
similar for data) corrupts JSON and drops whole events; change this to enter a
discard-until-next-boundary mode instead: when len(s.pending) or len(data)
exceeds maxPendingEventBytes, set s.pending = nil and set a new boolean flag
(e.g., s.discarding = true) to indicate we should drop bytes until we detect an
event boundary, and ensure processChunk clears s.discarding when a complete
event boundary is seen; apply the same change in both places where
suffix-trimming occurs (the block around pending/data truncation at the shown
code and the similar logic at lines ~174-181) and remove the suffix-copy logic
so oversized frames are explicitly dropped rather than tail-capped.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f183f3a6-d880-469a-b0f5-86139ed3816f
📒 Files selected for processing (9)
- internal/auditlog/auditlog_test.go
- internal/auditlog/constants.go
- internal/auditlog/middleware.go
- internal/auditlog/stream_wrapper.go
- internal/server/handlers.go
- internal/server/handlers_test.go
- internal/server/passthrough_support.go
- internal/streaming/observed_sse_stream.go
- internal/streaming/observed_sse_stream_test.go
💤 Files with no reviewable changes (1)
- internal/server/handlers.go
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/server/passthrough_support.go`:
- Around line 254-268: The code currently passes auditPath (from
passthroughAuditPath) into usage.NewStreamUsageObserver which causes passthrough
traffic to be logged as normalized /v1/* routes; instead compute and pass the
client-visible route (e.g., c.Request().URL.Path or the original incoming path)
into usage.NewStreamUsageObserver while keeping auditPath for
auditlog.NewStreamLogObserver; update the call to
usage.NewStreamUsageObserver(model, providerType, requestID,
<clientVisiblePath>, s.pricingResolver) so usage logging receives the
unnormalized client route (leave passthroughAuditPath and audit logging
unchanged).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9d2fbbe9-43f0-4413-b7f0-9d4eb76c14e4
📒 Files selected for processing (7)
- internal/auditlog/auditlog_test.go
- internal/server/handlers.go
- internal/server/handlers_test.go
- internal/server/passthrough_service.go
- internal/server/passthrough_support.go
- internal/streaming/observed_sse_stream.go
- internal/streaming/observed_sse_stream_test.go
```go
func (s *ObservedSSEStream) processChunk(data []byte) {
	if len(s.pending) > 0 {
		idx, sepLen := nextEventBoundary(data)
		if idx == -1 {
			if len(data) > maxPendingEventBytes || len(s.pending) > maxPendingEventBytes-len(data) {
				s.pending = nil
				s.discarding = true
				return
			}

			combinedLen := len(s.pending) + len(data)
			combined := make([]byte, combinedLen)
			copy(combined, s.pending)
			copy(combined[len(s.pending):], data)
			s.pending = combined
			return
		}

		if idx > maxPendingEventBytes || len(s.pending) > maxPendingEventBytes-idx {
			s.pending = nil
			data = data[idx+sepLen:]
		} else {
			combinedLen := len(s.pending) + idx
			event := make([]byte, combinedLen)
			copy(event, s.pending)
			copy(event[len(s.pending):], data[:idx])
			s.pending = nil
			s.processEvent(event)
			data = data[idx+sepLen:]
		}
	}

	for len(data) > 0 {
		if s.discarding {
			idx, sepLen := nextEventBoundary(data)
			if idx == -1 {
				return
			}
			data = data[idx+sepLen:]
			s.discarding = false
```
Handle SSE delimiters that straddle two reads.
Both the pending and discarding branches only search `data` for `\n\n` / `\r\n\r\n`. If one read ends on the first newline(s) and the next read starts with the remainder of the separator, the boundary is missed, adjacent events get glued together, and `processEvent` drops them as invalid JSON. Please detect boundaries across the chunk join and add a regression test where the split happens exactly inside the delimiter. As per coding guidelines: "Be liberal in what you accept from providers' responses and pass them to the user in a conservative OpenAI-compatible shape".
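One way to make boundary detection safe across reads, sketched under the assumption that leftover bytes are carried in a pending buffer; `splitEvents` is illustrative, not the PR's code:

```go
package main

import (
	"bytes"
	"fmt"
)

// splitEvents searches the joined pending+chunk buffer, so a "\n\n"
// (or "\r\n\r\n") separator split between two Reads is still found.
// It returns the complete events plus the leftover bytes the caller
// should carry forward as the new pending buffer.
func splitEvents(pending, chunk []byte) (events [][]byte, rest []byte) {
	buf := append(append([]byte{}, pending...), chunk...)
	for {
		lf := bytes.Index(buf, []byte("\n\n"))
		crlf := bytes.Index(buf, []byte("\r\n\r\n"))
		idx, sep := lf, 2
		if crlf != -1 && (lf == -1 || crlf < lf) {
			idx, sep = crlf, 4
		}
		if idx == -1 {
			return events, buf
		}
		events = append(events, buf[:idx])
		buf = buf[idx+sep:]
	}
}

func main() {
	// The "\n\n" delimiter is split exactly between the two reads.
	events, rest := splitEvents([]byte("data: a\n"), []byte("\ndata: b"))
	fmt.Println(len(events), string(rest)) // prints "1 data: b"
}
```

Searching the joined buffer costs one copy per read but cannot miss a straddled delimiter; a zero-copy variant would only need to re-examine the last three bytes of the previous read.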
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/server/passthrough_support.go`:
- Around line 266-278: NewObservedSSEStream currently returns resp.Body
unchanged when observers is empty, so the unconditional defer _ =
wrappedStream.Close() causes a double-close; change the defer to only close when
we actually wrapped the body — e.g., after building observers and creating
wrappedStream via streaming.NewObservedSSEStream(resp.Body, observers...), only
call defer wrappedStream.Close() when len(observers) > 0 (or otherwise when
wrappedStream != resp.Body) so we don't add a second Close for the passthrough
case (references: observers, NewObservedSSEStream, wrappedStream, resp.Body).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a5453464-95fb-428a-8ede-ac472635dafa
📒 Files selected for processing (4)
- internal/server/handlers_test.go
- internal/server/passthrough_support.go
- internal/streaming/observed_sse_stream.go
- internal/streaming/observed_sse_stream_test.go