Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces database index updates for request executions and enhances error handling within the OpenAI response streaming logic. Key changes include the addition of a mechanism to classify and emit stream-level errors—such as timeouts, upstream EOF, and incomplete streams—and logic to prevent redundant error reporting. Review feedback identified a bug in the inbound stream's Next method where enqueued error events could be lost before being processed, and suggested refining the extraction of error messages from raw HTTP response bodies to improve the client experience.
| // Source stream ended - check if we need to emit an error event | ||
| if s.err == nil && s.source.Err() != nil { | ||
| sourceErr := s.source.Err() | ||
| // Don't emit error event for client cancellation | ||
| if errors.Is(sourceErr, context.Canceled) { | ||
| slog.DebugContext(s.ctx, "stream canceled by client") | ||
| return false | ||
| } | ||
| if errors.Is(sourceErr, context.DeadlineExceeded) { | ||
| slog.DebugContext(s.ctx, "stream deadline exceeded") | ||
| return false | ||
| } | ||
| // Emit an error event for upstream failures | ||
| s.emitStreamErrorEvent(sourceErr) | ||
| } |
There was a problem hiding this comment.
The current implementation of Next() returns false immediately after calling emitStreamErrorEvent, which causes the enqueued error event to be lost because the caller will stop iterating the stream. Additionally, it should handle the case where the source stream ends prematurely without an error (incomplete stream), similar to the logic in outbound_stream.go. We also need to check s.errorEventEmitted to prevent multiple error events if Next() is called again after an error has been emitted.
// Source stream ended - check if we need to emit an error event
if s.err == nil && !s.errorEventEmitted {
sourceErr := s.source.Err()
if sourceErr != nil {
// Don't emit error event for client cancellation
if errors.Is(sourceErr, context.Canceled) {
slog.DebugContext(s.ctx, "stream canceled by client")
return false
}
if errors.Is(sourceErr, context.DeadlineExceeded) {
slog.DebugContext(s.ctx, "stream deadline exceeded")
return false
}
// Emit an error event for upstream failures
s.emitStreamErrorEvent(sourceErr)
return true
} else if !s.responseCompleted && s.responseID != "" {
// Stream ended without terminal event
s.emitStreamErrorEvent(ErrStreamIncomplete)
return true
}
}| var httpErr *httpclient.Error | ||
| if errors.As(err, &httpErr) { | ||
| code = "api_error" | ||
| message = string(httpErr.Body) |
There was a problem hiding this comment.
When handling an httpclient.Error, the message is set to the raw response body. If the upstream provider returns a JSON error (e.g., OpenAI's error format), this will result in a raw JSON string being sent as the error message to the client. Consider extracting the actual error message from the JSON body if it exists to provide a cleaner experience for the end user.
Greptile SummaryThis PR fixes upstream stream error handling for the OpenAI Responses API transformer: Confidence Score: 5/5Safe to merge — error handling is correct, previously flagged bugs are resolved, and the schema indexes are additive. All findings are P2 (one minor suggestion about preserving the provider error code through the chained transformer path). The core error-emission logic is correct: emitStreamErrorEvent properly returns errors, recursive Next() delivers queued events, and ErrStreamIncomplete is only set when appropriate. No P0 or P1 issues found. No files require special attention; the P2 comment on outbound_stream.go is a non-blocking improvement suggestion. Important Files Changed
Sequence DiagramsequenceDiagram
participant Provider as Provider (Responses API SSE)
participant OS as outbound_stream
participant LLM as llm.Response stream
participant IS as inbound_stream
participant Client as Client (SSE)
Provider->>OS: SSE events (response.created, output_text.delta…)
OS->>LLM: llm.Response chunks
alt Stream ends normally
Provider->>OS: response.completed
OS->>OS: responseCompleted = true
OS->>LLM: llm.Response (finish_reason + usage)
LLM->>IS: chunks
IS->>Client: response.completed SSE event
end
alt Provider sends error event
Provider->>OS: error (StreamEventTypeError)
OS->>OS: s.err = &llm.ResponseError{…}
OS-->>LLM: Next() = false, Err() = ResponseError
LLM->>IS: source.Err() != nil
IS->>IS: emitStreamErrorEvent → classifyStreamError
IS->>Client: response.failed or error SSE event
IS->>Client: Err() = nil (errorEventEmitted=true)
end
alt Upstream EOF / incomplete stream
Provider->>OS: connection closed (no terminal event)
OS->>OS: s.err = ErrStreamIncomplete (if responseID set)
OS-->>LLM: Next() = false, Err() = ErrStreamIncomplete
LLM->>IS: source.Err() = ErrStreamIncomplete
IS->>IS: classifyStreamError → code=incomplete_stream
IS->>Client: response.failed SSE event
end
Reviews (2): Last reviewed commit: "opt: request executions index, close #14..." | Re-trigger Greptile |
No description provided.