Skip to content

Commit 4f4fc05

Browse files
committed
initial streaming vs buffered tests
1 parent cb61406 commit 4f4fc05

1 file changed

Lines changed: 375 additions & 0 deletions

File tree

Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"strings"
10+
"sync"
11+
"testing"
12+
"time"
13+
14+
"github.com/thushan/olla/internal/adapter/proxy/olla"
15+
"github.com/thushan/olla/internal/adapter/proxy/sherpa"
16+
"github.com/thushan/olla/internal/core/constants"
17+
"github.com/thushan/olla/internal/core/domain"
18+
"github.com/thushan/olla/internal/core/ports"
19+
)
20+
21+
// mockResponseWriter tracks flush calls for testing
22+
type mockResponseWriter struct {
23+
httptest.ResponseRecorder
24+
flushCount int
25+
mu sync.Mutex
26+
}
27+
28+
func (m *mockResponseWriter) Flush() {
29+
m.mu.Lock()
30+
m.flushCount++
31+
m.mu.Unlock()
32+
}
33+
34+
func (m *mockResponseWriter) getFlushCount() int {
35+
m.mu.Lock()
36+
defer m.mu.Unlock()
37+
return m.flushCount
38+
}
39+
40+
// TestStreamingProfiles tests that proxy profiles correctly control flushing behavior
41+
func TestStreamingProfiles(t *testing.T) {
42+
suites := []ProxyTestSuite{
43+
SherpaTestSuite{},
44+
OllaTestSuite{},
45+
}
46+
47+
for _, suite := range suites {
48+
t.Run(suite.Name(), func(t *testing.T) {
49+
testStreamingProfilesForSuite(t, suite)
50+
})
51+
}
52+
}
53+
54+
func testStreamingProfilesForSuite(t *testing.T, suite ProxyTestSuite) {
55+
tests := []struct {
56+
name string
57+
profile string
58+
contentType string
59+
responseChunks int
60+
expectFlushes bool
61+
description string
62+
}{
63+
// Buffered profile - should NEVER flush
64+
{
65+
name: "buffered_profile_sse_content",
66+
profile: constants.ConfigurationProxyProfileBuffered,
67+
contentType: "text/event-stream",
68+
responseChunks: 5,
69+
expectFlushes: false,
70+
description: "Buffered profile should not flush even for SSE content",
71+
},
72+
{
73+
name: "buffered_profile_json_streaming",
74+
profile: constants.ConfigurationProxyProfileBuffered,
75+
contentType: "application/stream+json",
76+
responseChunks: 5,
77+
expectFlushes: false,
78+
description: "Buffered profile should not flush streaming JSON",
79+
},
80+
{
81+
name: "buffered_profile_plain_text",
82+
profile: constants.ConfigurationProxyProfileBuffered,
83+
contentType: "text/plain",
84+
responseChunks: 5,
85+
expectFlushes: false,
86+
description: "Buffered profile should not flush plain text",
87+
},
88+
89+
// Streaming profile - should ALWAYS flush
90+
{
91+
name: "streaming_profile_binary_content",
92+
profile: constants.ConfigurationProxyProfileStreaming,
93+
contentType: "image/png",
94+
responseChunks: 5,
95+
expectFlushes: true,
96+
description: "Streaming profile should flush even for binary content",
97+
},
98+
{
99+
name: "streaming_profile_json",
100+
profile: constants.ConfigurationProxyProfileStreaming,
101+
contentType: "application/json",
102+
responseChunks: 5,
103+
expectFlushes: true,
104+
description: "Streaming profile should flush JSON responses",
105+
},
106+
{
107+
name: "streaming_profile_pdf",
108+
profile: constants.ConfigurationProxyProfileStreaming,
109+
contentType: "application/pdf",
110+
responseChunks: 5,
111+
expectFlushes: true,
112+
description: "Streaming profile should flush even PDFs",
113+
},
114+
115+
// Auto profile - flush based on content type detection
116+
{
117+
name: "auto_profile_sse_content",
118+
profile: constants.ConfigurationProxyProfileAuto,
119+
contentType: "text/event-stream",
120+
responseChunks: 5,
121+
expectFlushes: true,
122+
description: "Auto profile should flush SSE content",
123+
},
124+
{
125+
name: "auto_profile_ndjson",
126+
profile: constants.ConfigurationProxyProfileAuto,
127+
contentType: "application/x-ndjson",
128+
responseChunks: 5,
129+
expectFlushes: true,
130+
description: "Auto profile should flush NDJSON",
131+
},
132+
{
133+
name: "auto_profile_binary_image",
134+
profile: constants.ConfigurationProxyProfileAuto,
135+
contentType: "image/jpeg",
136+
responseChunks: 5,
137+
expectFlushes: false,
138+
description: "Auto profile should NOT flush binary images",
139+
},
140+
{
141+
name: "auto_profile_pdf",
142+
profile: constants.ConfigurationProxyProfileAuto,
143+
contentType: "application/pdf",
144+
responseChunks: 5,
145+
expectFlushes: false,
146+
description: "Auto profile should NOT flush PDFs",
147+
},
148+
{
149+
name: "auto_profile_plain_text",
150+
profile: constants.ConfigurationProxyProfileAuto,
151+
contentType: "text/plain; charset=utf-8",
152+
responseChunks: 5,
153+
expectFlushes: true,
154+
description: "Auto profile should flush plain text (common for LLMs)",
155+
},
156+
{
157+
name: "auto_profile_json",
158+
profile: constants.ConfigurationProxyProfileAuto,
159+
contentType: "application/json",
160+
responseChunks: 5,
161+
expectFlushes: true,
162+
description: "Auto profile should flush JSON by default",
163+
},
164+
165+
// Edge cases and mixed scenarios
166+
{
167+
name: "buffered_profile_with_chunked_encoding",
168+
profile: constants.ConfigurationProxyProfileBuffered,
169+
contentType: "text/event-stream",
170+
responseChunks: 10,
171+
expectFlushes: false,
172+
description: "Buffered profile should not flush even with many chunks",
173+
},
174+
{
175+
name: "streaming_profile_empty_content_type",
176+
profile: constants.ConfigurationProxyProfileStreaming,
177+
contentType: "",
178+
responseChunks: 5,
179+
expectFlushes: true,
180+
description: "Streaming profile should flush even with no content type",
181+
},
182+
{
183+
name: "auto_profile_unknown_content_type",
184+
profile: constants.ConfigurationProxyProfileAuto,
185+
contentType: "application/x-custom-type",
186+
responseChunks: 5,
187+
expectFlushes: true,
188+
description: "Auto profile should default to streaming for unknown types",
189+
},
190+
}
191+
192+
for _, tt := range tests {
193+
t.Run(tt.name, func(t *testing.T) {
194+
// Create upstream server that sends chunked responses
195+
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
196+
w.Header().Set("Content-Type", tt.contentType)
197+
w.WriteHeader(http.StatusOK)
198+
199+
// Send response in chunks
200+
for i := 0; i < tt.responseChunks; i++ {
201+
chunk := fmt.Sprintf("chunk %d\n", i)
202+
w.Write([]byte(chunk))
203+
// Upstream flushes to simulate streaming
204+
if f, ok := w.(http.Flusher); ok {
205+
f.Flush()
206+
}
207+
// Small delay to simulate streaming
208+
time.Sleep(5 * time.Millisecond)
209+
}
210+
}))
211+
defer upstream.Close()
212+
213+
// Setup proxy with the test profile
214+
endpoint := createTestEndpoint("test", upstream.URL, domain.StatusHealthy)
215+
discovery := &mockDiscoveryService{endpoints: []*domain.Endpoint{endpoint}}
216+
selector := &mockEndpointSelector{endpoint: endpoint}
217+
collector := createTestStatsCollector()
218+
219+
// Create config with the test profile
220+
var proxy ports.ProxyService
221+
if suite.Name() == "Sherpa" {
222+
config := &sherpa.Configuration{
223+
Profile: tt.profile,
224+
ResponseTimeout: 5 * time.Second,
225+
ReadTimeout: 5 * time.Second,
226+
StreamBufferSize: 8192,
227+
}
228+
proxy = suite.CreateProxy(discovery, selector, config, collector)
229+
} else {
230+
config := &olla.Configuration{
231+
Profile: tt.profile,
232+
ResponseTimeout: 5 * time.Second,
233+
ReadTimeout: 5 * time.Second,
234+
StreamBufferSize: 8192,
235+
MaxIdleConns: 10,
236+
IdleConnTimeout: 90 * time.Second,
237+
MaxConnsPerHost: 5,
238+
}
239+
proxy = suite.CreateProxy(discovery, selector, config, collector)
240+
}
241+
242+
// Create request
243+
req, stats, rlog := createTestRequestWithBody("GET", "/api/test", "")
244+
245+
// Use our mock response writer to track flushes
246+
w := &mockResponseWriter{
247+
ResponseRecorder: *httptest.NewRecorder(),
248+
}
249+
250+
// Execute proxy request
251+
err := proxy.ProxyRequest(req.Context(), w, req, stats, rlog)
252+
if err != nil {
253+
t.Fatalf("Proxy request failed: %v", err)
254+
}
255+
256+
// Verify response was received
257+
result := w.Result()
258+
body, _ := io.ReadAll(result.Body)
259+
if !strings.Contains(string(body), "chunk") {
260+
t.Errorf("Response body doesn't contain expected chunks: %s", body)
261+
}
262+
263+
// Check flush behavior
264+
flushCount := w.getFlushCount()
265+
if tt.expectFlushes && flushCount == 0 {
266+
t.Errorf("%s - Expected flushes but got none. Profile: %s, Content-Type: %s",
267+
tt.description, tt.profile, tt.contentType)
268+
} else if !tt.expectFlushes && flushCount > 0 {
269+
t.Errorf("%s - Expected no flushes but got %d. Profile: %s, Content-Type: %s",
270+
tt.description, flushCount, tt.profile, tt.contentType)
271+
}
272+
273+
// Log results for debugging
274+
t.Logf("Test: %s, Profile: %s, Content-Type: %s, Flushes: %d, Expected flushes: %v",
275+
tt.name, tt.profile, tt.contentType, flushCount, tt.expectFlushes)
276+
})
277+
}
278+
}
279+
280+
// TestStreamingProfilesWithContextOverride tests that context stream value works with profiles
281+
func TestStreamingProfilesWithContextOverride(t *testing.T) {
282+
suites := []ProxyTestSuite{
283+
SherpaTestSuite{},
284+
OllaTestSuite{},
285+
}
286+
287+
for _, suite := range suites {
288+
t.Run(suite.Name(), func(t *testing.T) {
289+
// Create upstream that sends binary content
290+
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
291+
w.Header().Set("Content-Type", "image/png")
292+
w.WriteHeader(http.StatusOK)
293+
294+
// Send fake binary data in chunks
295+
for i := 0; i < 3; i++ {
296+
w.Write([]byte{0xFF, 0xD8, 0xFF, 0xE0}) // Fake JPEG header
297+
if f, ok := w.(http.Flusher); ok {
298+
f.Flush()
299+
}
300+
time.Sleep(5 * time.Millisecond)
301+
}
302+
}))
303+
defer upstream.Close()
304+
305+
// Setup proxy with auto profile
306+
endpoint := createTestEndpoint("test", upstream.URL, domain.StatusHealthy)
307+
discovery := &mockDiscoveryService{endpoints: []*domain.Endpoint{endpoint}}
308+
selector := &mockEndpointSelector{endpoint: endpoint}
309+
collector := createTestStatsCollector()
310+
311+
var proxy ports.ProxyService
312+
if suite.Name() == "Sherpa" {
313+
config := &sherpa.Configuration{
314+
Profile: constants.ConfigurationProxyProfileAuto,
315+
ResponseTimeout: 5 * time.Second,
316+
ReadTimeout: 5 * time.Second,
317+
StreamBufferSize: 8192,
318+
}
319+
proxy = suite.CreateProxy(discovery, selector, config, collector)
320+
} else {
321+
config := &olla.Configuration{
322+
Profile: constants.ConfigurationProxyProfileAuto,
323+
ResponseTimeout: 5 * time.Second,
324+
ReadTimeout: 5 * time.Second,
325+
StreamBufferSize: 8192,
326+
MaxIdleConns: 10,
327+
IdleConnTimeout: 90 * time.Second,
328+
MaxConnsPerHost: 5,
329+
}
330+
proxy = suite.CreateProxy(discovery, selector, config, collector)
331+
}
332+
333+
// Test 1: Auto profile with binary content and stream=true in context
334+
t.Run("context_stream_true_overrides_binary_detection", func(t *testing.T) {
335+
req, stats, rlog := createTestRequestWithBody("GET", "/api/test", "")
336+
// Add stream=true to context
337+
ctx := context.WithValue(req.Context(), "stream", true)
338+
req = req.WithContext(ctx)
339+
340+
w := &mockResponseWriter{
341+
ResponseRecorder: *httptest.NewRecorder(),
342+
}
343+
344+
err := proxy.ProxyRequest(req.Context(), w, req, stats, rlog)
345+
if err != nil {
346+
t.Fatalf("Proxy request failed: %v", err)
347+
}
348+
349+
// Should flush because context stream=true overrides binary detection
350+
if w.getFlushCount() == 0 {
351+
t.Error("Expected flushes when context stream=true for binary content in auto mode")
352+
}
353+
})
354+
355+
// Test 2: Auto profile with binary content and no context override
356+
t.Run("auto_profile_buffers_binary_without_context_override", func(t *testing.T) {
357+
req, stats, rlog := createTestRequestWithBody("GET", "/api/test", "")
358+
359+
w := &mockResponseWriter{
360+
ResponseRecorder: *httptest.NewRecorder(),
361+
}
362+
363+
err := proxy.ProxyRequest(req.Context(), w, req, stats, rlog)
364+
if err != nil {
365+
t.Fatalf("Proxy request failed: %v", err)
366+
}
367+
368+
// Should NOT flush binary content in auto mode
369+
if w.getFlushCount() > 0 {
370+
t.Error("Expected no flushes for binary content in auto mode without context override")
371+
}
372+
})
373+
})
374+
}
375+
}

0 commit comments

Comments
 (0)