Skip to content

feat: support non-stream fallback to require-stream channels#1661

Merged
looplj merged 5 commits into
looplj:unstablefrom
ttttmr:worktree-auto-stream-aggregation
May 25, 2026
Merged

feat: support non-stream fallback to require-stream channels#1661
looplj merged 5 commits into
looplj:unstablefrom
ttttmr:worktree-auto-stream-aggregation

Conversation

@ttttmr

@ttttmr ttttmr commented May 14, 2026

Copy link
Copy Markdown
Contributor

Summary

  • adjust channel selection for non-stream requests: prefer channels that can answer natively, but fall back to stream=require channels when the request type and API format support server-side auto-aggregation
  • keep the client-facing contract unchanged by upgrading only the selected provider call to streaming and aggregating the streamed result back into a normal non-stream response
  • disable pass-through when request stream semantics change during auto-upgrade, so the original request body or raw provider response cannot override the upgraded transport behavior
  • preserve retry behavior for the upgraded flow and add regression coverage for empty-stream, empty-aggregate, and pass-through edge cases

Impact

  • models behind stream=require channels can now be served to supported non-stream clients instead of being filtered out during candidate selection
  • when both native non-stream and stream=require channels are available, candidate selection still prefers the native non-stream path first
  • unsupported request types and formats keep the previous behavior and will not fall back to auto-aggregation

Test plan

  • go test ./internal/server/orchestrator
  • cd llm && go test ./pipeline

🤖 Generated with Claude Code

Allow require-stream channels to serve non-stream requests by upgrading provider calls to streaming and aggregating the response server-side. Keep retry and pass-through behavior aligned so clients still receive a stable non-stream contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to support non-streaming requests on channels that require streaming by automatically upgrading the request to a stream and aggregating the resulting chunks. Key changes include refactoring candidate selection based on stream policies, implementing an autoAggregateStream method in the pipeline, and tracking the original stream intent to maintain correct pass-through and retry behavior. Feedback suggests that passThroughStreamAligned should treat nil and false as equivalent for stream parameters to prevent unnecessary pass-through disabling. Additionally, it was noted that the current autoAggregateStream implementation bypasses OutboundLlmResponse middlewares, which may lead to skipping critical processing steps like PII redaction or validation.

Comment on lines +64 to +73
func passThroughStreamAligned(originalStream, effectiveStream *bool) bool {
if originalStream == nil && effectiveStream == nil {
return true
}
if originalStream == nil || effectiveStream == nil {
return false
}

return *originalStream == *effectiveStream
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The passThroughStreamAligned function treats nil and false as different values. In the context of LLM requests, a nil stream parameter is semantically equivalent to false. If an outbound transformer explicitly sets stream: false in the request body (which some do for explicitness) while the original request had it as nil, pass-through will be unnecessarily disabled. Consider treating nil and false as equivalent.

func passThroughStreamAligned(originalStream, effectiveStream *bool) bool {
	isTrue := func(b *bool) bool {
		return b != nil && *b
	}
	return isTrue(originalStream) == isTrue(effectiveStream)
}

return nil, err
}

body, _, err := p.Inbound.AggregateStreamChunks(ctx, chunks)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The autoAggregateStream method bypasses the structured llm.Response phase by calling Inbound.AggregateStreamChunks directly to get the final body. This causes all OutboundLlmResponse middlewares (applied via applyLlmResponseMiddlewares) to be skipped for auto-aggregated responses. If there are middlewares performing critical tasks like content filtering, PII redaction, or structured validation on the llm.Response object, they will not run in this fallback path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked this path carefully.

The auto-aggregate fallback does not skip the whole middleware chain. It still runs the full streaming path through p.stream(...), which means the stream-phase hooks are already applied before aggregation:

  • raw stream middlewares
  • Outbound.TransformStream(...)
  • OnOutboundLlmStream(...)
  • Inbound.TransformStream(...)
  • inbound raw stream middlewares

