fix: prevent bytes.Buffer capacity retention in streaming usage wrapper #95
SantiagoDePolonia merged 3 commits into main
Conversation
Replace Reset()+Write() with right-sized buffer allocation when the event buffer grows disproportionately large (e.g. from a single large SSE event). This ensures the oversized backing array is released to GC instead of being pinned for the stream's lifetime. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

The changes optimize buffer memory management in the SSE event processing logic by trimming oversized buffers when no complete SSE boundary exists and adding a capacity guard to prevent unbounded backing-array growth during remainder appending.
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
@coderabbitai review
✅ Actions performed: Review triggered.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/usage/stream_wrapper.go (1)
109-123: ⚠️ Potential issue | 🟠 Major

Buffer replacement may retain oversized capacity through the subslice backing array.

Line 119 uses `bytes.NewBuffer(remainder)`, where `remainder` is a subslice of the original `eventBuffer`'s backing array (derived from line 81: `data[lastBoundary+2:]`). Since `bytes.NewBuffer()` uses the slice directly without copying, the buffer can write into the large underlying backing array without reallocating. Additionally, when `len(remainder) == 0`, the block exits after `Reset()` without releasing the oversized capacity.
💡 Proposed fix

```diff
 // Keep only the remainder
 w.eventBuffer.Reset()
 if len(remainder) > 0 {
 	// Safety valve on remainder size — trim to keep newest bytes
 	if len(remainder) > maxEventBufferRemainder {
 		remainder = remainder[len(remainder)-maxEventBufferRemainder:]
 	}
 	// Prevent capacity leak: if the buffer grew much larger than needed
 	// (e.g. from a single large SSE event), replace it with a right-sized one
 	// instead of reusing the oversized backing array.
 	if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
-		w.eventBuffer = *bytes.NewBuffer(remainder)
+		rightSized := make([]byte, len(remainder))
+		copy(rightSized, remainder)
+		w.eventBuffer = *bytes.NewBuffer(rightSized)
 	} else {
 		w.eventBuffer.Write(remainder)
 	}
+} else if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
+	// No remainder to keep; drop oversized backing array entirely.
+	w.eventBuffer = bytes.Buffer{}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/usage/stream_wrapper.go` around lines 109 - 123, The replacement logic for w.eventBuffer can retain the oversized backing array because bytes.NewBuffer(remainder) reuses remainder's underlying slice and the zero-remainder path never shrinks the buffer; update the block handling remainder so you create a copy of remainder (e.g., copy into a new byte slice) before calling bytes.NewBuffer to ensure a right-sized backing array, and also add a branch for len(remainder)==0 that replaces w.eventBuffer with a newly allocated empty buffer when w.eventBuffer.Cap() is excessively large (use maxEventBufferRemainder*2 as the threshold) to prevent capacity leaks; adjust logic around w.eventBuffer.Reset(), the use of remainder, and the conditional that currently assigns bytes.NewBuffer(remainder).
…er path bytes.NewBuffer(remainder) reused the old backing array since remainder is a sub-slice of the buffer's internal slice. Copy into a new slice before replacing. Also move the capacity check outside the len>0 guard so an oversized buffer is shrunk even when remainder is empty. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
This PR attempts to fix memory retention issues in the streaming usage wrapper by replacing the Reset()+Write() pattern with right-sized buffer allocation when the event buffer grows too large. The goal is to release oversized backing arrays to the garbage collector instead of pinning them for the stream's lifetime.
Changes:
- Modified buffer management in early-return path when no complete event is found (lines 69-75)
- Updated buffer handling after processing complete events to conditionally create new buffers based on capacity (lines 108-121)
- Added detailed comments explaining the capacity leak prevention strategy
```go
if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
	copied := make([]byte, len(remainder))
	copy(copied, remainder)
	w.eventBuffer = *bytes.NewBuffer(copied)
} else {
	w.eventBuffer.Reset()
	w.eventBuffer.Write(remainder)
}
```
There's no test coverage for the new capacity-based buffer replacement logic introduced in this PR. The existing test TestStreamUsageWrapperLargeResponsesDone tests large events (>8KB), but doesn't verify that the buffer capacity is actually released or that the new code path is exercised.
Consider adding a test that:
- Sends a very large event that causes the buffer capacity to exceed maxEventBufferRemainder*2
- Verifies that subsequent reads don't retain the large capacity
- Checks that usage data is still correctly extracted after the buffer replacement
This would ensure the fix actually works as intended and prevent regressions.
internal/usage/stream_wrapper.go
Outdated
```go
trimmed := make([]byte, maxEventBufferRemainder)
copy(trimmed, tail[start:])
w.eventBuffer = *bytes.NewBuffer(trimmed)
```
Dereferencing a bytes.Buffer pointer creates a copy of the buffer, not a replacement. Since eventBuffer is declared as a value type (line 25: eventBuffer bytes.Buffer), assigning *bytes.NewBuffer(trimmed) creates a new buffer and then copies it into the struct field. This defeats the entire purpose of the fix, as the original oversized backing array remains referenced by the struct until the new buffer's contents are copied over.
The correct approach is to use a pointer type for the field, or create the buffer directly without the pointer dereference. Since other code uses w.eventBuffer as a value, changing the field type would require broader changes. Instead, replace the buffer's contents using Reset() and Write(), or if avoiding capacity retention is critical, manually construct a buffer using the unexported fields (though this is not recommended).
However, the real issue is that this approach doesn't actually prevent capacity retention - you need to ensure the old buffer is unreferenced. The cleanest fix is to change eventBuffer to *bytes.Buffer in the struct definition.
Suggested change:

