feat: support non-stream fallback to require-stream channels #1661
Allow require-stream channels to serve non-stream requests by upgrading provider calls to streaming and aggregating the response server-side. Keep retry and pass-through behavior aligned so clients still receive a stable non-stream contract. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review
This pull request introduces a mechanism to support non-streaming requests on channels that require streaming by automatically upgrading the request to a stream and aggregating the resulting chunks. Key changes include refactoring candidate selection based on stream policies, implementing an autoAggregateStream method in the pipeline, and tracking the original stream intent to maintain correct pass-through and retry behavior. Feedback suggests that passThroughStreamAligned should treat nil and false as equivalent for stream parameters to prevent unnecessary pass-through disabling. Additionally, it was noted that the current autoAggregateStream implementation bypasses OutboundLlmResponse middlewares, which may lead to skipping critical processing steps like PII redaction or validation.
func passThroughStreamAligned(originalStream, effectiveStream *bool) bool {
	if originalStream == nil && effectiveStream == nil {
		return true
	}
	if originalStream == nil || effectiveStream == nil {
		return false
	}

	return *originalStream == *effectiveStream
}
The passThroughStreamAligned function treats nil and false as different values. In the context of LLM requests, a nil stream parameter is semantically equivalent to false. If an outbound transformer explicitly sets stream: false in the request body (which some do for explicitness) while the original request had it as nil, pass-through will be unnecessarily disabled. Consider treating nil and false as equivalent.
func passThroughStreamAligned(originalStream, effectiveStream *bool) bool {
	isTrue := func(b *bool) bool {
		return b != nil && *b
	}
	return isTrue(originalStream) == isTrue(effectiveStream)
}

		return nil, err
	}

	body, _, err := p.Inbound.AggregateStreamChunks(ctx, chunks)
The autoAggregateStream method bypasses the structured llm.Response phase by calling Inbound.AggregateStreamChunks directly to get the final body. This causes all OutboundLlmResponse middlewares (applied via applyLlmResponseMiddlewares) to be skipped for auto-aggregated responses. If there are middlewares performing critical tasks like content filtering, PII redaction, or structured validation on the llm.Response object, they will not run in this fallback path.
Checked this path carefully.
The auto-aggregate fallback does not skip the whole middleware chain. It still runs the full streaming path through p.stream(...), which means the stream-phase hooks are already applied before aggregation:
- raw stream middlewares
- Outbound.TransformStream(...)
- OnOutboundLlmStream(...)
- Inbound.TransformStream(...)
- inbound raw stream middlewares
What it does not do is re-enter the separate non-stream OnOutboundLlmResponse(...) phase after the stream has already been transformed and consumed.
So this is not a full middleware bypass; it is specifically a response-phase gap after a path that already executed the stream-phase hooks.
Given the current architecture, I am leaving that out of this PR:
- stream-aware middleware already has OnOutboundLlmStream(...)
- auto-aggregate is implemented by draining the transformed stream and synthesizing the final HTTP response
- re-entering OnOutboundLlmResponse(...) would need a separate design for how to build a final *llm.Response from the drained stream path and how to avoid double-processing middleware that already handled the streaming phase
For this PR, the supported behavior is that the fallback path executes the streaming middleware phase, while any "also run response middleware after aggregation" change is treated as a separate follow-up design/task.
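To make the distinction in this reply concrete, here is a minimal sketch of the "stream hooks run, then drain and aggregate" shape. It is not the project's pipeline code: `Chunk`, `StreamHook`, `applyStreamHooks`, `drainAndAggregate`, `source`, and `redactSecret` are hypothetical names invented for illustration, and a plain Go channel stands in for the transformed stream.

```go
package main

import (
	"fmt"
	"strings"
)

// Chunk is a hypothetical stand-in for one transformed stream event.
type Chunk struct{ Delta string }

// StreamHook is a hypothetical stream-phase middleware that sees every chunk.
type StreamHook func(Chunk) Chunk

// applyStreamHooks wraps a chunk source so that each hook runs on each chunk,
// mirroring the idea that stream-phase hooks run inside the streaming path.
func applyStreamHooks(src <-chan Chunk, hooks ...StreamHook) <-chan Chunk {
	out := make(chan Chunk)
	go func() {
		defer close(out)
		for c := range src {
			for _, h := range hooks {
				c = h(c)
			}
			out <- c
		}
	}()
	return out
}

// drainAndAggregate consumes the already-transformed stream and builds one
// body. No response-phase hook is invoked here; that is the gap discussed
// in the reply above.
func drainAndAggregate(stream <-chan Chunk) string {
	var sb strings.Builder
	for c := range stream {
		sb.WriteString(c.Delta)
	}
	return sb.String()
}

// source emits the given parts as chunks and then closes the channel.
func source(parts ...string) <-chan Chunk {
	ch := make(chan Chunk, len(parts))
	for _, p := range parts {
		ch <- Chunk{Delta: p}
	}
	close(ch)
	return ch
}

// redactSecret is an example stream-phase hook.
func redactSecret(c Chunk) Chunk {
	c.Delta = strings.ReplaceAll(c.Delta, "secret", "[redacted]")
	return c
}

func main() {
	body := drainAndAggregate(applyStreamHooks(source("my ", "secret", " value"), redactSecret))
	fmt.Println(body)
}
```

In this sketch the redaction hook still applies to the aggregated body because it ran per chunk. A hook that only exists for the response phase would have no place to run, which matches the follow-up scoped out of this PR.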
Greptile Summary
This PR adds a non-stream fallback path so that clients making non-stream requests can be served by channels configured with
Confidence Score: 5/5
Safe to merge: the auto-aggregation path is well-isolated, both new error types are wired into retry, and pass-through is correctly disabled when stream semantics change.
All three concerns from prior review rounds are resolved in this diff: the forced provider call now explicitly sets stream_options.include_usage=true in TransformRequest, the two new empty-stream errors are added to CanRetry, and the originalStream snapshot before the retry loop prevents forced-stream state from bleeding into retries.
No files require special attention. Test files cover the critical paths well.
Important Files Changed
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Non-stream client request] --> B{Candidate selection}
B --> C{Any native non-stream candidates?}
C -->|Yes| D[Return native candidates — normal non-stream path]
C -->|No - all require-stream| E{supportsAutoAggregateRequest?}
E -->|No - embedding/compact/AI SDK| F[Return nil — no candidates]
E -->|Yes - chat/completion formats| G[Return require-stream candidates]
G --> H[TransformRequest forces stream=true, IncludeUsage=true]
H --> I[processRequest: originalWantStream=false, effectiveWantStream=true]
I --> K[autoAggregateStream]
K --> L[p.stream — full middleware pipeline]
L --> M[Collect inbound SSE chunks]
M --> N{len chunks == 0?}
N -->|Yes| O[ErrEmptyStreamChunks — retriable]
N -->|No| P[Inbound.AggregateStreamChunks]
P --> Q{len body == 0?}
Q -->|Yes| R[ErrEmptyAggregatedBody — retriable]
Q -->|No| S[Return synthetic non-stream Response]
S --> T[Client receives normal non-stream response]
D --> U[Normal notStream path]
U --> T
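The candidate-selection branch at the top of the flowchart can be sketched as follows. This is an illustration under stated assumptions, not the orchestrator's code: `StreamPolicy`, `Candidate`, and `selectForNonStream` are hypothetical names, and the `supportsAutoAggregate` flag stands in for the `supportsAutoAggregateRequest` check shown in the diagram.

```go
package main

import "fmt"

// StreamPolicy is a hypothetical per-channel stream setting.
type StreamPolicy int

const (
	StreamOptional StreamPolicy = iota // channel can serve non-stream calls natively
	StreamRequire                      // channel only accepts streaming calls
)

// Candidate is a hypothetical channel candidate.
type Candidate struct {
	Name   string
	Policy StreamPolicy
}

// selectForNonStream picks candidates for a non-stream client request:
// native non-stream candidates win; require-stream candidates are returned
// only when the request format supports auto-aggregation; otherwise nil.
func selectForNonStream(all []Candidate, supportsAutoAggregate bool) []Candidate {
	var native, requireStream []Candidate
	for _, c := range all {
		if c.Policy == StreamRequire {
			requireStream = append(requireStream, c)
		} else {
			native = append(native, c)
		}
	}
	if len(native) > 0 {
		return native
	}
	if supportsAutoAggregate {
		return requireStream
	}
	return nil
}

func main() {
	onlyRequire := []Candidate{{Name: "provider-a", Policy: StreamRequire}}
	fmt.Println(len(selectForNonStream(onlyRequire, true)))  // chat-style request
	fmt.Println(len(selectForNonStream(onlyRequire, false))) // e.g. embedding request
}
```

Keeping the native path first means the fallback only changes behavior for requests that previously had no candidates at all.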
Treat empty auto-aggregated stream results the same as other empty responses so same-channel retry still applies. Also format the affected tests to satisfy CI lint checks. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
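A rough sketch of how the two empty-result errors could feed a retry check is shown below. The error names `ErrEmptyStreamChunks` and `ErrEmptyAggregatedBody` come from the review summary; the error messages, the `aggregate` helper, and the lowercase `canRetry` function are simplified assumptions and do not reproduce the project's `CanRetry`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Sentinel errors named after those in the review; the messages are made up.
var (
	ErrEmptyStreamChunks   = errors.New("auto-aggregate: stream produced no chunks")
	ErrEmptyAggregatedBody = errors.New("auto-aggregate: aggregated body is empty")
)

// aggregate joins collected chunks and reports the two empty cases separately.
func aggregate(chunks []string) (string, error) {
	if len(chunks) == 0 {
		return "", ErrEmptyStreamChunks
	}
	body := strings.Join(chunks, "")
	if body == "" {
		return "", ErrEmptyAggregatedBody
	}
	return body, nil
}

// canRetry is a simplified stand-in: both empty results count as retriable.
// errors.Is also matches these sentinels when they are wrapped with %w.
func canRetry(err error) bool {
	return errors.Is(err, ErrEmptyStreamChunks) || errors.Is(err, ErrEmptyAggregatedBody)
}

func main() {
	for _, chunks := range [][]string{nil, {"", ""}, {"hel", "lo"}} {
		body, err := aggregate(chunks)
		fmt.Printf("body=%q retry=%v err=%v\n", body, canRetry(err), err)
	}
}
```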
Ensure auto-aggregated require-stream fallbacks also enable stream usage collection after the provider request is upgraded to streaming. Refresh the related regression tests and formatting so CI lint stays clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
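The upgrade step described in this commit might look roughly like the sketch below. The `Request` and `StreamOptions` types and the `forceStreamWithUsage` helper are hypothetical; only the `stream` and `stream_options.include_usage` field names are taken from the review text. The helper works on a copy, so the caller's original stream intent stays available for retries.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StreamOptions is a hypothetical mirror of the provider's stream_options object.
type StreamOptions struct {
	IncludeUsage bool `json:"include_usage"`
}

// Request is a hypothetical, heavily trimmed provider request.
type Request struct {
	Model         string         `json:"model"`
	Stream        *bool          `json:"stream,omitempty"`
	StreamOptions *StreamOptions `json:"stream_options,omitempty"`
}

// forceStreamWithUsage returns a copy of req upgraded to streaming with usage
// reporting enabled. The input is passed by value and its pointers are never
// written through, so the caller's request is left untouched.
func forceStreamWithUsage(req Request) Request {
	on := true
	req.Stream = &on

	opts := StreamOptions{}
	if req.StreamOptions != nil {
		opts = *req.StreamOptions // copy existing options before changing them
	}
	opts.IncludeUsage = true
	req.StreamOptions = &opts
	return req
}

func main() {
	original := Request{Model: "example-model"}
	upgraded := forceStreamWithUsage(original)

	out, err := json.Marshal(upgraded)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	fmt.Println("original stream still unset:", original.Stream == nil)
}
```

Without the usage flag, a forced streaming call could return no token counts, and the aggregated non-stream response would then lack the usage metadata a non-stream client expects.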
#922 If only streaming providers are configured, many non-streaming tasks fail.
Treat nil and false stream values as equivalent when deciding whether request semantics still match for pass-through eligibility. Add a regression test covering the nil-to-false alignment case. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
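The alignment rule from this commit can be exercised with a small table of cases. The `passThroughStreamAligned` body below follows the version suggested in the review; the `boolPtr` helper and the case table are illustrative additions and are not the PR's actual regression test.

```go
package main

import "fmt"

// passThroughStreamAligned follows the reviewer's suggestion: a nil stream
// flag and an explicit false are treated as the same "non-stream" intent.
func passThroughStreamAligned(originalStream, effectiveStream *bool) bool {
	isTrue := func(b *bool) bool {
		return b != nil && *b
	}
	return isTrue(originalStream) == isTrue(effectiveStream)
}

// boolPtr is a hypothetical helper for building *bool values.
func boolPtr(v bool) *bool { return &v }

func main() {
	cases := []struct {
		name      string
		original  *bool
		effective *bool
	}{
		{"nil vs nil", nil, nil},
		{"nil vs false", nil, boolPtr(false)},
		{"false vs true (forced stream)", boolPtr(false), boolPtr(true)},
		{"true vs true", boolPtr(true), boolPtr(true)},
	}
	for _, c := range cases {
		fmt.Printf("%-32s aligned=%v\n", c.name, passThroughStreamAligned(c.original, c.effective))
	}
}
```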
I think this is a good idea, but I'm not sure whether there is a better way to implement it.
…eam-aggregation # Conflicts: # llm/pipeline/pipeline.go
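I don't have a better idea at the moment either. This is a fairly marginal case, since most requests still use streaming. My current scenario is mainly small blocking (non-streaming) tests.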
…1661)
* feat: auto aggregate require-stream responses
  Allow require-stream channels to serve non-stream requests by upgrading provider calls to streaming and aggregating the response server-side. Keep retry and pass-through behavior aligned so clients still receive a stable non-stream contract.
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix: retry empty auto-aggregated stream responses
  Treat empty auto-aggregated stream results the same as other empty responses so same-channel retry still applies. Also format the affected tests to satisfy CI lint checks.
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix: preserve usage metadata for forced streaming requests
  Ensure auto-aggregated require-stream fallbacks also enable stream usage collection after the provider request is upgraded to streaming. Refresh the related regression tests and formatting so CI lint stays clean.
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix: align pass-through stream nil and false semantics
  Treat nil and false stream values as equivalent when deciding whether request semantics still match for pass-through eligibility. Add a regression test covering the nil-to-false alignment case.
  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
- stream=require channels when the request type and API format support server-side auto-aggregation

Impact
- stream=require channels can now be served to supported non-stream clients instead of being filtered out during candidate selection
- stream=require channels are available, candidate selection still prefers the native non-stream path first

Test plan
- go test ./internal/server/orchestrator
- cd llm && go test ./pipeline

🤖 Generated with Claude Code