Greptile Summary
This PR fixes a panic (issue #1634) triggered when a client cancels a pass-through request: the old
Confidence Score: 5/5
The change is safe to merge: it eliminates a documented data race and panic without altering the happy-path event flow. The root-cause fix (widening closeStream to also call stream.Close() and making defaultSSEDecoder.Close() concurrency-safe) is narrowly scoped and well-reasoned. The existing fan-out logic is preserved; only the cancellation path is widened. The new test directly exercises the fixed race condition. No behavioural change is visible on the success path. No files require special attention.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Client
participant passThroughChannelStream
participant captureGoroutine as captureRawProviderStream goroutine
participant SSEDecoder as defaultSSEDecoder
Client->>passThroughChannelStream: Next() — blocks on rawStreamCh / ctx.Done()
Note over captureGoroutine,SSEDecoder: Goroutine blocked in stream.Next() → SSEDecoder.Recv()
Client->>Client: ctx cancelled
Client->>passThroughChannelStream: ctx.Done() fires in Next()
passThroughChannelStream->>passThroughChannelStream: Close() via sync.Once
passThroughChannelStream->>passThroughChannelStream: closeStream() via closeStreamOnce
passThroughChannelStream->>captureGoroutine: cancel(attemptCtx)
passThroughChannelStream->>SSEDecoder: stream.Close() — reader.Close()
SSEDecoder-->>captureGoroutine: Recv() unblocked (reader closed)
captureGoroutine->>captureGoroutine: stream.Next() returns false
captureGoroutine->>captureGoroutine: defer recover() (if panic)
captureGoroutine->>captureGoroutine: set rawStreamErr
captureGoroutine->>captureGoroutine: close(pipelineCh) + close(rawStreamCh)
passThroughChannelStream-->>Client: Next() returns false
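The cancellation path in the diagram relies on one property: closing the underlying reader makes a blocked read return, so the capture goroutine can exit. The following is a minimal sketch of that property, not the PR's code. It assumes an io.Pipe as a stand-in for the provider connection, and blockingStream, closeStream, and cancelUnblocksCapture are hypothetical names chosen for illustration.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"
)

// blockingStream is a hypothetical stand-in for the provider stream.
type blockingStream struct {
	reader    io.ReadCloser
	closeOnce sync.Once
	closeErr  error
}

// Next blocks in Read until a byte arrives or the reader is closed.
func (s *blockingStream) Next() bool {
	buf := make([]byte, 1)
	_, err := s.reader.Read(buf)
	return err == nil
}

// Close is idempotent; closing the reader is what unblocks a pending Read.
func (s *blockingStream) Close() error {
	s.closeOnce.Do(func() { s.closeErr = s.reader.Close() })
	return s.closeErr
}

// cancelUnblocksCapture reports whether the capture goroutine exits
// once the context is cancelled and the stream is closed.
func cancelUnblocksCapture() bool {
	pr, pw := io.Pipe() // nothing is ever written, so Read blocks
	defer pw.Close()

	stream := &blockingStream{reader: pr}
	ctx, cancel := context.WithCancel(context.Background())

	var closeStreamOnce sync.Once
	closeStream := func() {
		closeStreamOnce.Do(func() {
			cancel()
			_ = stream.Close() // without this, Next would stay blocked
		})
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		defer func() { _ = recover() }() // mirrors the recover step in the diagram
		for stream.Next() {
		}
	}()

	closeStream()
	closeStream() // second call is a no-op
	<-ctx.Done()

	select {
	case <-done:
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("capture goroutine exited:", cancelUnblocksCapture())
}
```

In this sketch, cancelling the context alone would not wake the goroutine, because the read does not observe the context. That is the reason the close step is paired with cancel inside the same sync.Once.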
Reviews (2): Last reviewed commit: "fix: prevent panic when client canceled ..."
Code Review
This pull request introduces a mandatory panic recovery rule for goroutines and applies it to the pass-through and SSE decoder components. The changes improve concurrency safety and resource management by using sync.Once for idempotent cleanup and ensuring context cancellation correctly unblocks I/O operations. Feedback from the review suggests elevating the log level for recovered panics, removing a redundant mutex in the decoder's Close method, and verifying whether the explicit closure of sseStream is still necessary.
func (s *defaultSSEDecoder) Next() (ret bool) {
	defer func() {
		if r := recover(); r != nil {
			slog.DebugContext(s.ctx, "SSE stream panic recovered", slog.Any("panic", r))
A panic in the SSE stream processing is a significant event that should be logged with a higher severity than Debug. Using ErrorContext or WarnContext would ensure that these unexpected failures are visible in production logs.
Suggested change:
- slog.DebugContext(s.ctx, "SSE stream panic recovered", slog.Any("panic", r))
+ slog.ErrorContext(s.ctx, "SSE stream panic recovered", slog.Any("panic", r))
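The effect of the suggested log level can be illustrated with a small sketch. It assumes a handler configured at Info level, which is a common production setting but not something the PR states. The decoder type, its recv field, and recoverAndLog are hypothetical; only the log/slog calls are real standard-library APIs.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
	"strings"
)

// decoder is a hypothetical stand-in for the SSE decoder.
type decoder struct {
	ctx    context.Context
	logger *slog.Logger
	recv   func() bool // stands in for the real receive step
}

// Next recovers from a panic in recv, logs it at Error level,
// and reports false through the named return value.
func (d *decoder) Next() (ret bool) {
	defer func() {
		if r := recover(); r != nil {
			d.logger.ErrorContext(d.ctx, "SSE stream panic recovered", slog.Any("panic", r))
			ret = false
		}
	}()
	return d.recv()
}

// recoverAndLog runs Next with a panicking recv and reports the result
// of Next and whether an ERROR record reached the Info-level handler.
func recoverAndLog() (ok bool, loggedAtError bool) {
	var buf bytes.Buffer
	handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo})
	d := &decoder{
		ctx:    context.Background(),
		logger: slog.New(handler),
		recv:   func() bool { panic("reader closed mid-read") },
	}
	ok = d.Next()
	return ok, strings.Contains(buf.String(), "level=ERROR")
}

func main() {
	ok, logged := recoverAndLog()
	fmt.Println("Next returned:", ok, "error record emitted:", logged)
}
```

With the same handler, a DebugContext call would be filtered out, so the recovered panic would leave no trace in the output.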
s.closeMu.Lock()
defer s.closeMu.Unlock()
The closeMu mutex appears to be redundant here because sync.Once.Do already provides the necessary synchronization to ensure the body is executed exactly once and blocks concurrent callers until the first execution completes. Since s.closeErr is only written within the Do block and read after it, the mutex doesn't add any extra safety.
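The reviewer's argument can be checked with a small sketch that uses sync.Once alone. It is an illustration under assumptions, not the PR's implementation: onceCloser, closeFn, and closeConcurrently are hypothetical names, and the sketch only covers the case the comment describes, where the error field is written inside Do and read after Do returns.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// onceCloser is a hypothetical type that guards cleanup with sync.Once only.
type onceCloser struct {
	closeOnce sync.Once
	closeErr  error
	calls     int          // written only inside Do
	closeFn   func() error // stands in for reader.Close
}

// Close runs closeFn at most once. Every caller returns the stored error,
// because Do does not return until the first call to its function completes.
func (c *onceCloser) Close() error {
	c.closeOnce.Do(func() {
		c.calls++
		c.closeErr = c.closeFn()
	})
	return c.closeErr
}

// closeConcurrently calls Close from n goroutines and reports how many
// times closeFn ran and whether every caller saw the same error.
func closeConcurrently(n int) (calls int, allSame bool) {
	want := errors.New("close failed")
	c := &onceCloser{closeFn: func() error { return want }}

	results := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = c.Close()
		}(i)
	}
	wg.Wait()

	allSame = true
	for _, err := range results {
		if err != want {
			allSame = false
		}
	}
	return c.calls, allSame
}

func main() {
	calls, same := closeConcurrently(8)
	fmt.Println("closeFn calls:", calls, "all callers saw same error:", same)
}
```

A mutex would still be needed if closeErr were read or written anywhere outside Close, so the redundancy holds only under the access pattern described in the comment.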
func (s *defaultSSEDecoder) Close() error {
	s.closeOnce.Do(func() {
		if s.reader != nil {
			s.closeErr = s.reader.Close()
		}
The previous implementation called s.sseStream.Close(). While closing the underlying reader is necessary to unblock I/O, sseStream (if it is an instance of sse.Stream) might have its own internal resources or state that should be cleaned up. Consider calling both s.sseStream.Close() and s.reader.Close(), or clarify if sseStream.Close() was intentionally removed because it was redundant or problematic.
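One way to read the "call both" option is sketched below. It is an assumption-laden illustration, not a statement about what sse.Stream needs: the decoder and fakeCloser types are hypothetical, both fields are modelled as plain io.Closer values, and the sketch does not address whether the real sseStream.Close() would close the same reader a second time or whether it is safe to call concurrently with a pending read.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// fakeCloser is a hypothetical closer that records whether it was closed.
type fakeCloser struct {
	closed bool
	err    error
}

func (f *fakeCloser) Close() error {
	f.closed = true
	return f.err
}

// decoder is a hypothetical stand-in holding both resources.
type decoder struct {
	reader    io.Closer
	sseStream io.Closer
	closeOnce sync.Once
	closeErr  error
}

// Close closes the reader first, so blocked I/O is released, then the
// stream wrapper. errors.Join returns nil when every error is nil.
func (d *decoder) Close() error {
	d.closeOnce.Do(func() {
		var errs []error
		if d.reader != nil {
			errs = append(errs, d.reader.Close())
		}
		if d.sseStream != nil {
			errs = append(errs, d.sseStream.Close())
		}
		d.closeErr = errors.Join(errs...)
	})
	return d.closeErr
}

func main() {
	r := &fakeCloser{}
	s := &fakeCloser{err: errors.New("stream close failed")}
	d := &decoder{reader: r, sseStream: s}
	fmt.Println("closed both:", r.closed && s.closed, "err:", d.Close())
	fmt.Println("closed both:", r.closed && s.closed, "err:", d.Close())
}
```

If the stream wrapper turns out to hold no resources beyond the reader, the single reader.Close() call in the PR would already be sufficient, which is the clarification the comment asks for.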