
fix: prevent bytes.Buffer capacity retention in streaming usage wrapper #95

Merged
SantiagoDePolonia merged 3 commits into main from fix/memory-leak-issue
Feb 25, 2026

Conversation

Contributor

SantiagoDePolonia commented Feb 25, 2026

Replace Reset()+Write() with right-sized buffer allocation when the event buffer grows disproportionately large (e.g. from a single large SSE event). This ensures the oversized backing array is released to GC instead of being pinned for the stream's lifetime.

Summary by CodeRabbit

  • Refactor
    • Improved internal memory management for event stream processing, enhancing efficiency.

Contributor

coderabbitai bot commented Feb 25, 2026

Warning

Rate limit exceeded

@SantiagoDePolonia has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 26 minutes and 1 second before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 772c6ca and cd6a871.

📒 Files selected for processing (1)
  • internal/usage/stream_wrapper.go
📝 Walkthrough

Walkthrough

The changes optimize buffer memory management in the SSE event processing logic by trimming oversized buffers when no complete SSE boundary exists and adding a capacity guard to prevent unbounded backing array growth during remainder appending.

Changes

Cohort: Buffer Management Optimization
File(s): internal/usage/stream_wrapper.go
Summary: Modified processCompleteEvents to replace oversized buffers with trimmed slices and added capacity guard checks to prevent capacity leaks when appending event remainders.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Poem

A rabbit hops through memory lanes,
Trimming buffers, breaking chains,
No more bloated backing arrays wide—
Efficient streams now run inside! 🐰✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Description Check: Passed. Check skipped: CodeRabbit's high-level summary is enabled.
  • Title Check: Passed. The title accurately describes the main change: preventing bytes.Buffer capacity retention by replacing Reset()+Write() with right-sized buffer allocation in the streaming usage wrapper.
  • Docstring Coverage: Passed. No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.


@SantiagoDePolonia
Contributor Author

@coderabbitai review

Contributor

coderabbitai bot commented Feb 25, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Contributor

coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/usage/stream_wrapper.go (1)

109-123: ⚠️ Potential issue | 🟠 Major

Buffer replacement may retain oversized capacity through subslice backing array.

Line 119 uses bytes.NewBuffer(remainder), where remainder is a subslice of the original eventBuffer's backing array (derived from line 81: data[lastBoundary+2:]). Since bytes.NewBuffer() uses the slice directly without copying, the buffer can write into the large underlying backing array without reallocating. Additionally, when len(remainder) == 0, the block exits without releasing the oversized capacity after Reset().

💡 Proposed fix
 	// Keep only the remainder
 	w.eventBuffer.Reset()
 	if len(remainder) > 0 {
 		// Safety valve on remainder size — trim to keep newest bytes
 		if len(remainder) > maxEventBufferRemainder {
 			remainder = remainder[len(remainder)-maxEventBufferRemainder:]
 		}
 		// Prevent capacity leak: if the buffer grew much larger than needed
 		// (e.g. from a single large SSE event), replace it with a right-sized one
 		// instead of reusing the oversized backing array.
 		if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
-			w.eventBuffer = *bytes.NewBuffer(remainder)
+			rightSized := make([]byte, len(remainder))
+			copy(rightSized, remainder)
+			w.eventBuffer = *bytes.NewBuffer(rightSized)
 		} else {
 			w.eventBuffer.Write(remainder)
 		}
