fix(pipeline): prevent channels from getting stuck in processing status #1530
…stuck processing status

When applyRawRequestMiddlewares fails partway through (e.g. withChannelLimiter returns ChannelQueueError after persistRequestExecution has run), the pipeline returned the error without calling applyRawErrorResponseMiddlewares. This left request executions stuck in "processing" forever because persistRequestExecution.OnOutboundRawError was never invoked.

The same latent bug existed in notStream and stream for non-executor error paths. In stream, additional care was needed to save original stream references before middleware wrapping, since middleware functions return nil on error and calling Close() on nil would panic.

Add applyRawErrorResponseMiddlewares calls on all error paths:
- pipeline.go: applyRawRequestMiddlewares failure
- non_streaming.go: raw response, transform, LLM response, empty response, inbound transform, and inbound raw response middlewares failures
- stream.go: raw stream, transform stream, LLM stream, empty response, inbound transform, and inbound raw stream middlewares failures

All OnOutboundRawError handlers are idempotent (nil checks, sync.Once guards) so calling them after a later middleware fails is safe.
Code Review
This pull request enhances error handling and resource management within the LLM pipeline by ensuring that error response middlewares are consistently invoked across all failure paths and that streams are properly closed to prevent leaks. A potential resource leak was identified in the streaming pipeline where a stream could remain open if checkEmptyResponse returns an error, as the reference to the stream is overwritten before it can be closed; a suggestion was provided to track and close the stream in this scenario.
Greptile Summary
This PR fixes a bug where requests could get permanently stuck in "processing" status when …
Confidence Score: 5/5
Safe to merge; the only finding is a redundant double-close in the checkEmptyResponse error path that is benign with current stream implementations.
The core fix is correct and well-targeted. All remaining findings are P2: the double-close in stream.go is redundant (not harmful with standard Go streams) and the test's unconditional OnOutboundRawError notification for un-executed middlewares is intentional and consistent with the idempotency guarantee stated in the PR description.
llm/pipeline/stream.go: specifically the checkEmptyResponse error path where rawLlmStream.Close() is called after checkEmptyResponse has already closed the stream internally.
Sequence Diagram
sequenceDiagram
    participant C as Caller
    participant PR as processRequest
    participant MW as applyRawRequestMiddlewares
    participant NS as notStream / stream
    participant EM as applyRawErrorResponseMiddlewares
    C->>PR: Process(ctx, request)
    PR->>MW: applyRawRequestMiddlewares(ctx, req)
    alt Middleware fails (e.g. ChannelQueueError)
        MW-->>PR: err
        PR->>EM: applyRawErrorResponseMiddlewares(ctx, err) NEW
        PR-->>C: error
    else Middleware succeeds
        MW-->>PR: httpReq
        PR->>NS: notStream / stream
        alt Executor fails
            NS->>EM: applyRawErrorResponseMiddlewares (pre-existing)
            NS-->>PR: error
        else Post-executor step fails (transform, middlewares, empty response)
            NS->>EM: applyRawErrorResponseMiddlewares(ctx, err) NEW
            NS-->>PR: error
        else All steps succeed
            NS-->>PR: result
            PR-->>C: result
        end
    end
Reviews (2): Last reviewed commit: "fix(pipeline): close stream on checkEmpt..."
checkEmptyResponse did not close the llmStream when llmStream.Err() returned a non-nil error, and the caller in stream() lost the reference to the original stream. Save the stream before calling checkEmptyResponse and close it on error, matching the save-before-wrap pattern used elsewhere in stream().

Also add an assertion message for the unexecuted-middleware notification invariant in the regression test.
Summary
Requests can get permanently stuck in "processing" status, causing the system to fail over to a different channel without giving the first one a chance to complete.
Introduced by #1503 (per-channel concurrency queue + admission control).
Bug Behavior
Root Cause
When applyRawRequestMiddlewares fails partway through (e.g. withChannelLimiter returns ChannelQueueError after persistRequestExecution has already run), the pipeline returns the error without calling applyRawErrorResponseMiddlewares. This means persistRequestExecution.OnOutboundRawError is never invoked, leaving the request execution stuck in "processing" forever.
Before #1503, no middleware after persistRequestExecution could fail in OnOutboundRawRequest, so this was a latent bug. The new withChannelLimiter middleware (positioned after persistRequestExecution) can return ChannelQueueError, which now triggers it.
The same latent bug existed in notStream and stream for non-executor error paths (e.g. TransformResponse failure, empty response detection).
Key Changes
- pipeline.go: Call applyRawErrorResponseMiddlewares when applyRawRequestMiddlewares fails
- non_streaming.go: Add cleanup calls on all non-executor error paths (raw response, transform, LLM response, empty response, inbound transform, inbound raw response)
- stream.go: Add cleanup calls on all non-executor error paths + save original stream references before wrapping to prevent nil-pointer panics when closing on error. Also fix stream leak in checkEmptyResponse when llmStream.Err() returns non-nil.
- middleware_test.go: Regression test verifying already-executed middlewares receive OnOutboundRawError when a later middleware fails
Risks
All OnOutboundRawError handlers are idempotent (nil checks, sync.Once guards), so calling them after a later middleware fails is safe.
Known limitation (P2, pre-existing)
If applyRawStreamMiddlewares iterates multiple wrapping middlewares and a later one fails after an earlier one has already wrapped the stream, only the innermost (pre-wrap) stream is closed. Intermediate wrapped streams from successfully-run middlewares are not cleaned up. This is rare (requires ≥2 raw stream middlewares where one fails mid-chain) and pre-existing, not introduced by this PR. A proper fix would require tracking intermediate streams in the middleware application loop, which is a separate refactor.