
feat: Add response timeout retry configuration #1804

Merged
looplj merged 14 commits into looplj:unstable from xuyufengfei:unstable
Jun 10, 2026

Conversation

@xuyufengfei

Contributor

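Adds a stream first-token timeout and a non-stream response timeout to the retry settings; 0 disables a timeout.
The stream first-token timeout covers the DoStream stream-setup phase and the pre-read of the first LLM event; it raises a timeout error when the upstream server is down or hung and returns no data at all. The non-stream timeout covers the request call itself; an unresponsive upstream is classified as a non-stream response timeout error.
Timeout retries skip the same-channel maximum retry limit and switch directly to the next channel according to the load-balancing strategy.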

@greptile-apps

greptile-apps Bot commented Jun 9, 2026

Contributor

Greptile Summary

This PR adds two configurable timeout fields to the retry policy — streamFirstEventTimeoutSeconds (covers DoStream setup and first LLM event pre-read) and nonStreamResponseTimeoutSeconds (covers the full non-streaming request) — with 0 meaning disabled. When either timeout fires it bypasses same-channel retries and jumps directly to a channel switch.

  • firstEventTimeoutGuard uses an atomic three-state machine (pending / completed / timedOut) and a context.WithCancel-derived stream context to interrupt blocking Next() calls; the cancelOnCloseStream wrapper ensures the derived context is cleaned up when the consumer closes the stream.
  • Non-stream timeouts are implemented via context.WithTimeoutCause and detected post-call with context.Cause, keeping the timeout-error identity distinct from ordinary context cancellations.
  • Backend validates and clamps both fields to [0, 600] seconds; GraphQL schema, generated resolver, and frontend UI inputs are all updated consistently.

Confidence Score: 5/5

Safe to merge; only minor design observations found, no incorrect runtime behavior in the changed paths.

The timeout guard's atomic state machine is well-structured, nil-guard paths are handled throughout, and both timeout types correctly propagate as distinct sentinel errors that the retry loop can identify. The two observations — retry delay not being skipped for timeout-triggered switches and an unreachable else-if branch — do not affect correctness.

llm/pipeline/stream.go and llm/pipeline/pipeline.go are where the non-trivial concurrency and retry logic lives; everything else is straightforward plumbing.

Important Files Changed

| Filename | Overview |
| --- | --- |
| llm/pipeline/stream.go | Adds firstEventTimeoutGuard with atomic state machine, cancelOnCloseStream wrapper, and rewrites preReadLlmStream to support first-event timeout enforcement. Contains an unreachable dead-code branch. |
| llm/pipeline/pipeline.go | Adds WithResponseTimeouts option, nonStreamTimeout/streamFirstEventTimeout fields, and integrates timeout-triggered retry bypass of same-channel retries. Retry delay still applies to timeout retries despite the intent to switch directly. |
| llm/pipeline/non_streaming.go | Minor change: passes explicit 0 for firstEventTimeout in autoAggregateStream so no stream-first-event timeout applies; non-stream timeout covers the entire aggregation. |
| internal/server/orchestrator/orchestrator.go | Injects WithResponseTimeouts inside the retryPolicy.Enabled guard; timeout values are correctly converted from seconds to time.Duration. |
| internal/server/biz/system.go | Adds StreamFirstEventTimeoutSeconds and NonStreamResponseTimeoutSeconds fields with proper clamping [0, 600] in normalizeRetryPolicy. |
| internal/server/gql/system.graphql | Adds streamFirstEventTimeoutSeconds and nonStreamResponseTimeoutSeconds to RetryPolicy type (Int!) and UpdateRetryPolicyInput (Int optional). |
| frontend/src/features/system/components/retry-settings.tsx | Adds two new number inputs (0-600 range) for the timeout fields with proper default values and change handlers. |

Sequence Diagram

sequenceDiagram
    participant O as Orchestrator
    participant P as pipeline.Process
    participant PR as processRequest
    participant S as stream()
    participant G as firstEventTimeoutGuard
    participant E as Executor.DoStream
    participant N as nextLlmStreamEvent

    O->>P: Process(ctx, request)
    P->>PR: processRequest(ctx, llmRequest)

    alt Streaming request
        PR->>S: stream(ctx, executor, req, streamFirstEventTimeout)
        S->>G: newFirstEventTimeoutGuard(ctx, timeout)
        G-->>S: streamCtx, guard
        S->>E: DoStream(streamCtx, req)
        E-->>S: outboundStream
        Note over G: timer fires, CAS pending to timedOut, cancel(streamCtx)
        S->>N: "nextLlmStreamEvent(ctx, llmStream, firstEvent=true, guard)"
        N->>N: llmStream.Next() blocked until streamCtx cancelled or event arrives
        alt Event arrives first
            N->>G: acceptFirstEvent() CAS pending to completed, stop timer
            N-->>S: (true, nil)
        else Timeout fires first
            N->>G: "completeFirstEventPhase() CAS fails state=timedOut"
            N->>G: timedOut() returns true
            N-->>S: (false, ErrStreamFirstEventTimeout)
        end
        S-->>PR: stream or ErrStreamFirstEventTimeout
    else Non-streaming request
        PR->>PR: withNonStreamTimeout(ctx) returns timeoutCtx
        PR->>PR: notStream or autoAggregateStream with timeoutCtx
        Note over PR: context.WithTimeoutCause sets cause=ErrNonStreamResponseTimeout on expiry
        PR->>PR: isNonStreamTimeout(timeoutCtx) via context.Cause
        PR-->>P: response or ErrNonStreamResponseTimeout
    end

    alt isResponseTimeoutError(lastErr)
        P->>P: skip same-channel retry
        P->>P: NextChannel() channel switch
    else Normal error
        P->>P: CanRetry() same-channel retry first
        P->>P: NextChannel() if same-channel exhausted
    end
    P->>P: time.Sleep(retryDelay) applied to all retries including timeout

Reviews (6): Last reviewed commit: "Merge pull request #5 from xuyufengfei-c..."

Comment thread llm/pipeline/stream.go
Comment thread llm/pipeline/pipeline.go Outdated
Comment thread internal/server/biz/system.go
Comment thread llm/pipeline/stream.go Outdated
Comment thread llm/pipeline/stream.go
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Comment thread llm/pipeline/stream.go Outdated
@looplj looplj merged commit 0c74900 into looplj:unstable Jun 10, 2026
4 checks passed
junjiangao pushed a commit to junjiangao/axonhub that referenced this pull request Jun 14, 2026
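* feat(retry): add response timeout retry configuration

* fix(retry): cover timeouts for unresponsive upstreams

* fix(retry): distinguish parent-level non-stream timeouts

* fix(retry): address timeout review feedback

* fix(retry): eliminate the stream first-token timeout race

* refactor(retry): rename the stream pre-read method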

* Update llm/pipeline/stream.go

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* fix(retry): preserve the state when the first-token timeout wins

* test(retry): remove the first-token timeout regression test

---------

Co-authored-by: xuyufengfei-cyber <xuyufengfei-cyber@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

3 participants