What it does not do is re-enter the separate non-stream OnOutboundLlmResponse(...) phase after the stream has already been transformed and consumed.

So this is not a full middleware bypass; it is specifically a response-phase gap after a path that already executed the stream-phase hooks.

Given the current architecture, I am leaving that out of this PR:

  • stream-aware middleware already has OnOutboundLlmStream(...)
  • auto-aggregate is implemented by draining the transformed stream and synthesizing the final HTTP response
  • re-entering OnOutboundLlmResponse(...) would need a separate design for how to build a final *llm.Response from the drained stream path and how to avoid double-processing middleware that already handled the streaming phase

For this PR, the supported behavior is that the fallback path executes the streaming middleware phase, while any "also run response middleware after aggregation" change is treated as a separate follow-up design/task.

@greptile-apps

greptile-apps Bot commented May 14, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds a non-stream fallback path so that clients making non-stream requests can be served by channels configured with stream=require. When only require-stream channels are available, the pipeline is upgraded to streaming toward the provider, chunks are collected via autoAggregateStream, and the aggregated result is returned to the client as a conventional non-stream response.

  • Candidate selection (candidates_stream_policy.go): native non-stream channels are preferred; require-stream channels are kept only when supportsAutoAggregateRequest confirms the format supports server-side aggregation.
  • Stream forcing (outbound.go): shouldForceStreamingForCandidate upgrades the outbound request to stream=true and explicitly sets stream_options.include_usage=true, addressing the prior EnsureUsage gap.
  • Retry safety (pipeline.go): originalStream is captured before the retry loop and restored at the start of each iteration so forced-stream state doesn't leak into subsequent attempts.
  • Pass-through guard (pass_through.go): passThroughStreamAligned disables pass-through whenever the client's original stream intent differs from the effective (forced) transport, preventing the raw provider stream from being returned to a non-stream client.

Confidence Score: 5/5

Safe to merge — the auto-aggregation path is well-isolated, both new error types are wired into retry, and pass-through is correctly disabled when stream semantics change.

All three concerns from prior review rounds are resolved in this diff: the forced provider call now explicitly sets stream_options.include_usage=true in TransformRequest, the two new empty-stream errors are added to CanRetry, and the originalStream snapshot before the retry loop prevents forced-stream state from bleeding into retries.

No files require special attention. Test files cover the critical paths well.

Important Files Changed

Filename Overview
internal/server/orchestrator/candidates_stream_policy.go Refactors stream-policy filtering: non-stream requests now prefer native candidates and fall back to require-stream channels only when supportsAutoAggregateRequest approves the format.
internal/server/orchestrator/outbound.go Adds shouldForceStreamingForCandidate and inline stream-forcing in TransformRequest; resets StreamCompleted=false at each attempt; extends CanRetry to treat ErrEmptyStreamChunks and ErrEmptyAggregatedBody as retriable.
llm/pipeline/pipeline.go Captures originalStream before the retry loop and resets it each iteration; adds effectiveWantStream detection to route auto-aggregate requests through autoAggregateStream.
llm/pipeline/non_streaming.go Adds autoAggregateStream: invokes p.stream(), collects inbound chunks, calls Inbound.AggregateStreamChunks, checks for empty conditions, and returns a synthetic non-stream Response.
internal/server/orchestrator/pass_through.go Adds passThroughStreamAligned guard in isPassThroughEnabled: disables pass-through when OriginalRequestStream and effective LlmRequest.Stream disagree.
internal/server/orchestrator/state.go Adds OriginalRequestStream *bool field to capture the client's true stream intent before any candidate-driven forcing.
llm/pipeline/empty_response.go Adds ErrEmptyStreamChunks and ErrEmptyAggregatedBody sentinel errors wired into CanRetry for consistent retry behavior.
internal/server/orchestrator/inbound.go Sets p.state.OriginalRequestStream = llmRequest.Stream in PersistentInboundTransformer.TransformRequest to preserve the client's stream intent.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Non-stream client request] --> B{Candidate selection}
    B --> C{Any native non-stream candidates?}
    C -->|Yes| D[Return native candidates — normal non-stream path]
    C -->|No - all require-stream| E{supportsAutoAggregateRequest?}
    E -->|No - embedding/compact/AI SDK| F[Return nil — no candidates]
    E -->|Yes - chat/completion formats| G[Return require-stream candidates]
    G --> H[TransformRequest forces stream=true, IncludeUsage=true]
    H --> I[processRequest: originalWantStream=false, effectiveWantStream=true]
    I --> K[autoAggregateStream]
    K --> L[p.stream — full middleware pipeline]
    L --> M[Collect inbound SSE chunks]
    M --> N{len chunks == 0?}
    N -->|Yes| O[ErrEmptyStreamChunks — retriable]
    N -->|No| P[Inbound.AggregateStreamChunks]
    P --> Q{len body == 0?}
    Q -->|Yes| R[ErrEmptyAggregatedBody — retriable]
    Q -->|No| S[Return synthetic non-stream Response]
    S --> T[Client receives normal non-stream response]
    D --> U[Normal notStream path]
    U --> T
