Skip to content

Commit c41cf55

Browse files
perf(streaming): fast-path unchanged chat streams
1 parent a0a1d06 commit c41cf55

2 files changed

Lines changed: 202 additions & 1 deletion

File tree

internal/server/handlers_test.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1991,6 +1991,106 @@ data: [DONE]
19911991
}
19921992
}
19931993

1994+
func TestChatCompletionStreaming_FastPathUsesPassthroughForOpenAICompatibleProviders(t *testing.T) {
1995+
streamData := "data: {\"id\":\"chatcmpl-123\",\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}\n\ndata: [DONE]\n\n"
1996+
reqBody := `{"model":"gpt-4o-mini","stream":true,"messages":[{"role":"user","content":"Hi"}]}`
1997+
mock := &mockProvider{
1998+
supportedModels: []string{"gpt-4o-mini"},
1999+
providerTypes: map[string]string{
2000+
"gpt-4o-mini": "openai",
2001+
},
2002+
passthroughResponse: &core.PassthroughResponse{
2003+
StatusCode: http.StatusOK,
2004+
Headers: map[string][]string{
2005+
"Content-Type": {"text/event-stream"},
2006+
},
2007+
Body: io.NopCloser(strings.NewReader(streamData)),
2008+
},
2009+
}
2010+
2011+
e := echo.New()
2012+
handler := NewHandler(mock, nil, nil, nil)
2013+
2014+
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
2015+
req.Header.Set("Content-Type", "application/json")
2016+
rec := httptest.NewRecorder()
2017+
c := e.NewContext(req, rec)
2018+
2019+
err := handler.ChatCompletion(c)
2020+
if err != nil {
2021+
t.Fatalf("handler returned error: %v", err)
2022+
}
2023+
2024+
if rec.Code != http.StatusOK {
2025+
t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK)
2026+
}
2027+
if got := rec.Header().Get("Content-Type"); got != "text/event-stream" {
2028+
t.Fatalf("Content-Type = %q, want text/event-stream", got)
2029+
}
2030+
if got := rec.Body.String(); got != streamData {
2031+
t.Fatalf("stream body = %q, want %q", got, streamData)
2032+
}
2033+
if mock.lastPassthroughProvider != "openai" {
2034+
t.Fatalf("lastPassthroughProvider = %q, want openai", mock.lastPassthroughProvider)
2035+
}
2036+
if mock.lastPassthroughReq == nil {
2037+
t.Fatal("lastPassthroughReq = nil, want passthrough request")
2038+
}
2039+
if body := readPassthroughRequestBody(t, mock.lastPassthroughReq.Body); body != reqBody {
2040+
t.Fatalf("passthrough body = %q, want %q", body, reqBody)
2041+
}
2042+
}
2043+
2044+
func TestChatCompletionStreaming_FastPathSkipsQualifiedModelRewrite(t *testing.T) {
2045+
streamData := "data: {\"id\":\"chatcmpl-123\",\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}\n\ndata: [DONE]\n\n"
2046+
provider := &capturingProvider{
2047+
mockProvider: mockProvider{
2048+
supportedModels: []string{"gpt-4o-mini"},
2049+
providerTypes: map[string]string{
2050+
"gpt-4o-mini": "openai",
2051+
},
2052+
streamData: streamData,
2053+
passthroughResponse: &core.PassthroughResponse{
2054+
StatusCode: http.StatusOK,
2055+
Headers: map[string][]string{
2056+
"Content-Type": {"text/event-stream"},
2057+
},
2058+
Body: io.NopCloser(strings.NewReader("data: should-not-be-used\n\n")),
2059+
},
2060+
},
2061+
}
2062+
2063+
e := echo.New()
2064+
handler := NewHandler(provider, nil, nil, nil)
2065+
2066+
reqBody := `{"model":"openai/gpt-4o-mini","stream":true,"messages":[{"role":"user","content":"Hi"}]}`
2067+
req := httptest.NewRequest(http.MethodPost, "/v1/chat/completions", strings.NewReader(reqBody))
2068+
req.Header.Set("Content-Type", "application/json")
2069+
rec := httptest.NewRecorder()
2070+
c := e.NewContext(req, rec)
2071+
2072+
err := handler.ChatCompletion(c)
2073+
if err != nil {
2074+
t.Fatalf("handler returned error: %v", err)
2075+
}
2076+
2077+
if provider.lastPassthroughReq != nil {
2078+
t.Fatal("lastPassthroughReq != nil, want rewritten request to use StreamChatCompletion path")
2079+
}
2080+
if provider.capturedChatReq == nil {
2081+
t.Fatal("capturedChatReq = nil, want StreamChatCompletion request")
2082+
}
2083+
if provider.capturedChatReq.Model != "gpt-4o-mini" {
2084+
t.Fatalf("captured model = %q, want gpt-4o-mini", provider.capturedChatReq.Model)
2085+
}
2086+
if provider.capturedChatReq.Provider != "openai" {
2087+
t.Fatalf("captured provider = %q, want openai", provider.capturedChatReq.Provider)
2088+
}
2089+
if got := rec.Body.String(); got != streamData {
2090+
t.Fatalf("stream body = %q, want %q", got, streamData)
2091+
}
2092+
}
2093+
19942094
func TestHandleStreamingResponse_FlushesEachChunk(t *testing.T) {
19952095
e := echo.New()
19962096
handler := NewHandler(&mockProvider{}, nil, nil, nil)
@@ -4307,7 +4407,10 @@ func TestStreamingChatCompletion_InjectsStreamOptions(t *testing.T) {
43074407
provider := &capturingProvider{
43084408
mockProvider: mockProvider{
43094409
supportedModels: []string{"gpt-4o-mini"},
4310-
streamData: streamData,
4410+
providerTypes: map[string]string{
4411+
"gpt-4o-mini": "openai",
4412+
},
4413+
streamData: streamData,
43114414
},
43124415
}
43134416

@@ -4337,6 +4440,10 @@ func TestStreamingChatCompletion_InjectsStreamOptions(t *testing.T) {
43374440
t.Errorf("expected status 200, got %d", rec.Code)
43384441
}
43394442

4443+
if provider.lastPassthroughReq != nil {
4444+
t.Fatal("lastPassthroughReq != nil, want usage-enforced streaming to stay on translated stream path")
4445+
}
4446+
43404447
if provider.capturedChatReq.StreamOptions == nil {
43414448
t.Fatal("ChatCompletion streaming should have StreamOptions injected")
43424449
}

internal/server/translated_inference_service.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ func (s *translatedInferenceService) ChatCompletion(c *echo.Context) error {
4646
requestID := requestIDFromContextOrHeader(c.Request())
4747

4848
if req.Stream {
49+
if handled, err := s.tryFastPathStreamingChatPassthrough(c, plan, req); handled {
50+
return err
51+
}
4952
return s.handleStreamingResponse(c, usageModel, providerType, func() (io.ReadCloser, error) {
5053
return s.provider.StreamChatCompletion(ctx, streamReq)
5154
})
@@ -104,6 +107,97 @@ func (s *translatedInferenceService) Responses(c *echo.Context) error {
104107
return c.JSON(http.StatusOK, resp)
105108
}
106109

110+
func (s *translatedInferenceService) tryFastPathStreamingChatPassthrough(c *echo.Context, plan *core.ExecutionPlan, req *core.ChatRequest) (bool, error) {
111+
if !s.canFastPathStreamingChatPassthrough(plan, req) {
112+
return false, nil
113+
}
114+
115+
passthroughProvider, ok := s.provider.(core.RoutablePassthrough)
116+
if !ok {
117+
return false, nil
118+
}
119+
120+
ctx, _ := requestContextWithRequestID(c.Request())
121+
c.SetRequest(c.Request().WithContext(ctx))
122+
123+
const endpoint = "/chat/completions"
124+
providerType := strings.TrimSpace(plan.ProviderType)
125+
resp, err := passthroughProvider.Passthrough(ctx, providerType, &core.PassthroughRequest{
126+
Method: c.Request().Method,
127+
Endpoint: endpoint,
128+
Body: c.Request().Body,
129+
Headers: buildPassthroughHeaders(ctx, c.Request().Header),
130+
})
131+
if err != nil {
132+
return true, handleError(c, err)
133+
}
134+
135+
info := &core.PassthroughRouteInfo{
136+
Provider: providerType,
137+
RawEndpoint: strings.TrimPrefix(endpoint, "/"),
138+
AuditPath: c.Request().URL.Path,
139+
Model: resolvedModelFromPlan(plan, req.Model),
140+
}
141+
passthrough := passthroughService{
142+
provider: s.provider,
143+
logger: s.logger,
144+
usageLogger: s.usageLogger,
145+
pricingResolver: s.pricingResolver,
146+
}
147+
return true, passthrough.proxyPassthroughResponse(c, providerType, endpoint, info, resp)
148+
}
149+
150+
func (s *translatedInferenceService) canFastPathStreamingChatPassthrough(plan *core.ExecutionPlan, req *core.ChatRequest) bool {
151+
if req == nil || !req.Stream {
152+
return false
153+
}
154+
if s.translatedRequestPatcher != nil || s.shouldEnforceReturningUsageData() {
155+
return false
156+
}
157+
if plan == nil || plan.Resolution == nil {
158+
return false
159+
}
160+
161+
providerType := strings.ToLower(strings.TrimSpace(plan.ProviderType))
162+
switch providerType {
163+
case "openai", "azure", "openrouter":
164+
default:
165+
return false
166+
}
167+
168+
if translatedStreamingSelectorRewriteRequired(plan.Resolution) {
169+
return false
170+
}
171+
if translatedStreamingChatBodyRewriteRequired(req) {
172+
return false
173+
}
174+
175+
return true
176+
}
177+
178+
func translatedStreamingSelectorRewriteRequired(resolution *core.RequestModelResolution) bool {
179+
if resolution == nil {
180+
return true
181+
}
182+
183+
requestedModel := strings.TrimSpace(resolution.RequestedModel)
184+
requestedProvider := strings.TrimSpace(resolution.RequestedProvider)
185+
resolvedModel := strings.TrimSpace(resolution.ResolvedSelector.Model)
186+
resolvedProvider := strings.TrimSpace(resolution.ResolvedSelector.Provider)
187+
188+
return requestedModel != resolvedModel || requestedProvider != resolvedProvider
189+
}
190+
191+
func translatedStreamingChatBodyRewriteRequired(req *core.ChatRequest) bool {
192+
if req == nil {
193+
return true
194+
}
195+
196+
model := strings.ToLower(strings.TrimSpace(req.Model))
197+
oSeries := len(model) >= 2 && model[0] == 'o' && model[1] >= '0' && model[1] <= '9'
198+
return oSeries && (req.MaxTokens != nil || req.Temperature != nil)
199+
}
200+
107201
func (s *translatedInferenceService) Embeddings(c *echo.Context) error {
108202
req, err := canonicalJSONRequestFromSemantics[*core.EmbeddingRequest](c, core.DecodeEmbeddingRequest)
109203
if err != nil {

0 commit comments

Comments
 (0)