-
Notifications
You must be signed in to change notification settings - Fork 198
Proxyrunner memory grows monotonically under streamable-http session churn #4062
Description
Summary
The proxyrunner (thv-proxyrunner) leaks memory when handling repeated streamable-http (and perhaps SSE) session lifecycles. Each session is explicitly created via initialize and torn down via DELETE, but memory is not fully reclaimed. Over sustained usage this leads to OOMKill.
Reproduction
Deploy a yardstick MCPServer with streamable-http transport and a 128Mi memory limit on the proxyrunner:
apiVersion: toolhive.stacklok.dev/v1alpha1
kind: MCPServer
metadata:
name: yardstick-memleak-test
spec:
image: ghcr.io/stackloklabs/yardstick/yardstick-server:1.1.1
transport: streamable-http
proxyPort: 8080
mcpPort: 8080
env:
- name: TRANSPORT
value: streamable-http
resources:
limits:
memory: "128Mi"
permissionProfile:
type: builtin
name: network

Then run sequential session lifecycles against the proxy (initialize → notifications/initialized → tools/list → tools/call x2 → DELETE). Each session is fully deleted before the next begins.
Observed behavior:
| Cumulative sessions | Memory (cgroup) | Notes |
|---|---|---|
| 0 | ~40 MiB | Baseline after startup |
| 30,000 | 103 MiB (80%) | After first 30k batch |
| ~60,000 | >128 MiB | OOMKilled (RestartCount=1) |
Depending on the root cause, more thorough testing of SSE connections should also be done to confirm whether the problem exists there as well.
Expected behavior: Memory should stabilize after Go's GC reclaims closed sessions. With explicit DELETE for every session, memory should plateau rather than grow indefinitely.
Environment
- ToolHive operator running in kind cluster
- Proxyrunner image:
thv-proxyrunner built from current main
- Yardstick server: ghcr.io/stackloklabs/yardstick/yardstick-server:1.1.1
- Go runtime (proxyrunner is a single Go binary, no cgo)
Likely area
The streamable-http proxy session handling in pkg/transport/proxy/streamable/ — something is retaining references to closed sessions (or their buffers/channels) after the DELETE handler runs, preventing GC from collecting them.
Recommended approach
The E2E test below confirms the leak exists but is too coarse to pinpoint the root cause. The fix should be driven by a more targeted investigation:
- A Go benchmark test of the streamable proxy session create/delete path (e.g. with b.ReportAllocs()) to measure the per-session allocation delta
- A go tool pprof heap profile of the proxyrunner under load to identify which objects are accumulating
- goleak or a similar leak checker in a unit test to catch goroutines that survive session teardown
Once the leaking reference is identified, the fix can be validated with the benchmark showing zero net allocation growth per cycle.
E2E reproducer test
This Ginkgo test can be dropped into test/e2e/thv-operator/virtualmcp/proxyrunner_memory_test.go. It supports two modes: auto-deploy via the operator, or pointing at a pre-deployed server via MEMLEAK_TEST_URL env var.
proxyrunner_memory_test.go (click to expand)
// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc.
// SPDX-License-Identifier: Apache-2.0
package virtualmcp
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
"github.com/stacklok/toolhive/test/e2e/images"
)
// Memory-leak reproducer: drives tens of thousands of complete streamable-http
// session lifecycles through the proxyrunner, then asserts the pod is still
// Running with zero restarts (i.e. it was not OOMKilled).
var _ = Describe("Proxyrunner memory under session churn", Ordered, func() {
	var (
		testNamespace   = "default"
		mcpServerName   = "yardstick-memleak-test"
		timeout         = 3 * time.Minute
		pollingInterval = 1 * time.Second
		baseURL         string // MCP endpoint targeted by the lifecycle loop
		deployed        bool   // true when this suite created the MCPServer and must clean it up
	)
	BeforeAll(func() {
		// Escape hatch: point at a pre-deployed server instead of deploying here.
		if u := os.Getenv("MEMLEAK_TEST_URL"); u != "" {
			baseURL = u
			GinkgoWriter.Printf("Using pre-deployed server at %s\n", baseURL)
			return
		}
		deployed = true
		By("Creating MCPServer with tight memory limits")
		// The 128Mi limit is deliberately tight so a per-session leak manifests
		// as an OOMKill within the 30k-iteration run (see the issue table).
		mcpServer := &mcpv1alpha1.MCPServer{
			ObjectMeta: metav1.ObjectMeta{
				Name:      mcpServerName,
				Namespace: testNamespace,
			},
			Spec: mcpv1alpha1.MCPServerSpec{
				Image:     images.YardstickServerImage,
				Transport: "streamable-http",
				ProxyPort: 8080,
				McpPort:   8080,
				Env: []mcpv1alpha1.EnvVar{
					{Name: "TRANSPORT", Value: "streamable-http"},
				},
				Resources: mcpv1alpha1.ResourceRequirements{
					Limits: mcpv1alpha1.ResourceList{
						CPU:    "200m",
						Memory: "128Mi",
					},
					Requests: mcpv1alpha1.ResourceList{
						CPU:    "50m",
						Memory: "64Mi",
					},
				},
				PermissionProfile: &mcpv1alpha1.PermissionProfileRef{
					Type: mcpv1alpha1.PermissionProfileTypeBuiltin,
					Name: "network",
				},
			},
		}
		Expect(k8sClient.Create(ctx, mcpServer)).To(Succeed())
		By("Waiting for MCPServer to be running")
		Eventually(func() error {
			server := &mcpv1alpha1.MCPServer{}
			if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(mcpServer), server); err != nil {
				return fmt.Errorf("failed to get MCPServer: %w", err)
			}
			if server.Status.Phase == mcpv1alpha1.MCPServerPhaseRunning {
				return nil
			}
			return fmt.Errorf("MCPServer not ready yet, phase: %s", server.Status.Phase)
		}, timeout, pollingInterval).Should(Succeed())
		By("Patching the operator-created proxy service to NodePort")
		// Expose the proxy on a NodePort so the test (running outside the
		// cluster, against a kind node) can reach it via localhost.
		proxyServiceName := fmt.Sprintf("mcp-%s-proxy", mcpServerName)
		var nodePort int32
		Eventually(func() error {
			svc := &corev1.Service{}
			if err := k8sClient.Get(ctx, client.ObjectKey{
				Name: proxyServiceName, Namespace: testNamespace,
			}, svc); err != nil {
				return fmt.Errorf("proxy service not found yet: %w", err)
			}
			// Idempotent patch: only flip the type if it isn't NodePort yet,
			// so retries of this Eventually body don't conflict.
			if svc.Spec.Type != corev1.ServiceTypeNodePort {
				svc.Spec.Type = corev1.ServiceTypeNodePort
				if err := k8sClient.Update(ctx, svc); err != nil {
					return fmt.Errorf("failed to patch service to NodePort: %w", err)
				}
			}
			if len(svc.Spec.Ports) == 0 || svc.Spec.Ports[0].NodePort == 0 {
				return fmt.Errorf("nodePort not assigned yet")
			}
			nodePort = svc.Spec.Ports[0].NodePort
			// Verify both TCP reachability and HTTP readiness before declaring
			// the endpoint usable (helpers defined elsewhere in the suite).
			if err := checkPortAccessible(nodePort, 1*time.Second); err != nil {
				return fmt.Errorf("nodePort %d not accessible: %w", nodePort, err)
			}
			if err := checkHTTPHealthReady(nodePort, 2*time.Second); err != nil {
				return fmt.Errorf("HTTP server not ready on port %d: %w", nodePort, err)
			}
			return nil
		}, timeout, pollingInterval).Should(Succeed())
		baseURL = fmt.Sprintf("http://localhost:%d/mcp", nodePort)
	})
	AfterAll(func() {
		// Only delete resources this suite created; leave pre-deployed
		// servers (MEMLEAK_TEST_URL mode) untouched.
		if !deployed {
			return
		}
		By("Cleaning up MCPServer")
		_ = k8sClient.Delete(ctx, &mcpv1alpha1.MCPServer{
			ObjectMeta: metav1.ObjectMeta{Name: mcpServerName, Namespace: testNamespace},
		})
	})
	It("does not OOM after 30000 session lifecycles", func() {
		const iterations = 30000
		// Keep-alives are disabled so every lifecycle uses a fresh TCP
		// connection, mimicking many short-lived clients hitting the proxy.
		httpClient := &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				DisableKeepAlives: true,
			},
		}
		By("Warming up: waiting for proxy to be ready for MCP traffic")
		Eventually(func() error {
			return tryMCPInitialize(httpClient, baseURL)
		}, 60*time.Second, 1*time.Second).Should(Succeed(),
			"proxy should become ready for MCP initialize requests")
		By("Running session lifecycles")
		var loopErr error
		completed := 0
		for i := range iterations {
			if err := doSessionLifecycle(httpClient, baseURL, i); err != nil {
				loopErr = fmt.Errorf("session lifecycle failed at iteration %d: %w", i, err)
				break
			}
			completed++
			if (i+1)%100 == 0 {
				GinkgoWriter.Printf("  completed %d/%d session lifecycles\n", i+1, iterations)
			}
		}
		By("Verifying pod is still running and was not OOMKilled")
		// Check pod health FIRST: a mid-loop failure is most likely caused by
		// the proxy dying (OOMKill), and the pod status is the better signal.
		assertProxyrunnerHealthy(testNamespace, mcpServerName)
		if loopErr != nil {
			Fail(fmt.Sprintf("session loop stopped after %d/%d iterations: %v",
				completed, iterations, loopErr))
		}
	})
})
// doSessionLifecycle drives one complete MCP session against the proxy:
// initialize → notifications/initialized → tools/list → two echo tool calls →
// DELETE. The first failing step aborts the lifecycle with a wrapped error.
func doSessionLifecycle(httpClient *http.Client, baseURL string, iteration int) error {
	sessionID, err := doMCPInitialize(httpClient, baseURL, iteration)
	if err != nil {
		return fmt.Errorf("initialize: %w", err)
	}
	// Remaining steps all share the session ID, so run them table-driven.
	steps := []struct {
		name string
		run  func() error
	}{
		{"notify initialized", func() error { return doMCPNotifyInitialized(httpClient, baseURL, sessionID) }},
		{"tools/list", func() error { return doMCPToolsList(httpClient, baseURL, sessionID) }},
		{"tools/call echo ping1", func() error { return doMCPToolsCallEcho(httpClient, baseURL, sessionID, "ping1") }},
		{"tools/call echo ping2", func() error { return doMCPToolsCallEcho(httpClient, baseURL, sessionID, "ping2") }},
		{"delete session", func() error { return doMCPDeleteSession(httpClient, baseURL, sessionID) }},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			return fmt.Errorf("%s: %w", step.name, err)
		}
	}
	return nil
}
// assertProxyrunnerHealthy verifies that every pod backing the MCPServer is
// still Running, has never restarted, and was not previously OOMKilled.
func assertProxyrunnerHealthy(namespace, mcpServerName string) {
	pods := &corev1.PodList{}
	Expect(k8sClient.List(ctx, pods,
		client.InNamespace(namespace),
		client.MatchingLabels{
			"app.kubernetes.io/name":     "mcpserver",
			"app.kubernetes.io/instance": mcpServerName,
		},
	)).To(Succeed())
	Expect(pods.Items).NotTo(BeEmpty(), "expected at least one pod for MCPServer")
	for i := range pods.Items {
		pod := &pods.Items[i]
		Expect(pod.Status.Phase).To(Equal(corev1.PodRunning),
			"pod %s should be Running, got %s", pod.Name, pod.Status.Phase)
		for _, status := range pod.Status.ContainerStatuses {
			// Any restart under the memory-churn workload is treated as a failure.
			Expect(status.RestartCount).To(Equal(int32(0)),
				"container %s in pod %s should have 0 restarts, got %d",
				status.Name, pod.Name, status.RestartCount)
			// If the container did terminate before, surface whether it was an OOMKill.
			if term := status.LastTerminationState.Terminated; term != nil {
				Expect(term.Reason).NotTo(Equal("OOMKilled"),
					"container %s in pod %s was OOMKilled", status.Name, pod.Name)
			}
		}
	}
}
// ssePost sends a JSON-RPC POST with SSE-capable Accept headers and returns
// the HTTP status, the Mcp-Session-Id response header, and the first SSE
// "data: " payload line (empty string if none was seen).
//
// The response body may be a still-open event stream, so it is deliberately
// NOT drained: we read up to the first data line and close, aborting the
// connection (the caller disables keep-alives, so nothing is reused anyway).
//
// Fix over the previous version: scanner.Err() was silently ignored, which
// could mask a truncated/failed stream; it is now surfaced, and the body
// close is deferred so it runs on every path.
func ssePost(httpClient *http.Client, url, sessionID string, body []byte) (int, string, string, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return 0, "", "", fmt.Errorf("creating request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// Streamable-http servers may answer with plain JSON or an SSE stream.
	req.Header.Set("Accept", "application/json, text/event-stream")
	if sessionID != "" {
		req.Header.Set("Mcp-Session-Id", sessionID)
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return 0, "", "", fmt.Errorf("executing request: %w", err)
	}
	defer resp.Body.Close()
	sessHdr := resp.Header.Get("Mcp-Session-Id")
	status := resp.StatusCode
	var dataLine string
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if data, ok := strings.CutPrefix(line, "data: "); ok {
			dataLine = data
			break
		}
	}
	// Breaking at the first data line leaves Err() nil; a non-nil Err means
	// the stream failed before any data line arrived.
	if scanErr := scanner.Err(); scanErr != nil {
		return status, sessHdr, dataLine, fmt.Errorf("reading response body: %w", scanErr)
	}
	return status, sessHdr, dataLine, nil
}
// mcpRequestBody marshals a JSON-RPC 2.0 message for the given method.
// An id of 0 (or negative) produces a notification with no "id" field;
// params are included only when non-nil.
func mcpRequestBody(method string, id int, params map[string]any) ([]byte, error) {
	msg := map[string]any{
		"jsonrpc": "2.0",
		"method":  method,
	}
	if id > 0 {
		msg["id"] = id
	}
	if params != nil {
		msg["params"] = params
	}
	return json.Marshal(msg)
}
// tryMCPInitialize probes whether the proxy accepts MCP traffic by performing
// a single initialize request and then deleting the session it created. It is
// used inside an Eventually loop, so any error just means "not ready yet".
//
// Fix over the previous version: scanner.Err() was silently ignored, which
// could mask a truncated initialize response stream; it is now surfaced.
func tryMCPInitialize(httpClient *http.Client, baseURL string) error {
	body, err := json.Marshal(map[string]any{
		"jsonrpc": "2.0", "id": 1, "method": "initialize",
		"params": map[string]any{
			"protocolVersion": "2025-03-26",
			"capabilities":    map[string]any{},
			"clientInfo":      map[string]any{"name": "warmup-probe", "version": "1.0.0"},
		},
	})
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json, text/event-stream")
	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("POST initialize: %w", err)
	}
	// Consume the SSE prelude up to the first data line, then close. The body
	// may be a still-open event stream, so we deliberately do not drain it.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if strings.HasPrefix(scanner.Text(), "data: ") {
			break
		}
	}
	scanErr := scanner.Err()
	_ = resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("initialize returned %d, want 200", resp.StatusCode)
	}
	if scanErr != nil {
		return fmt.Errorf("reading initialize response: %w", scanErr)
	}
	sessionID := resp.Header.Get("Mcp-Session-Id")
	if sessionID == "" {
		return fmt.Errorf("no Mcp-Session-Id in response")
	}
	// Clean up the probe session. The DELETE status is intentionally not
	// checked: this is a readiness probe, not a correctness check.
	delReq, err := http.NewRequest(http.MethodDelete, baseURL, nil)
	if err != nil {
		return err
	}
	delReq.Header.Set("Mcp-Session-Id", sessionID)
	delResp, err := httpClient.Do(delReq)
	if err != nil {
		return err
	}
	_ = delResp.Body.Close()
	return nil
}
// doMCPInitialize opens a new MCP session via an initialize request and
// returns the Mcp-Session-Id assigned by the proxy. The iteration number is
// embedded in clientInfo so individual sessions can be distinguished in logs.
func doMCPInitialize(httpClient *http.Client, baseURL string, iteration int) (string, error) {
	params := map[string]any{
		"protocolVersion": "2025-03-26",
		"capabilities":    map[string]any{},
		"clientInfo": map[string]any{
			"name":    fmt.Sprintf("memleak-test-%d", iteration),
			"version": "1.0.0",
		},
	}
	body, err := mcpRequestBody("initialize", 1, params)
	if err != nil {
		return "", err
	}
	status, sessionID, _, err := ssePost(httpClient, baseURL, "", body)
	switch {
	case err != nil:
		return "", err
	case status != http.StatusOK:
		return "", fmt.Errorf("initialize returned %d, want 200", status)
	case sessionID == "":
		return "", fmt.Errorf("no Mcp-Session-Id in response")
	}
	return sessionID, nil
}
// doMCPNotifyInitialized sends the notifications/initialized notification
// (id 0, so no JSON-RPC response is expected) and accepts any 2xx status.
func doMCPNotifyInitialized(httpClient *http.Client, baseURL, sessionID string) error {
	body, err := mcpRequestBody("notifications/initialized", 0, nil)
	if err != nil {
		return err
	}
	status, _, _, err := ssePost(httpClient, baseURL, sessionID, body)
	if err != nil {
		return err
	}
	// Notifications may legitimately return 200 or 202; reject anything non-2xx.
	if status/100 != 2 {
		return fmt.Errorf("notifications/initialized returned %d", status)
	}
	return nil
}
// doMCPToolsList issues a tools/list request on the given session and
// requires an HTTP 200 response.
func doMCPToolsList(httpClient *http.Client, baseURL, sessionID string) error {
	body, err := mcpRequestBody("tools/list", 2, map[string]any{})
	if err != nil {
		return err
	}
	switch status, _, _, postErr := ssePost(httpClient, baseURL, sessionID, body); {
	case postErr != nil:
		return postErr
	case status != http.StatusOK:
		return fmt.Errorf("tools/list returned %d, want 200", status)
	}
	return nil
}
// doMCPToolsCallEcho invokes the yardstick "echo" tool with the given input
// on the session and requires an HTTP 200 response.
func doMCPToolsCallEcho(httpClient *http.Client, baseURL, sessionID, input string) error {
	args := map[string]any{"input": input}
	body, err := mcpRequestBody("tools/call", 3, map[string]any{
		"name":      "echo",
		"arguments": args,
	})
	if err != nil {
		return err
	}
	switch status, _, _, postErr := ssePost(httpClient, baseURL, sessionID, body); {
	case postErr != nil:
		return postErr
	case status != http.StatusOK:
		return fmt.Errorf("tools/call echo returned %d, want 200", status)
	}
	return nil
}
// doMCPDeleteSession tears down the session identified by sessionID with an
// HTTP DELETE, accepting any 2xx status from the proxy.
func doMCPDeleteSession(httpClient *http.Client, baseURL, sessionID string) error {
	req, err := http.NewRequest(http.MethodDelete, baseURL, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Mcp-Session-Id", sessionID)
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("DELETE returned %d", resp.StatusCode)
	}
	return nil
}