
opt: responses stream failed handle & request executions index, close #1473 #1585

Merged
looplj merged 2 commits into unstable from dev-tmp
May 2, 2026

Conversation

@looplj looplj commented May 2, 2026
Owner

No description provided.

@gemini-code-assist gemini-code-assist Bot left a comment
Contributor

Code Review

This pull request introduces database index updates for request executions and enhances error handling within the OpenAI response streaming logic. Key changes include the addition of a mechanism to classify and emit stream-level errors—such as timeouts, upstream EOF, and incomplete streams—and logic to prevent redundant error reporting. Review feedback identified a bug in the inbound stream's Next method where enqueued error events could be lost before being processed, and suggested refining the extraction of error messages from raw HTTP response bodies to improve the client experience.

Comment on lines +123 to +137
		// Source stream ended - check if we need to emit an error event
		if s.err == nil && s.source.Err() != nil {
			sourceErr := s.source.Err()
			// Don't emit error event for client cancellation
			if errors.Is(sourceErr, context.Canceled) {
				slog.DebugContext(s.ctx, "stream canceled by client")
				return false
			}
			if errors.Is(sourceErr, context.DeadlineExceeded) {
				slog.DebugContext(s.ctx, "stream deadline exceeded")
				return false
			}
			// Emit an error event for upstream failures
			s.emitStreamErrorEvent(sourceErr)
		}

Contributor

high

The current implementation of Next() returns false immediately after calling emitStreamErrorEvent, which causes the enqueued error event to be lost because the caller will stop iterating the stream. Additionally, it should handle the case where the source stream ends prematurely without an error (incomplete stream), similar to the logic in outbound_stream.go. We also need to check s.errorEventEmitted to prevent multiple error events if Next() is called again after an error has been emitted.

		// Source stream ended - check if we need to emit an error event
		if s.err == nil && !s.errorEventEmitted {
			sourceErr := s.source.Err()
			if sourceErr != nil {
				// Don't emit error event for client cancellation
				if errors.Is(sourceErr, context.Canceled) {
					slog.DebugContext(s.ctx, "stream canceled by client")
					return false
				}
				if errors.Is(sourceErr, context.DeadlineExceeded) {
					slog.DebugContext(s.ctx, "stream deadline exceeded")
					return false
				}
				// Emit an error event for upstream failures
				s.emitStreamErrorEvent(sourceErr)
				return true
			} else if !s.responseCompleted && s.responseID != "" {
				// Stream ended without terminal event
				s.emitStreamErrorEvent(ErrStreamIncomplete)
				return true
			}
		}

	var httpErr *httpclient.Error
	if errors.As(err, &httpErr) {
		code = "api_error"
		message = string(httpErr.Body)

Contributor

medium

When handling an httpclient.Error, the message is set to the raw response body. If the upstream provider returns a JSON error (e.g., OpenAI's error format), this will result in a raw JSON string being sent as the error message to the client. Consider extracting the actual error message from the JSON body if it exists to provide a cleaner experience for the end user.
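
One way to act on this suggestion is sketched below. It assumes the upstream body follows OpenAI's documented error envelope (`{"error": {"message": ...}}`); the helper name `extractErrorMessage` is hypothetical and not part of the PR. If the body is not JSON or carries no message, the sketch falls back to the trimmed raw body, so nothing is lost for providers that use a different format.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// extractErrorMessage returns error.message when body is a JSON object in the
// OpenAI-style envelope, and the trimmed raw body otherwise.
func extractErrorMessage(body []byte) string {
	var envelope struct {
		Error struct {
			Message string `json:"message"`
		} `json:"error"`
	}
	if err := json.Unmarshal(body, &envelope); err == nil && envelope.Error.Message != "" {
		return envelope.Error.Message
	}
	// Not JSON, or a shape we do not recognize: keep the original text.
	return strings.TrimSpace(string(body))
}

func main() {
	jsonBody := []byte(`{"error":{"message":"Rate limit reached","type":"rate_limit_error"}}`)
	fmt.Println(extractErrorMessage(jsonBody))
	fmt.Println(extractErrorMessage([]byte("bad gateway\n")))
}
```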

@greptile-apps

greptile-apps Bot commented May 2, 2026

Contributor

Greptile Summary

This PR fixes upstream stream error handling for the OpenAI Responses API transformer: inbound_stream now emits response.failed or error SSE events when the source stream errors out (rather than silently swallowing it), and outbound_stream sets ErrStreamIncomplete when the provider stream ends without a terminal event. Three composite database indexes are also added to request_executions for window-function and ordering queries.

Confidence Score: 5/5

Safe to merge — error handling is correct, previously flagged bugs are resolved, and the schema indexes are additive.

All findings are P2 (one minor suggestion about preserving the provider error code through the chained transformer path). The core error-emission logic is correct: emitStreamErrorEvent properly returns errors, recursive Next() delivers queued events, and ErrStreamIncomplete is only set when appropriate. No P0 or P1 issues found.

No files require special attention; the P2 comment on outbound_stream.go is a non-blocking improvement suggestion.

Important Files Changed

| Filename | Overview |
| --- | --- |
| llm/transformer/openai/responses/inbound_stream.go | Major addition of upstream error event emission: emitStreamErrorEvent, classifyStreamError, and a recursive Next() call to deliver queued error events; previous thread issues addressed. |
| llm/transformer/openai/responses/outbound_stream.go | Adds ErrStreamIncomplete detection when the stream ends without a terminal event; adds StreamEventTypeError → llm.ResponseError translation; the responseCompleted flag drives the incomplete-stream logic correctly. |
| llm/transformer/openai/responses/inbound_stream_test.go | New TestInboundTransformer_TransformStream_EmitsUpstreamErrorEvents covers both the "error before response starts" and "response.failed after response starts" paths. |
| internal/ent/schema/request_execution.go | Adds three composite indexes on request_executions for window-function and ordering queries; storage key names are descriptive and match the indexed fields. |
| llm/transformer/openai/responses/model.go | No functional changes; the file provides shared model types used by both stream transformers. |

Sequence Diagram

sequenceDiagram
    participant Provider as Provider (Responses API SSE)
    participant OS as outbound_stream
    participant LLM as llm.Response stream
    participant IS as inbound_stream
    participant Client as Client (SSE)

    Provider->>OS: SSE events (response.created, output_text.delta…)
    OS->>LLM: llm.Response chunks

    alt Stream ends normally
        Provider->>OS: response.completed
        OS->>OS: responseCompleted = true
        OS->>LLM: llm.Response (finish_reason + usage)
        LLM->>IS: chunks
        IS->>Client: response.completed SSE event
    end

    alt Provider sends error event
        Provider->>OS: error (StreamEventTypeError)
        OS->>OS: s.err = &llm.ResponseError{…}
        OS-->>LLM: Next() = false, Err() = ResponseError
        LLM->>IS: source.Err() != nil
        IS->>IS: emitStreamErrorEvent → classifyStreamError
        IS->>Client: response.failed or error SSE event
        IS->>Client: Err() = nil (errorEventEmitted=true)
    end

    alt Upstream EOF / incomplete stream
        Provider->>OS: connection closed (no terminal event)
        OS->>OS: s.err = ErrStreamIncomplete (if responseID set)
        OS-->>LLM: Next() = false, Err() = ErrStreamIncomplete
        LLM->>IS: source.Err() = ErrStreamIncomplete
        IS->>IS: classifyStreamError → code=incomplete_stream
        IS->>Client: response.failed SSE event
    end

Reviews (2): Last reviewed commit: "opt: request executions index, close #14..."

Comment thread llm/transformer/openai/responses/inbound_stream.go Outdated
Comment thread llm/transformer/openai/responses/inbound_stream.go
@looplj looplj merged commit d36572f into unstable May 2, 2026
4 checks passed