Skip to content

fix: should pass through response/stream, close #1442#1495

Merged
looplj merged 1 commit into
unstablefrom
dev-tmp
Apr 26, 2026
Merged

fix: should pass through response/stream, close #1442#1495
looplj merged 1 commit into
unstablefrom
dev-tmp

Conversation

@looplj

@looplj looplj commented Apr 26, 2026

Copy link
Copy Markdown
Owner

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request expands the 'Pass-Through' functionality to support both request and response/stream data when client and provider API formats are identical. It introduces several new middlewares to capture and apply raw provider data while ensuring that tracking and performance recording still occur. Feedback focuses on critical issues in the stream fan-out implementation, including potential data loss due to non-blocking channel sends, race conditions in error propagation, and the need for context cancellation checks in request-scoped goroutines to prevent leaks.

Comment on lines +205 to +229
go func() {
defer func() {
outbound.state.RawStreamErr = stream.Err()
stream.Close()
}()
defer close(pipelineCh)
defer close(outbound.state.RawStreamCh)

for stream.Next() {
event := stream.Current()
select {
case pipelineCh <- event:
default:
log.Debug(ctx, "dropping pipeline event, buffer full",
log.String("channel", channel.Name))
}

select {
case outbound.state.RawStreamCh <- event:
default:
log.Debug(ctx, "dropping pass-through event, buffer full",
log.String("channel", channel.Name))
}
}
}()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The fan-out logic in captureRawProviderStream has several critical issues:

  1. Data Loss & Inconsistency: Using select with a default case causes events to be silently dropped if either the pipeline or the client is slow. This results in corrupted streams for the client and incorrect usage tracking for the pipeline. It should block instead to apply backpressure.
  2. Race Condition: The current defer ordering closes the channels before RawStreamErr is populated. Consumers calling Err() immediately after the stream ends (when Next() returns false) may receive a nil error instead of the actual provider error.
  3. Context Cancellation: The goroutine does not check for ctx.Done(), which can lead to it hanging if a channel send blocks while the request is aborted. Note that while long-lived application-level goroutines may omit cancellation, request-scoped goroutines like this one must handle it.
		go func() {
			defer stream.Close()
			defer close(outbound.state.RawStreamCh)
			defer close(pipelineCh)
			defer func() {
				outbound.state.RawStreamErr = stream.Err()
			}()

			for stream.Next() {
				event := stream.Current()
				select {
				case pipelineCh <- event:
				case <-ctx.Done():
					return
				}

				select {
				case outbound.state.RawStreamCh <- event:
				case <-ctx.Done():
					return
				}
			}
		}()
References
  1. Goroutines intended to run for the application's entire lifetime do not require explicit stop channels or context cancellation for graceful shutdown, but transient or request-scoped goroutines still require them to prevent leaks.

Comment on lines +274 to +283
func (s *passThroughChannelStream) Next() bool {
ev, ok := <-s.ch
if !ok {
return false
}

s.current = ev

return true
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Next() method performs a blocking receive on the channel without considering context cancellation. This can lead to goroutine leaks if the producer is stuck and the request context is cancelled. The passThroughChannelStream should ideally hold a reference to the request context and check ctx.Done() in its Next() method.

References
  1. Goroutines intended to run for the application's entire lifetime do not require explicit stop channels or context cancellation for graceful shutdown, but transient or request-scoped goroutines still require them to prevent leaks.

devin-ai-integration[bot]

This comment was marked as resolved.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

devin-ai-integration[bot]

This comment was marked as resolved.

@greptile-apps greptile-apps Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@looplj looplj merged commit 2d8f8a8 into unstable Apr 26, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant