Refactor: streaming sse#26

Merged
SantiagoDePolonia merged 3 commits into main from refactor/streaming-sse
Dec 28, 2025
Conversation


@SantiagoDePolonia SantiagoDePolonia commented Dec 28, 2025

Summary by CodeRabbit

  • New Features

    • Added Groq and xAI provider support
    • Added observability metrics package
    • New shared OpenAI-compatible streaming response converter
  • Refactor

    • Consolidated streaming implementation across providers
    • Reorganized internal package layout and imports
  • Tests

    • Updated tests to simulate provider latency and adjust model-listing behavior
  • Documentation

    • Updated docs to reference new providers and package relabeling



coderabbitai bot commented Dec 28, 2025

📝 Walkthrough

Reorganizes internal packages (moves pkg/httpclient and pkg/llmclient to internal/...), introduces a shared OpenAIResponsesStreamConverter for provider streaming, replaces provider-specific converters with the shared converter, and updates tests and docs to match import and streaming-signature changes.

Changes

  • Package import reorganization — internal/llmclient/client.go, internal/observability/metrics.go, internal/observability/metrics_test.go, internal/providers/anthropic/anthropic.go, internal/providers/factory.go, internal/providers/openai/openai.go, internal/providers/xai/xai.go
    Updated imports from gomodel/internal/pkg/{httpclient,llmclient} to gomodel/internal/{httpclient,llmclient}; no API or behavioral changes.
  • Shared streaming converter — internal/providers/responses_converter.go
    New OpenAIResponsesStreamConverter and NewOpenAIResponsesStreamConverter(reader, model) that convert OpenAI-style SSE/chat stream lines into Responses API events (response.created, response.output_text.delta, response.done) and implement io.ReadCloser.
  • Provider streaming refactor — internal/providers/gemini/gemini.go, internal/providers/groq/groq.go
    Removed provider-local stream converters in favor of the shared providers.NewOpenAIResponsesStreamConverter; eliminated unused imports and local converter types.
  • Tests (call sites and timing) — internal/providers/groq/groq_test.go, internal/providers/registry_cache_test.go, internal/providers/router_test.go
    Updated tests to use NewOpenAIResponsesStreamConverter; added listModelsDelay to mock providers and changed ListModels to accept ctx context.Context to enable cancellable delays for timing assertions.
  • Documentation — CLAUDE.md
    Lists the new providers (Groq, xAI), notes the package reorganization, and references the shared responses converter.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant ProviderHTTP as Provider HTTP Response (SSE)
  participant Converter as OpenAIResponsesStreamConverter
  participant Consumer as StreamResponses Caller

  Client->>ProviderHTTP: open streaming request (SSE / chat stream)
  ProviderHTTP-->>Converter: io.ReadCloser stream (lines / chunks)
  Converter->>Converter: emit `response.created` (initial)
  loop for each SSE line
    ProviderHTTP-->>Converter: line / chunk (json or [DONE])
    Converter->>Consumer: emit `response.output_text.delta` chunks
  end
  ProviderHTTP-->>Converter: EOF / [DONE]
  Converter->>Consumer: emit `response.done`
  Consumer->>Converter: Close()
  Converter->>ProviderHTTP: Close()
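The conversion the diagram describes can be illustrated with a simplified, self-contained Go sketch. This is not the repository's actual converter — the event names follow the walkthrough above, but the line handling and output formatting here are illustrative assumptions:

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"strings"
)

// convertSSE mirrors the event sequence described in the walkthrough:
// one response.created, one response.output_text.delta per non-empty
// content chunk, and a final response.done. Malformed chunks are skipped.
func convertSSE(input string) []string {
	events := []string{"event: response.created"}
	sc := bufio.NewScanner(strings.NewReader(input))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if !strings.HasPrefix(line, "data: ") {
			continue
		}
		payload := strings.TrimPrefix(line, "data: ")
		if payload == "[DONE]" {
			break
		}
		var chunk struct {
			Choices []struct {
				Delta struct {
					Content string `json:"content"`
				} `json:"delta"`
			} `json:"choices"`
		}
		if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
			continue // skip malformed chunks, as the shared converter does
		}
		if len(chunk.Choices) > 0 && chunk.Choices[0].Delta.Content != "" {
			events = append(events, fmt.Sprintf(
				"event: response.output_text.delta delta=%q",
				chunk.Choices[0].Delta.Content))
		}
	}
	events = append(events, "event: response.done")
	return events
}

func main() {
	in := "data: {\"choices\":[{\"delta\":{\"content\":\"Hel\"}}]}\n" +
		"data: {\"choices\":[{\"delta\":{\"content\":\"lo\"}}]}\n" +
		"data: [DONE]\n"
	for _, e := range convertSSE(in) {
		fmt.Println(e)
	}
}
```

The real converter additionally implements io.ReadCloser and emits full SSE frames, but the created/delta/done ordering shown here is the contract the diagram captures.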

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • feat: added Groq provider #23 — replaces provider-specific streaming converters with a shared OpenAIResponsesStreamConverter; touches the same streaming conversion code paths.
  • Feature: added xAI provider #22 — adds/updates the xAI (Grok) provider implementation and registration; related to xai import and provider updates here.

Suggested labels

enhancement

Poem

🐰 Packages hopped and code did stream,
One converter now joins the team.
Groq and Gemini share the flow,
Tests tick-tock to match the show.
Hooray — the rabbit bops, tails aglow!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 14.29%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve docstring coverage.
  • Title check — ❓ Inconclusive: The PR title 'Refactor: streaming sse' is vague and generic, using non-descriptive terms that don't clearly convey what was refactored or why. Use a more descriptive title that explains the main refactoring objective, such as 'Refactor: consolidate streaming response converters' or 'Refactor: move llmclient package and extract shared response converter'.

✅ Passed checks (1 passed)

  • Description Check — ✅ Passed: Check skipped; CodeRabbit's high-level summary is enabled.

📜 Recent review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 94b35d6 and 776a4f9.

📒 Files selected for processing (2)
  • internal/providers/registry_cache_test.go
  • internal/providers/router_test.go
🧰 Additional context used
📓 Path-based instructions (2)
internal/providers/**/*.go

📄 CodeRabbit inference engine (CLAUDE.md)

internal/providers/**/*.go: Provider implementations must implement the core.Provider interface with methods: ChatCompletion, StreamChatCompletion, ListModels, Responses, and StreamResponses
Streaming responses must return io.ReadCloser and the caller is responsible for closing the stream

Files:

  • internal/providers/router_test.go
  • internal/providers/registry_cache_test.go
internal/**/*_test.go

📄 CodeRabbit inference engine (CLAUDE.md)

Unit tests must be located alongside implementation files using the *_test.go naming convention

Files:

  • internal/providers/router_test.go
  • internal/providers/registry_cache_test.go
🧠 Learnings (8)
📓 Common learnings
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/**/*.go : Provider implementations must implement the `core.Provider` interface with methods: ChatCompletion, StreamChatCompletion, ListModels, Responses, and StreamResponses
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/**/*.go : Streaming responses must return `io.ReadCloser` and the caller is responsible for closing the stream
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/router.go : Use the ModelRegistry to determine which provider handles each model; return `ErrRegistryNotInitialized` if registry is used before models are loaded

Applied to files:

  • internal/providers/router_test.go
  • internal/providers/registry_cache_test.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/**/*.go : Provider implementations must implement the `core.Provider` interface with methods: ChatCompletion, StreamChatCompletion, ListModels, Responses, and StreamResponses

Applied to files:

  • internal/providers/router_test.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/registry.go : Registry must load models from cache synchronously at startup, then refresh asynchronously in the background every 5 minutes to enable non-blocking I/O and instant startup

Applied to files:

  • internal/providers/registry_cache_test.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/cache/**/*.go : Model registry must support both local file cache and Redis cache backends for instant startup and multi-instance deployments

Applied to files:

  • internal/providers/registry_cache_test.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/registry.go : Use thread-safe access with RWMutex in the ModelRegistry for concurrent access to model mappings

Applied to files:

  • internal/providers/registry_cache_test.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/cache/**/*.go : Cache structure must include Version, UpdatedAt timestamp, and Models map with modelID as key mapping to CachedModel info

Applied to files:

  • internal/providers/registry_cache_test.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/*/*.go : Providers must self-register with the factory using an `init()` function that calls `providers.RegisterFactory()` with the provider name

Applied to files:

  • internal/providers/registry_cache_test.go
🧬 Code graph analysis (2)
internal/providers/router_test.go (1)
internal/core/types.go (1)
  • ModelsResponse (63-66)
internal/providers/registry_cache_test.go (2)
internal/providers/registry.go (1)
  • NewModelRegistry (36-41)
internal/cache/local.go (1)
  • NewLocalCache (21-25)
🔇 Additional comments (2)
internal/providers/registry_cache_test.go (1)

284-310: LGTM! Good improvement to test reliability.

Adding an explicit listModelsDelay makes the timing behavior deterministic rather than relying on implicit race conditions. The 50ms delay ensures the cache check and assertion complete before the background network fetch returns, while the 100ms sleep at line 310 properly waits for cleanup.

internal/providers/router_test.go (1)

14-50: LGTM! Well-implemented mock with configurable delay and context cancellation.

The listModelsDelay field and the corresponding select block properly implement:

  1. Optional delay behavior for testing timing-sensitive scenarios
  2. Context cancellation support that returns ctx.Err() when cancelled during the delay
  3. Backward compatibility when delay is zero

This follows Go's idiomatic pattern for cancellable operations.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/providers/groq/groq.go (1)

214-228: LGTM! Excellent refactoring to use shared converter.

The StreamResponses method now delegates streaming conversion to the centralized providers.NewOpenAIResponsesStreamConverter. This eliminates code duplication and maintains the io.ReadCloser contract required by the core.Provider interface.

Based on coding guidelines: Streaming responses correctly return io.ReadCloser and the caller remains responsible for closing the stream.

📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d63cae8 and 94b35d6.

📒 Files selected for processing (15)
  • CLAUDE.md
  • internal/httpclient/client.go
  • internal/httpclient/client_test.go
  • internal/llmclient/client.go
  • internal/llmclient/client_test.go
  • internal/observability/metrics.go
  • internal/observability/metrics_test.go
  • internal/providers/anthropic/anthropic.go
  • internal/providers/factory.go
  • internal/providers/gemini/gemini.go
  • internal/providers/groq/groq.go
  • internal/providers/groq/groq_test.go
  • internal/providers/openai/openai.go
  • internal/providers/responses_converter.go
  • internal/providers/xai/xai.go
🧰 Additional context used
📓 Path-based instructions (3)
internal/providers/**/*.go

📄 CodeRabbit inference engine (CLAUDE.md)

internal/providers/**/*.go: Provider implementations must implement the core.Provider interface with methods: ChatCompletion, StreamChatCompletion, ListModels, Responses, and StreamResponses
Streaming responses must return io.ReadCloser and the caller is responsible for closing the stream

Files:

  • internal/providers/openai/openai.go
  • internal/providers/responses_converter.go
  • internal/providers/xai/xai.go
  • internal/providers/factory.go
  • internal/providers/groq/groq_test.go
  • internal/providers/groq/groq.go
  • internal/providers/anthropic/anthropic.go
  • internal/providers/gemini/gemini.go
internal/providers/*/*.go

📄 CodeRabbit inference engine (CLAUDE.md)

Providers must self-register with the factory using an init() function that calls providers.RegisterFactory() with the provider name

Files:

  • internal/providers/openai/openai.go
  • internal/providers/xai/xai.go
  • internal/providers/groq/groq_test.go
  • internal/providers/groq/groq.go
  • internal/providers/anthropic/anthropic.go
  • internal/providers/gemini/gemini.go
internal/**/*_test.go

📄 CodeRabbit inference engine (CLAUDE.md)

Unit tests must be located alongside implementation files using the *_test.go naming convention

Files:

  • internal/observability/metrics_test.go
  • internal/providers/groq/groq_test.go
🧠 Learnings (7)
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to cmd/gomodel/main.go : Provider packages must be imported in `cmd/gomodel/main.go` using blank imports (`_ "gomodel/internal/providers/{name}"`)

Applied to files:

  • internal/providers/openai/openai.go
  • internal/providers/xai/xai.go
  • internal/providers/factory.go
  • internal/observability/metrics_test.go
  • internal/providers/groq/groq_test.go
  • internal/providers/groq/groq.go
  • internal/providers/anthropic/anthropic.go
  • CLAUDE.md
  • internal/providers/gemini/gemini.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/**/*.go : Provider implementations must implement the `core.Provider` interface with methods: ChatCompletion, StreamChatCompletion, ListModels, Responses, and StreamResponses

Applied to files:

  • internal/providers/openai/openai.go
  • internal/providers/responses_converter.go
  • internal/providers/xai/xai.go
  • internal/providers/factory.go
  • internal/providers/groq/groq_test.go
  • internal/providers/groq/groq.go
  • internal/providers/anthropic/anthropic.go
  • CLAUDE.md
  • internal/providers/gemini/gemini.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/router.go : Use the ModelRegistry to determine which provider handles each model; return `ErrRegistryNotInitialized` if registry is used before models are loaded

Applied to files:

  • internal/providers/openai/openai.go
  • internal/providers/xai/xai.go
  • internal/providers/factory.go
  • internal/providers/groq/groq_test.go
  • internal/providers/groq/groq.go
  • internal/providers/anthropic/anthropic.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/registry.go : Use thread-safe access with RWMutex in the ModelRegistry for concurrent access to model mappings

Applied to files:

  • internal/providers/openai/openai.go
  • internal/providers/xai/xai.go
  • internal/providers/groq/groq_test.go
  • internal/providers/anthropic/anthropic.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/cache/**/*.go : Model registry must support both local file cache and Redis cache backends for instant startup and multi-instance deployments

Applied to files:

  • internal/providers/openai/openai.go
  • internal/providers/xai/xai.go
  • internal/providers/anthropic/anthropic.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/**/*.go : Streaming responses must return `io.ReadCloser` and the caller is responsible for closing the stream

Applied to files:

  • internal/providers/responses_converter.go
  • internal/providers/groq/groq_test.go
  • internal/providers/groq/groq.go
  • CLAUDE.md
  • internal/providers/gemini/gemini.go
📚 Learning: 2025-12-26T16:40:36.115Z
Learnt from: CR
Repo: ENTERPILOT/GOModel PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-12-26T16:40:36.115Z
Learning: Applies to internal/providers/*/*.go : Providers must self-register with the factory using an `init()` function that calls `providers.RegisterFactory()` with the provider name

Applied to files:

  • internal/providers/factory.go
  • internal/providers/groq/groq_test.go
  • CLAUDE.md
🧬 Code graph analysis (4)
internal/providers/responses_converter.go (2)
internal/providers/gemini/gemini.go (1)
  • New (38-48)
internal/providers/groq/groq.go (1)
  • New (33-40)
internal/providers/groq/groq_test.go (1)
internal/providers/responses_converter.go (1)
  • NewOpenAIResponsesStreamConverter (30-38)
internal/providers/groq/groq.go (1)
internal/providers/responses_converter.go (1)
  • NewOpenAIResponsesStreamConverter (30-38)
internal/providers/gemini/gemini.go (2)
internal/providers/responses_converter.go (1)
  • NewOpenAIResponsesStreamConverter (30-38)
internal/core/types.go (1)
  • Model (55-60)
🔇 Additional comments (14)
internal/providers/factory.go (1)

9-9: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/llmclient to internal/llmclient aligns with the PR's objective to reorganize the internal package structure.

internal/llmclient/client.go (1)

21-21: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/httpclient to internal/httpclient aligns with the PR's objective to reorganize the internal package structure.

internal/providers/anthropic/anthropic.go (1)

19-19: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/llmclient to internal/llmclient aligns with the PR's objective to reorganize the internal package structure.

internal/providers/responses_converter.go (1)

197-198: Returning (0, nil) may cause busy-waiting in readers.

When no data is available yet, returning (0, nil) signals to the caller that the read succeeded with zero bytes. Some readers may interpret this as a signal to retry immediately, potentially causing a busy-wait loop. The standard practice is to block until data is available or return an error.

This is currently acceptable since the underlying sc.reader.Read() at line 79 will block, but if no complete lines are buffered, this could be reached.

Consider whether this edge case can occur and if so, whether blocking or a small sleep would be more appropriate.

internal/providers/gemini/gemini.go (2)

14-14: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/llmclient to internal/llmclient aligns with the PR's objective to reorganize the internal package structure.


310-310: Excellent refactoring to use shared converter.

Replacing the local geminiResponsesStreamConverter with the shared providers.NewOpenAIResponsesStreamConverter eliminates code duplication and centralizes the OpenAI-to-Responses API conversion logic. This improves maintainability.

internal/providers/xai/xai.go (1)

10-10: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/llmclient to internal/llmclient aligns with the PR's objective to reorganize the internal package structure.

internal/observability/metrics.go (1)

12-12: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/llmclient to internal/llmclient aligns with the PR's objective to reorganize the internal package structure.

internal/providers/openai/openai.go (1)

10-10: LGTM! Import path refactoring is correct.

The import path update from internal/pkg/llmclient to internal/llmclient aligns with the PR's objective to reorganize the internal package structure.

CLAUDE.md (1)

161-173: LGTM! Documentation accurately reflects the refactoring.

The documentation updates correctly capture the package structure changes, including the new providers (Groq and xAI), the shared responses converter, relocated internal packages, and the observability module.

internal/observability/metrics_test.go (1)

13-13: LGTM! Import path correctly updated.

The import path change from gomodel/internal/pkg/llmclient to gomodel/internal/llmclient aligns with the package relocation objective of this PR.

internal/providers/groq/groq.go (1)

13-13: LGTM! Import path correctly updated.

The import path change from gomodel/internal/pkg/llmclient to gomodel/internal/llmclient is consistent with the package relocation objectives of this PR.

internal/providers/groq/groq_test.go (2)

13-13: LGTM! Import added for shared converter access.

The import of gomodel/internal/providers is necessary to access the public NewOpenAIResponsesStreamConverter function used throughout the tests.


880-970: LGTM! Tests properly validate shared converter behavior.

The test suite successfully migrated from the internal converter to the public providers.NewOpenAIResponsesStreamConverter API. Tests comprehensively verify:

  • Stream conversion with proper event formatting (response.created, response.output_text.delta, response.done)
  • Correct Close() behavior and subsequent EOF handling
  • Empty delta filtering to avoid emitting unnecessary events

The tests confirm that the shared converter correctly handles Groq's OpenAI-compatible streaming format.

Comment on lines +65 to +69
jsonData, err := json.Marshal(createdEvent)
if err != nil {
	slog.Error("failed to marshal response.created event", "error", err, "response_id", sc.responseID)
	return 0, nil
}

⚠️ Potential issue | 🟡 Minor

Potential data loss on marshaling error.

When JSON marshaling fails for the response.created event, the function returns (0, nil), which silently discards the error. This could lead to incomplete streams where the initial event is missing. Consider returning the error to the caller or logging it at a higher severity level.

🔎 Proposed fix
 jsonData, err := json.Marshal(createdEvent)
 if err != nil {
 	slog.Error("failed to marshal response.created event", "error", err, "response_id", sc.responseID)
-	return 0, nil
+	return 0, fmt.Errorf("failed to marshal response.created event: %w", err)
 }
🤖 Prompt for AI Agents
In internal/providers/responses_converter.go around lines 65 to 69, the code
swallows a JSON marshal error by returning (0, nil) which can drop the initial
response.created event; propagate the error instead of returning nil by
returning (0, err) (or wrap with contextual message) so the caller can
handle/fail the stream, and ensure the log remains or is elevated to reflect a
serious marshaling failure.

Comment on lines +125 to +150
// Parse the chat completion chunk
var chunk map[string]interface{}
if err := json.Unmarshal(data, &chunk); err != nil {
	continue
}

// Extract content delta
if choices, ok := chunk["choices"].([]interface{}); ok && len(choices) > 0 {
	if choice, ok := choices[0].(map[string]interface{}); ok {
		if delta, ok := choice["delta"].(map[string]interface{}); ok {
			if content, ok := delta["content"].(string); ok && content != "" {
				deltaEvent := map[string]interface{}{
					"type":  "response.output_text.delta",
					"delta": content,
				}
				jsonData, err := json.Marshal(deltaEvent)
				if err != nil {
					slog.Error("failed to marshal content delta event", "error", err, "response_id", sc.responseID)
					continue
				}
				sc.buffer = append(sc.buffer, []byte(fmt.Sprintf("event: response.output_text.delta\ndata: %s\n\n", jsonData))...)
			}
		}
	}
}

🧹 Nitpick | 🔵 Trivial

Silent error handling for malformed JSON chunks.

When JSON unmarshaling fails for chat completion chunks (line 127-129), the error is silently ignored with continue. While this provides resilience against malformed data, it may hide issues during development or when providers change their response formats.

Consider adding debug-level logging for unmarshal failures to aid troubleshooting.

🔎 Proposed enhancement
 var chunk map[string]interface{}
 if err := json.Unmarshal(data, &chunk); err != nil {
+	slog.Debug("failed to unmarshal chat completion chunk", "error", err, "data", string(data), "response_id", sc.responseID)
 	continue
 }
🤖 Prompt for AI Agents
internal/providers/responses_converter.go around lines 125 to 150: the
json.Unmarshal error for chat completion chunks is currently ignored which hides
malformed input; update the error handling to log a debug-level message
including the unmarshaling error and a truncated preview of the raw data (or its
length) before continuing, preserving the current continue behavior so
resilience remains but developers can trace malformed or unexpected provider
responses.

@SantiagoDePolonia SantiagoDePolonia self-assigned this Dec 28, 2025
@SantiagoDePolonia SantiagoDePolonia merged commit bbce878 into main Dec 28, 2025
7 checks passed
This was referenced Jan 15, 2026
@SantiagoDePolonia SantiagoDePolonia deleted the refactor/streaming-sse branch March 22, 2026 14:23