Loading

Reviews (5): Last reviewed commit: "Merge remote-tracking branch 'origin/uns..." | Re-trigger Greptile

Comment thread llm/pipeline/non_streaming.go
Comment thread internal/server/orchestrator/candidates_stream_policy_test.go Outdated
ttttmr and others added 2 commits May 14, 2026 16:42
Treat empty auto-aggregated stream results the same as other empty responses so same-channel retry still applies. Also format the affected tests to satisfy CI lint checks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ensure auto-aggregated require-stream fallbacks also enable stream usage collection after the provider request is upgraded to streaming. Refresh the related regression tests and formatting so CI lint stays clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@ttttmr

ttttmr commented May 14, 2026

Copy link
Copy Markdown
Contributor Author

#922 如果只配了流式供应商,许多非流式的任务会失败

Treat nil and false stream values as equivalent when deciding whether request semantics still match for pass-through eligibility. Add a regression test covering the nil-to-false alignment case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@looplj

looplj commented May 21, 2026

Copy link
Copy Markdown
Owner

我觉得这是一个好想法,但是不知道有没有更好的实现方式。

…eam-aggregation

# Conflicts:
#	llm/pipeline/pipeline.go
@ttttmr

ttttmr commented May 23, 2026

Copy link
Copy Markdown
Contributor Author

目前我也没啥好想法,这个也是一个挺边缘的case了,大部分还是都用流式请求,目前我这边的场景是主要是block小型测试

@looplj looplj merged commit 0eceea0 into looplj:unstable May 25, 2026
4 checks passed
@ttttmr ttttmr deleted the worktree-auto-stream-aggregation branch May 27, 2026 04:02
junjiangao pushed a commit to junjiangao/axonhub that referenced this pull request May 27, 2026
…1661)

* feat: auto aggregate require-stream responses

Allow require-stream channels to serve non-stream requests by upgrading provider calls to streaming and aggregating the response server-side. Keep retry and pass-through behavior aligned so clients still receive a stable non-stream contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: retry empty auto-aggregated stream responses

Treat empty auto-aggregated stream results the same as other empty responses so same-channel retry still applies. Also format the affected tests to satisfy CI lint checks.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: preserve usage metadata for forced streaming requests

Ensure auto-aggregated require-stream fallbacks also enable stream usage collection after the provider request is upgraded to streaming. Refresh the related regression tests and formatting so CI lint stays clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: align pass-through stream nil and false semantics

Treat nil and false stream values as equivalent when deciding whether request semantics still match for pass-through eligibility. Add a regression test covering the nil-to-false alignment case.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants