Conversation
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
There was a problem hiding this comment.
Code Review
This pull request expands the 'Pass-Through' functionality to support both request and response/stream data when client and provider API formats are identical. It introduces several new middlewares to capture and apply raw provider data while ensuring that tracking and performance recording still occur. Feedback focuses on critical issues in the stream fan-out implementation, including potential data loss due to non-blocking channel sends, race conditions in error propagation, and the need for context cancellation checks in request-scoped goroutines to prevent leaks.
| go func() { | ||
| defer func() { | ||
| outbound.state.RawStreamErr = stream.Err() | ||
| stream.Close() | ||
| }() | ||
| defer close(pipelineCh) | ||
| defer close(outbound.state.RawStreamCh) | ||
|
|
||
| for stream.Next() { | ||
| event := stream.Current() | ||
| select { | ||
| case pipelineCh <- event: | ||
| default: | ||
| log.Debug(ctx, "dropping pipeline event, buffer full", | ||
| log.String("channel", channel.Name)) | ||
| } | ||
|
|
||
| select { | ||
| case outbound.state.RawStreamCh <- event: | ||
| default: | ||
| log.Debug(ctx, "dropping pass-through event, buffer full", | ||
| log.String("channel", channel.Name)) | ||
| } | ||
| } | ||
| }() |
There was a problem hiding this comment.
The fan-out logic in captureRawProviderStream has several critical issues:
- Data Loss & Inconsistency: Using
selectwith adefaultcase causes events to be silently dropped if either the pipeline or the client is slow. This results in corrupted streams for the client and incorrect usage tracking for the pipeline. It should block instead to apply backpressure. - Race Condition: The current
deferordering closes the channels beforeRawStreamErris populated. Consumers callingErr()immediately after the stream ends (whenNext()returns false) may receive anilerror instead of the actual provider error. - Context Cancellation: The goroutine does not check for
ctx.Done(), which can lead to it hanging if a channel send blocks while the request is aborted. Note that while long-lived application-level goroutines may omit cancellation, request-scoped goroutines like this one must handle it.
go func() {
defer stream.Close()
defer close(outbound.state.RawStreamCh)
defer close(pipelineCh)
defer func() {
outbound.state.RawStreamErr = stream.Err()
}()
for stream.Next() {
event := stream.Current()
select {
case pipelineCh <- event:
case <-ctx.Done():
return
}
select {
case outbound.state.RawStreamCh <- event:
case <-ctx.Done():
return
}
}
}()References
- Goroutines intended to run for the application's entire lifetime do not require explicit stop channels or context cancellation for graceful shutdown, but transient or request-scoped goroutines still require them to prevent leaks.
| func (s *passThroughChannelStream) Next() bool { | ||
| ev, ok := <-s.ch | ||
| if !ok { | ||
| return false | ||
| } | ||
|
|
||
| s.current = ev | ||
|
|
||
| return true | ||
| } |
There was a problem hiding this comment.
The Next() method performs a blocking receive on the channel without considering context cancellation. This can lead to goroutine leaks if the producer is stuck and the request context is cancelled. The passThroughChannelStream should ideally hold a reference to the request context and check ctx.Done() in its Next() method.
References
- Goroutines intended to run for the application's entire lifetime do not require explicit stop channels or context cancellation for graceful shutdown, but transient or request-scoped goroutines still require them to prevent leaks.
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
Uh oh!
There was an error while loading. Please reload this page.