Skip to content

Commit 985d8eb

Browse files
committed
perf: avoid GC pressure and preallocate
1 parent c091490 commit 985d8eb

2 files changed

Lines changed: 193 additions & 2 deletions

File tree

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package olla
2+
3+
import (
4+
"bytes"
5+
"errors"
6+
"io"
7+
"net/http"
8+
"net/http/httptest"
9+
"testing"
10+
)
11+
12+
// BenchmarkStreamingLastChunk benchmarks the streaming last chunk capture with and without pre-allocated buffer.
13+
// This demonstrates the allocation reduction from using a pre-allocated buffer in streamState.
14+
func BenchmarkStreamingLastChunk(b *testing.B) {
15+
// Simulate a typical streaming response with EOF on the last chunk
16+
// Pre-create streamState to measure only the EOF handling allocation
17+
b.Run("PreAllocated_SmallChunk", func(b *testing.B) {
18+
buffer := make([]byte, 8192)
19+
data := []byte(`{"choices":[{"delta":{"content":"test"},"finish_reason":"stop"}]}`)
20+
w := httptest.NewRecorder()
21+
state := &streamState{} // Allocate once outside the loop
22+
23+
b.ResetTimer()
24+
b.ReportAllocs()
25+
26+
for i := 0; i < b.N; i++ {
27+
resp := &http.Response{
28+
Body: io.NopCloser(bytes.NewReader(data)),
29+
}
30+
31+
// Simulate EOF read (last chunk scenario)
32+
n, err := resp.Body.Read(buffer)
33+
if n > 0 {
34+
if errors.Is(err, io.EOF) {
35+
if n <= len(state.lastChunkBuf) {
36+
copy(state.lastChunkBuf[:], buffer[:n])
37+
state.lastChunk = state.lastChunkBuf[:n]
38+
} else {
39+
state.lastChunk = make([]byte, n)
40+
copy(state.lastChunk, buffer[:n])
41+
}
42+
}
43+
w.Write(buffer[:n])
44+
}
45+
}
46+
})
47+
48+
b.Run("PreAllocated_LargeChunk", func(b *testing.B) {
49+
buffer := make([]byte, 8192)
50+
// 4KB chunk - typical size
51+
data := make([]byte, 4096)
52+
for i := range data {
53+
data[i] = byte(i % 256)
54+
}
55+
w := httptest.NewRecorder()
56+
state := &streamState{} // Allocate once outside the loop
57+
58+
b.ResetTimer()
59+
b.ReportAllocs()
60+
61+
for i := 0; i < b.N; i++ {
62+
resp := &http.Response{
63+
Body: io.NopCloser(bytes.NewReader(data)),
64+
}
65+
66+
n, err := resp.Body.Read(buffer)
67+
if n > 0 {
68+
if errors.Is(err, io.EOF) {
69+
if n <= len(state.lastChunkBuf) {
70+
copy(state.lastChunkBuf[:], buffer[:n])
71+
state.lastChunk = state.lastChunkBuf[:n]
72+
} else {
73+
state.lastChunk = make([]byte, n)
74+
copy(state.lastChunk, buffer[:n])
75+
}
76+
}
77+
w.Write(buffer[:n])
78+
}
79+
}
80+
})
81+
82+
b.Run("PreAllocated_OversizedChunk", func(b *testing.B) {
83+
buffer := make([]byte, 16384) // Larger buffer for this test
84+
// 12KB chunk - exceeds pre-allocated buffer (rare case)
85+
data := make([]byte, 12288)
86+
for i := range data {
87+
data[i] = byte(i % 256)
88+
}
89+
w := httptest.NewRecorder()
90+
state := &streamState{} // Allocate once outside the loop
91+
92+
b.ResetTimer()
93+
b.ReportAllocs()
94+
95+
for i := 0; i < b.N; i++ {
96+
resp := &http.Response{
97+
Body: io.NopCloser(bytes.NewReader(data)),
98+
}
99+
100+
n, err := resp.Body.Read(buffer)
101+
if n > 0 {
102+
if errors.Is(err, io.EOF) {
103+
if n <= len(state.lastChunkBuf) {
104+
copy(state.lastChunkBuf[:], buffer[:n])
105+
state.lastChunk = state.lastChunkBuf[:n]
106+
} else {
107+
// Fallback allocation for oversized chunks
108+
state.lastChunk = make([]byte, n)
109+
copy(state.lastChunk, buffer[:n])
110+
}
111+
}
112+
w.Write(buffer[:n])
113+
}
114+
}
115+
})
116+
117+
// Comparison: Old allocation pattern (for reference)
118+
b.Run("OldAllocation_SmallChunk", func(b *testing.B) {
119+
buffer := make([]byte, 8192)
120+
data := []byte(`{"choices":[{"delta":{"content":"test"},"finish_reason":"stop"}]}`)
121+
w := httptest.NewRecorder()
122+
123+
// Create a mock state without pre-allocated buffer (simulating old code)
124+
type oldStreamState struct {
125+
lastChunk []byte
126+
}
127+
state := &oldStreamState{}
128+
129+
b.ResetTimer()
130+
b.ReportAllocs()
131+
132+
for i := 0; i < b.N; i++ {
133+
resp := &http.Response{
134+
Body: io.NopCloser(bytes.NewReader(data)),
135+
}
136+
137+
n, err := resp.Body.Read(buffer)
138+
if n > 0 {
139+
if errors.Is(err, io.EOF) {
140+
// Old pattern: always allocate
141+
state.lastChunk = make([]byte, n)
142+
copy(state.lastChunk, buffer[:n])
143+
}
144+
w.Write(buffer[:n])
145+
}
146+
}
147+
})
148+
149+
b.Run("OldAllocation_LargeChunk", func(b *testing.B) {
150+
buffer := make([]byte, 8192)
151+
data := make([]byte, 4096)
152+
for i := range data {
153+
data[i] = byte(i % 256)
154+
}
155+
w := httptest.NewRecorder()
156+
157+
type oldStreamState struct {
158+
lastChunk []byte
159+
}
160+
state := &oldStreamState{}
161+
162+
b.ResetTimer()
163+
b.ReportAllocs()
164+
165+
for i := 0; i < b.N; i++ {
166+
resp := &http.Response{
167+
Body: io.NopCloser(bytes.NewReader(data)),
168+
}
169+
170+
n, err := resp.Body.Read(buffer)
171+
if n > 0 {
172+
if errors.Is(err, io.EOF) {
173+
// Old pattern: always allocate
174+
state.lastChunk = make([]byte, n)
175+
copy(state.lastChunk, buffer[:n])
176+
}
177+
w.Write(buffer[:n])
178+
}
179+
}
180+
})
181+
}

internal/adapter/proxy/olla/streaming_helpers.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type streamState struct {
1818
lastChunk []byte
1919
totalBytes int
2020
bytesAfterDisconnect int
21+
lastChunkBuf [8192]byte
2122
clientDisconnected bool
2223
}
2324

@@ -80,9 +81,18 @@ func (s *Service) processStreamData(resp *http.Response, buffer []byte, state *s
8081
n, err := resp.Body.Read(buffer)
8182
if n > 0 {
8283
// Only keep last chunk when we hit EOF (for metrics extraction)
84+
// OLLA-221: large allocations per stream add GC pressure over time
85+
// using a pre-allocated buffer to avoid heap allocation on hot path
8386
if errors.Is(err, io.EOF) {
84-
state.lastChunk = make([]byte, n)
85-
copy(state.lastChunk, buffer[:n])
87+
if n <= len(state.lastChunkBuf) {
88+
// most likely case is the chunk fits in pre-allocated buffer
89+
copy(state.lastChunkBuf[:], buffer[:n])
90+
state.lastChunk = state.lastChunkBuf[:n]
91+
} else {
92+
// rarer case: chunk exceeds 8KB, fallback to allocation
93+
state.lastChunk = make([]byte, n)
94+
copy(state.lastChunk, buffer[:n])
95+
}
8696
}
8797

8898
// Handle data write

0 commit comments

Comments
 (0)