```diff
-trimmed := make([]byte, maxEventBufferRemainder)
-copy(trimmed, tail[start:])
-w.eventBuffer = *bytes.NewBuffer(trimmed)
+trimmed := tail[start:]
+w.eventBuffer.Reset()
+_, _ = w.eventBuffer.Write(trimmed)
```
```go
if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
	copied := make([]byte, len(remainder))
	copy(copied, remainder)
	w.eventBuffer = *bytes.NewBuffer(copied)
```
Same issue as line 74: dereferencing a bytes.Buffer pointer creates a copy rather than replacing the original. The assignment w.eventBuffer = *bytes.NewBuffer(copied) copies the newly created buffer into the struct field, which doesn't release the old buffer's backing array from the struct.
This means the fix doesn't achieve its intended goal of releasing oversized backing arrays to the garbage collector. The old buffer's data remains part of the struct until the copy completes, and even then, depending on how Go optimizes this, the capacity issue may persist.
```go
if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
	copied := make([]byte, len(remainder))
	copy(copied, remainder)
	w.eventBuffer = *bytes.NewBuffer(copied)
} else {
	w.eventBuffer.Reset()
	w.eventBuffer.Write(remainder)
}
```
The condition checks if capacity exceeds maxEventBufferRemainder*2, but the threshold seems arbitrary. Since maxEventBufferRemainder is 256KB, this triggers at 512KB capacity. However, when the buffer is in the 'else' branch (capacity <= 512KB), the Reset() + Write() pattern is used, which is exactly what the PR description says causes capacity retention problems.
This means buffers with capacity between the remainder length and 512KB will still exhibit the capacity retention issue that this PR aims to fix. Consider whether this threshold should be lower, or if the Reset()+Write() pattern should be avoided entirely when remainder is non-empty.
Suggested change:

```diff
-if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
-	copied := make([]byte, len(remainder))
-	copy(copied, remainder)
-	w.eventBuffer = *bytes.NewBuffer(copied)
-} else {
-	w.eventBuffer.Reset()
-	w.eventBuffer.Write(remainder)
-}
+copied := make([]byte, len(remainder))
+copy(copied, remainder)
+w.eventBuffer = *bytes.NewBuffer(copied)
```
internal/usage/stream_wrapper.go
Outdated
```go
trimmed := make([]byte, maxEventBufferRemainder)
copy(trimmed, tail[start:])
w.eventBuffer = *bytes.NewBuffer(trimmed)
```
Inconsistency: The first code path (lines 69-75) always creates a new buffer when the buffer exceeds maxEventBufferRemainder, but the second path (lines 114-121) only creates a new buffer when capacity exceeds maxEventBufferRemainder*2. This means the two code paths handle the same scenario differently.
For consistency and to ensure the fix works uniformly, both paths should use the same logic. Consider applying the capacity check consistently in both locations, or document why they differ.
Suggested change:

```diff
-trimmed := make([]byte, maxEventBufferRemainder)
-copy(trimmed, tail[start:])
-w.eventBuffer = *bytes.NewBuffer(trimmed)
+// If the underlying capacity has grown too large, allocate a new buffer,
+// otherwise reuse the existing backing array to avoid unnecessary allocs.
+if cap(tail) > maxEventBufferRemainder*2 {
+	trimmed := make([]byte, maxEventBufferRemainder)
+	copy(trimmed, tail[start:])
+	w.eventBuffer = *bytes.NewBuffer(trimmed)
+} else {
+	// Move the last maxEventBufferRemainder bytes to the front in-place.
+	copy(tail[0:maxEventBufferRemainder], tail[start:])
+	w.eventBuffer.Reset()
+	w.eventBuffer.Write(tail[:maxEventBufferRemainder])
+}
```
…rimming Match the second safety-valve path's logic: only allocate a new buffer when cap exceeds 2x maxEventBufferRemainder, otherwise reuse the backing array via in-place copy to avoid unnecessary allocations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>