+	} else if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
+		// No remainder to keep; drop oversized backing array entirely.
+		w.eventBuffer = bytes.Buffer{}
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/usage/stream_wrapper.go` around lines 109 - 123, The replacement
logic for w.eventBuffer can retain the oversized backing array because
bytes.NewBuffer(remainder) reuses remainder's underlying slice and the
zero-remainder path never shrinks the buffer; update the block handling
remainder so you create a copy of remainder (e.g., copy into a new byte slice)
before calling bytes.NewBuffer to ensure a right-sized backing array, and also
add a branch for len(remainder)==0 that replaces w.eventBuffer with a newly
allocated empty buffer when w.eventBuffer.Cap() is excessively large (use
maxEventBufferRemainder*2 as the threshold) to prevent capacity leaks; adjust
logic around w.eventBuffer.Reset(), the use of remainder, and the conditional
that currently assigns bytes.NewBuffer(remainder).

ℹ️ Review info

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between aaa5e24 and 772c6ca.

📒 Files selected for processing (1)
  • internal/usage/stream_wrapper.go

…er path

bytes.NewBuffer(remainder) reused the old backing array since remainder
is a sub-slice of the buffer's internal slice. Copy into a new slice
before replacing. Also move the capacity check outside the len>0 guard
so an oversized buffer is shrunk even when remainder is empty.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor

Copilot AI left a comment


Pull request overview

This PR attempts to fix memory retention issues in the streaming usage wrapper by replacing the Reset()+Write() pattern with right-sized buffer allocation when the event buffer grows too large. The goal is to release oversized backing arrays to the garbage collector instead of pinning them for the stream's lifetime.

Changes:

  • Modified buffer management in early-return path when no complete event is found (lines 69-75)
  • Updated buffer handling after processing complete events to conditionally create new buffers based on capacity (lines 108-121)
  • Added detailed comments explaining the capacity leak prevention strategy


Comment on lines +114 to 121
	if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
		copied := make([]byte, len(remainder))
		copy(copied, remainder)
		w.eventBuffer = *bytes.NewBuffer(copied)
	} else {
		w.eventBuffer.Reset()
		w.eventBuffer.Write(remainder)
	}

Copilot AI Feb 25, 2026


There's no test coverage for the new capacity-based buffer replacement logic introduced in this PR. The existing test TestStreamUsageWrapperLargeResponsesDone tests large events (>8KB), but doesn't verify that the buffer capacity is actually released or that the new code path is exercised.

Consider adding a test that:

  1. Sends a very large event that causes the buffer capacity to exceed maxEventBufferRemainder*2
  2. Verifies that subsequent reads don't retain the large capacity
  3. Checks that usage data is still correctly extracted after the buffer replacement

This would ensure the fix actually works as intended and prevent regressions.

Copilot uses AI. Check for mistakes.
Comment on lines +72 to +74
	trimmed := make([]byte, maxEventBufferRemainder)
	copy(trimmed, tail[start:])
	w.eventBuffer = *bytes.NewBuffer(trimmed)

Copilot AI Feb 25, 2026


Dereferencing a bytes.Buffer pointer creates a copy of the buffer, not a replacement. Since eventBuffer is declared as a value type (line 25: eventBuffer bytes.Buffer), assigning *bytes.NewBuffer(trimmed) creates a new buffer and then copies it into the struct field. This defeats the entire purpose of the fix, as the original oversized backing array remains referenced by the struct until the new buffer's contents are copied over.

The correct approach is to use a pointer type for the field, or create the buffer directly without the pointer dereference. Since other code uses w.eventBuffer as a value, changing the field type would require broader changes. Instead, replace the buffer's contents using Reset() and Write(), or if avoiding capacity retention is critical, manually construct a buffer using the unexported fields (though this is not recommended).

However, the real issue is that this approach doesn't actually prevent capacity retention - you need to ensure the old buffer is unreferenced. The cleanest fix is to change eventBuffer to *bytes.Buffer in the struct definition.

Suggested change
-	trimmed := make([]byte, maxEventBufferRemainder)
-	copy(trimmed, tail[start:])
-	w.eventBuffer = *bytes.NewBuffer(trimmed)
+	trimmed := tail[start:]
+	w.eventBuffer.Reset()
+	_, _ = w.eventBuffer.Write(trimmed)

	if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
		copied := make([]byte, len(remainder))
		copy(copied, remainder)
		w.eventBuffer = *bytes.NewBuffer(copied)

Copilot AI Feb 25, 2026


Same issue as line 74: dereferencing a bytes.Buffer pointer creates a copy rather than replacing the original. The assignment w.eventBuffer = *bytes.NewBuffer(copied) copies the newly created buffer into the struct field, which doesn't release the old buffer's backing array from the struct.

This means the fix doesn't achieve its intended goal of releasing oversized backing arrays to the garbage collector. The old buffer's data remains part of the struct until the copy completes, and even then, depending on how Go optimizes this, the capacity issue may persist.

Comment on lines +114 to 121
	if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
		copied := make([]byte, len(remainder))
		copy(copied, remainder)
		w.eventBuffer = *bytes.NewBuffer(copied)
	} else {
		w.eventBuffer.Reset()
		w.eventBuffer.Write(remainder)
	}

Copilot AI Feb 25, 2026


The condition checks if capacity exceeds maxEventBufferRemainder*2, but the threshold seems arbitrary. Since maxEventBufferRemainder is 256KB, this triggers at 512KB capacity. However, when the buffer is in the 'else' branch (capacity <= 512KB), the Reset() + Write() pattern is used, which is exactly what the PR description says causes capacity retention problems.

This means buffers with capacity between the remainder length and 512KB will still exhibit the capacity retention issue that this PR aims to fix. Consider whether this threshold should be lower, or if the Reset()+Write() pattern should be avoided entirely when remainder is non-empty.

Suggested change
-	if w.eventBuffer.Cap() > maxEventBufferRemainder*2 {
-		copied := make([]byte, len(remainder))
-		copy(copied, remainder)
-		w.eventBuffer = *bytes.NewBuffer(copied)
-	} else {
-		w.eventBuffer.Reset()
-		w.eventBuffer.Write(remainder)
-	}
+	copied := make([]byte, len(remainder))
+	copy(copied, remainder)
+	w.eventBuffer = *bytes.NewBuffer(copied)
Comment on lines +72 to +74
	trimmed := make([]byte, maxEventBufferRemainder)
	copy(trimmed, tail[start:])
	w.eventBuffer = *bytes.NewBuffer(trimmed)

Copilot AI Feb 25, 2026


Inconsistency: The first code path (lines 69-75) always creates a new buffer when the buffer exceeds maxEventBufferRemainder, but the second path (lines 114-121) only creates a new buffer when capacity exceeds maxEventBufferRemainder*2. This means the two code paths handle the same scenario differently.

For consistency and to ensure the fix works uniformly, both paths should use the same logic. Consider applying the capacity check consistently in both locations, or document why they differ.

Suggested change
-	trimmed := make([]byte, maxEventBufferRemainder)
-	copy(trimmed, tail[start:])
-	w.eventBuffer = *bytes.NewBuffer(trimmed)
+	// If the underlying capacity has grown too large, allocate a new buffer,
+	// otherwise reuse the existing backing array to avoid unnecessary allocs.
+	if cap(tail) > maxEventBufferRemainder*2 {
+		trimmed := make([]byte, maxEventBufferRemainder)
+		copy(trimmed, tail[start:])
+		w.eventBuffer = *bytes.NewBuffer(trimmed)
+	} else {
+		// Move the last maxEventBufferRemainder bytes to the front in-place.
+		copy(tail[0:maxEventBufferRemainder], tail[start:])
+		w.eventBuffer.Reset()
+		w.eventBuffer.Write(tail[:maxEventBufferRemainder])
+	}

…rimming

Match the second safety-valve path's logic: only allocate a new buffer
when cap exceeds 2x maxEventBufferRemainder, otherwise reuse the backing
array via in-place copy to avoid unnecessary allocations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
SantiagoDePolonia merged commit b26abbe into main Feb 25, 2026
12 checks passed
SantiagoDePolonia deleted the fix/memory-leak-issue branch March 22, 2026 14:24