Skip to content

fix: prevent panic when client canceled for pass-through, close #1634#1637

Merged
looplj merged 1 commit into
unstablefrom
dev-tmp
May 11, 2026
Merged

fix: prevent panic when client canceled for pass-through, close #1634#1637
looplj merged 1 commit into
unstablefrom
dev-tmp

Conversation

@looplj

@looplj looplj commented May 10, 2026

Copy link
Copy Markdown
Owner

No description provided.

@greptile-apps

greptile-apps Bot commented May 10, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR fixes a panic (issue #1634) triggered when a client cancels a pass-through request: the old RawStreamCancel only cancelled the attempt context, leaving the upstream stream.Next() blocked and eventually racing with a concurrent goroutine. The fix widens closeStream to also call stream.Close(), which unblocks any in-flight Recv(), and makes defaultSSEDecoder.Close() concurrency-safe by using sync.Once + atomic.Bool and closing the underlying reader directly instead of sse.Stream.Close() (whose internal bool is unsynchronized).

  • pass_through.go: RawStreamCancel is replaced by a closeStream closure (sync.Once-guarded cancel + stream.Close()); the fan-out goroutine gains a top-level defer recover() and a pre-flight attemptCtx.Done() check; passThroughChannelStream.Close() now propagates the cancel back to the producer.
  • decoder.go: defaultSSEDecoder is refactored to be safe for concurrent Close()/Next() calls — closed field becomes atomic.Bool, close logic moves into sync.Once, and a detailed concurrency-model doc-comment is added.
  • pass_through_test.go: time.Sleep-based waits are replaced with channel-close happens-before barriers and require.Eventually; a new blockingStream helper and TestCaptureRawProviderStream_CloseStopsBlockedUpstream directly cover the fixed race.

Confidence Score: 5/5

The change is safe to merge — it eliminates a documented data race and panic without altering the happy-path event flow.

The root-cause fix (widening closeStream to also call stream.Close() and making defaultSSEDecoder.Close() concurrency-safe) is narrowly scoped and well-reasoned. The existing fan-out logic is preserved; only the cancellation path is widened. The new test directly exercises the fixed race condition. No behavioural change is visible on the success path.

No files require special attention.

Important Files Changed

Filename Overview
llm/httpclient/decoder.go Refactors defaultSSEDecoder to be concurrency-safe: replaces plain bool with atomic.Bool for the closed flag, swaps sse.Stream.Close() for direct reader.Close() to avoid touching sse.Stream's unsynchronized internal state, and uses sync.Once for idempotent close. Extensive doc-comment explains the concurrency model and design decisions clearly.
internal/server/orchestrator/pass_through.go Upgrades RawStreamCancel from a plain context.CancelFunc to a closeStream closure that also calls stream.Close(), ensuring the blocking upstream Next() is unblocked when the client cancels. Adds a top-level defer recover() to the fan-out goroutine and a pre-flight select on attemptCtx.Done() to avoid entering stream.Next() after cancellation.
internal/server/orchestrator/pass_through_test.go Adds TestCaptureRawProviderStream_CloseStopsBlockedUpstream (the key regression test), a blockingStream helper, and replaces fragile time.Sleep waits with channel-close happens-before barriers and require.Eventually. The eventCount accessor correctly uses the mutex to avoid a test race.
.agent/rules/go-general.md Adds rule #8 requiring a top-level defer recover() guard in every manually started goroutine.

Sequence Diagram

sequenceDiagram
    participant Client
    participant passThroughChannelStream
    participant captureGoroutine as captureRawProviderStream goroutine
    participant SSEDecoder as defaultSSEDecoder

    Client->>passThroughChannelStream: Next() — blocks on rawStreamCh / ctx.Done()
    Note over captureGoroutine,SSEDecoder: Goroutine blocked in stream.Next() → SSEDecoder.Recv()

    Client->>Client: ctx cancelled
    Client->>passThroughChannelStream: ctx.Done() fires in Next()
    passThroughChannelStream->>passThroughChannelStream: Close() via sync.Once
    passThroughChannelStream->>passThroughChannelStream: closeStream() via closeStreamOnce
    passThroughChannelStream->>captureGoroutine: cancel(attemptCtx)
    passThroughChannelStream->>SSEDecoder: stream.Close() — reader.Close()

    SSEDecoder-->>captureGoroutine: Recv() unblocked (reader closed)
    captureGoroutine->>captureGoroutine: stream.Next() returns false
    captureGoroutine->>captureGoroutine: defer recover() (if panic)
    captureGoroutine->>captureGoroutine: set rawStreamErr
    captureGoroutine->>captureGoroutine: close(pipelineCh) + close(rawStreamCh)

    passThroughChannelStream-->>Client: Next() returns false
Loading

Reviews (2): Last reviewed commit: "fix: prevent panic when client canceled ..." | Re-trigger Greptile

Comment thread llm/httpclient/decoder.go Outdated

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mandatory panic recovery rule for goroutines and applies it to the pass-through and SSE decoder components. The changes improve concurrency safety and resource management by using sync.Once for idempotent cleanup and ensuring context cancellation correctly unblocks I/O operations. Feedback from the review suggests elevating the log level for recovered panics, removing a redundant mutex in the decoder's Close method, and verifying whether the explicit closure of sseStream is still necessary.

Comment thread llm/httpclient/decoder.go Outdated
func (s *defaultSSEDecoder) Next() (ret bool) {
defer func() {
if r := recover(); r != nil {
slog.DebugContext(s.ctx, "SSE stream panic recovered", slog.Any("panic", r))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

A panic in the SSE stream processing is a significant event that should be logged with a higher severity than Debug. Using ErrorContext or WarnContext would ensure that these unexpected failures are visible in production logs.

Suggested change
slog.DebugContext(s.ctx, "SSE stream panic recovered", slog.Any("panic", r))
slog.ErrorContext(s.ctx, "SSE stream panic recovered", slog.Any("panic", r))

Comment thread llm/httpclient/decoder.go Outdated
Comment on lines +176 to +177
s.closeMu.Lock()
defer s.closeMu.Unlock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The closeMu mutex appears to be redundant here because sync.Once.Do already provides the necessary synchronization to ensure the body is executed exactly once and blocks concurrent callers until the first execution completes. Since s.closeErr is only written within the Do block and read after it, the mutex doesn't add any extra safety.

func (s *defaultSSEDecoder) Close() error {
	s.closeOnce.Do(func() {

Comment thread llm/httpclient/decoder.go
Comment on lines +184 to +186
if s.reader != nil {
s.closeErr = s.reader.Close()
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The previous implementation called s.sseStream.Close(). While closing the underlying reader is necessary to unblock I/O, sseStream (if it is an instance of sse.Stream) might have its own internal resources or state that should be cleaned up. Consider calling both s.sseStream.Close() and s.reader.Close(), or clarify if sseStream.Close() was intentionally removed because it was redundant or problematic.

@looplj looplj merged commit 381dcdc into unstable May 11, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant