Skip to content

Proxyrunner memory grows monotonically under streamable-http session churn #4062

@jerm-dro

Description

@jerm-dro

Summary

The proxyrunner (thv-proxyrunner) leaks memory when handling repeated streamable-http (and perhaps SSE) session lifecycles. Each session is explicitly created via initialize and torn down via DELETE, but memory is not fully reclaimed. Over sustained usage this leads to OOMKill.

Reproduction

Deploy a yardstick MCPServer with streamable-http transport and a 128Mi memory limit on the proxyrunner:

apiVersion: toolhive.stacklok.dev/v1alpha1
kind: MCPServer
metadata:
  name: yardstick-memleak-test
spec:
  image: ghcr.io/stackloklabs/yardstick/yardstick-server:1.1.1
  transport: streamable-http
  proxyPort: 8080
  mcpPort: 8080
  env:
    - name: TRANSPORT
      value: streamable-http
  resources:
    limits:
      memory: "128Mi"
  permissionProfile:
    type: builtin
    name: network

Then run sequential session lifecycles against the proxy (initialize → notifications/initialized → tools/list → tools/call x2 → DELETE). Each session is fully deleted before the next begins.

Observed behavior:

Cumulative sessions Memory (cgroup) Notes
0 ~40 MiB Baseline after startup
30,000 103 MiB (80%) After first 30k batch
~60,000 >128 MiB OOMKilled (RestartCount=1)

Depending on the root cause, more thorough testing of SSE connections should also be done to confirm whether the problem exists there as well.

Expected behavior: Memory should stabilize after Go's GC reclaims closed sessions. With explicit DELETE for every session, memory should plateau rather than grow indefinitely.

Environment

  • ToolHive operator running in kind cluster
  • Proxyrunner image: thv-proxyrunner built from current main
  • Yardstick server: ghcr.io/stackloklabs/yardstick/yardstick-server:1.1.1
  • Go runtime (proxyrunner is a single Go binary, no cgo)

Likely area

The streamable-http proxy session handling in pkg/transport/proxy/streamable/ — something is retaining references to closed sessions (or their buffers/channels) after the DELETE handler runs, preventing GC from collecting them.

Recommended approach

The E2E test below confirms the leak exists but is too coarse to pinpoint the root cause. The fix should be driven by a more targeted investigation:

  • Go benchmark test of the streamable proxy session create/delete path (e.g. b.ReportAllocs()) to measure per-session allocation delta
  • go tool pprof heap profile of the proxyrunner under load to identify which objects are accumulating
  • goleak or similar leak checker in a unit test to catch goroutines that survive session teardown

Once the leaking reference is identified, the fix can be validated with the benchmark showing zero net allocation growth per cycle.

E2E reproducer test

This Ginkgo test can be dropped into test/e2e/thv-operator/virtualmcp/proxyrunner_memory_test.go. It supports two modes: auto-deploy via the operator, or pointing at a pre-deployed server via MEMLEAK_TEST_URL env var.

proxyrunner_memory_test.go (click to expand)
// SPDX-FileCopyrightText: Copyright 2025 Stacklok, Inc.
// SPDX-License-Identifier: Apache-2.0

package virtualmcp

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
	"github.com/stacklok/toolhive/test/e2e/images"
)

// Regression test for proxyrunner memory growth under streamable-http session
// churn (issue #4062): repeatedly create and explicitly DELETE MCP sessions
// against a proxyrunner capped at 128Mi and assert it was never OOMKilled.
//
// Two modes:
//   - MEMLEAK_TEST_URL set: drive traffic at a pre-deployed server; no cluster
//     resources are created or cleaned up.
//   - unset: deploy a yardstick MCPServer via the operator, expose its proxy
//     service as a NodePort, and tear everything down in AfterAll.
var _ = Describe("Proxyrunner memory under session churn", Ordered, func() {
	var (
		testNamespace   = "default"
		mcpServerName   = "yardstick-memleak-test"
		timeout         = 3 * time.Minute
		pollingInterval = 1 * time.Second
		baseURL         string // MCP endpoint all session traffic is sent to
		deployed        bool   // true when this suite created the MCPServer (AfterAll must clean up)
	)

	BeforeAll(func() {
		// Fast path: point at an already-running proxy instead of deploying one.
		if u := os.Getenv("MEMLEAK_TEST_URL"); u != "" {
			baseURL = u
			GinkgoWriter.Printf("Using pre-deployed server at %s\n", baseURL)
			return
		}

		deployed = true

		By("Creating MCPServer with tight memory limits")
		// The 128Mi limit is deliberately tight so a per-session leak surfaces
		// as an OOMKill within the test's 30k iterations (see the issue's
		// reproduction table); requests stay low so the pod schedules in kind.
		mcpServer := &mcpv1alpha1.MCPServer{
			ObjectMeta: metav1.ObjectMeta{
				Name:      mcpServerName,
				Namespace: testNamespace,
			},
			Spec: mcpv1alpha1.MCPServerSpec{
				Image:     images.YardstickServerImage,
				Transport: "streamable-http",
				ProxyPort: 8080,
				McpPort:   8080,
				Env: []mcpv1alpha1.EnvVar{
					{Name: "TRANSPORT", Value: "streamable-http"},
				},
				Resources: mcpv1alpha1.ResourceRequirements{
					Limits: mcpv1alpha1.ResourceList{
						CPU:    "200m",
						Memory: "128Mi",
					},
					Requests: mcpv1alpha1.ResourceList{
						CPU:    "50m",
						Memory: "64Mi",
					},
				},
				PermissionProfile: &mcpv1alpha1.PermissionProfileRef{
					Type: mcpv1alpha1.PermissionProfileTypeBuiltin,
					Name: "network",
				},
			},
		}
		Expect(k8sClient.Create(ctx, mcpServer)).To(Succeed())

		By("Waiting for MCPServer to be running")
		Eventually(func() error {
			server := &mcpv1alpha1.MCPServer{}
			if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(mcpServer), server); err != nil {
				return fmt.Errorf("failed to get MCPServer: %w", err)
			}
			if server.Status.Phase == mcpv1alpha1.MCPServerPhaseRunning {
				return nil
			}
			return fmt.Errorf("MCPServer not ready yet, phase: %s", server.Status.Phase)
		}, timeout, pollingInterval).Should(Succeed())

		By("Patching the operator-created proxy service to NodePort")
		// The operator creates a cluster-internal proxy service; switch it to
		// NodePort so the test process (outside the cluster) can reach it.
		// The whole patch-and-probe sequence is retried as one unit because
		// the service, the nodePort assignment, and readiness all settle
		// asynchronously.
		proxyServiceName := fmt.Sprintf("mcp-%s-proxy", mcpServerName)
		var nodePort int32
		Eventually(func() error {
			svc := &corev1.Service{}
			if err := k8sClient.Get(ctx, client.ObjectKey{
				Name: proxyServiceName, Namespace: testNamespace,
			}, svc); err != nil {
				return fmt.Errorf("proxy service not found yet: %w", err)
			}
			if svc.Spec.Type != corev1.ServiceTypeNodePort {
				svc.Spec.Type = corev1.ServiceTypeNodePort
				if err := k8sClient.Update(ctx, svc); err != nil {
					return fmt.Errorf("failed to patch service to NodePort: %w", err)
				}
			}
			if len(svc.Spec.Ports) == 0 || svc.Spec.Ports[0].NodePort == 0 {
				return fmt.Errorf("nodePort not assigned yet")
			}
			nodePort = svc.Spec.Ports[0].NodePort

			// checkPortAccessible / checkHTTPHealthReady are suite helpers
			// defined elsewhere in this package — presumably a TCP dial and
			// an HTTP health probe; verify against their definitions.
			if err := checkPortAccessible(nodePort, 1*time.Second); err != nil {
				return fmt.Errorf("nodePort %d not accessible: %w", nodePort, err)
			}
			if err := checkHTTPHealthReady(nodePort, 2*time.Second); err != nil {
				return fmt.Errorf("HTTP server not ready on port %d: %w", nodePort, err)
			}
			return nil
		}, timeout, pollingInterval).Should(Succeed())

		// NOTE(review): assumes a kind-style cluster where NodePorts are
		// reachable on localhost — confirm for other cluster providers.
		baseURL = fmt.Sprintf("http://localhost:%d/mcp", nodePort)
	})

	AfterAll(func() {
		// Nothing to clean up when we pointed at a pre-deployed server.
		if !deployed {
			return
		}
		By("Cleaning up MCPServer")
		// Best-effort delete; failures here should not fail the suite.
		_ = k8sClient.Delete(ctx, &mcpv1alpha1.MCPServer{
			ObjectMeta: metav1.ObjectMeta{Name: mcpServerName, Namespace: testNamespace},
		})
	})

	It("does not OOM after 30000 session lifecycles", func() {
		const iterations = 30000
		// Disable keep-alives so every lifecycle uses a fresh TCP connection;
		// connection reuse must not mask (or cause) memory growth in the proxy.
		httpClient := &http.Client{
			Timeout: 30 * time.Second,
			Transport: &http.Transport{
				DisableKeepAlives: true,
			},
		}

		By("Warming up: waiting for proxy to be ready for MCP traffic")
		Eventually(func() error {
			return tryMCPInitialize(httpClient, baseURL)
		}, 60*time.Second, 1*time.Second).Should(Succeed(),
			"proxy should become ready for MCP initialize requests")

		By("Running session lifecycles")
		// On the first failure, stop the loop but still run the health check
		// below: a mid-loop failure is often the symptom of the proxy being
		// OOMKilled, and the health assertion produces the clearer message.
		var loopErr error
		completed := 0
		for i := range iterations {
			if err := doSessionLifecycle(httpClient, baseURL, i); err != nil {
				loopErr = fmt.Errorf("session lifecycle failed at iteration %d: %w", i, err)
				break
			}
			completed++
			if (i+1)%100 == 0 {
				GinkgoWriter.Printf("  completed %d/%d session lifecycles\n", i+1, iterations)
			}
		}

		By("Verifying pod is still running and was not OOMKilled")
		assertProxyrunnerHealthy(testNamespace, mcpServerName)

		if loopErr != nil {
			Fail(fmt.Sprintf("session loop stopped after %d/%d iterations: %v",
				completed, iterations, loopErr))
		}
	})
})

// doSessionLifecycle drives one complete MCP session against the proxy:
// initialize, notifications/initialized, tools/list, two echo tool calls,
// and finally an explicit DELETE so the server can reclaim the session.
// Any failing step aborts the lifecycle and is reported with its step name.
func doSessionLifecycle(httpClient *http.Client, baseURL string, iteration int) error {
	sessionID, err := doMCPInitialize(httpClient, baseURL, iteration)
	if err != nil {
		return fmt.Errorf("initialize: %w", err)
	}

	// Remaining steps all operate on the established session; run them in
	// order and stop at the first failure.
	steps := []struct {
		name string
		run  func() error
	}{
		{"notify initialized", func() error { return doMCPNotifyInitialized(httpClient, baseURL, sessionID) }},
		{"tools/list", func() error { return doMCPToolsList(httpClient, baseURL, sessionID) }},
		{"tools/call echo ping1", func() error { return doMCPToolsCallEcho(httpClient, baseURL, sessionID, "ping1") }},
		{"tools/call echo ping2", func() error { return doMCPToolsCallEcho(httpClient, baseURL, sessionID, "ping2") }},
		{"delete session", func() error { return doMCPDeleteSession(httpClient, baseURL, sessionID) }},
	}
	for _, step := range steps {
		if err := step.run(); err != nil {
			return fmt.Errorf("%s: %w", step.name, err)
		}
	}
	return nil
}

// assertProxyrunnerHealthy fails the test if any pod backing the MCPServer
// is not Running, has restarted, or records an OOMKilled termination.
func assertProxyrunnerHealthy(namespace, mcpServerName string) {
	pods := &corev1.PodList{}
	Expect(k8sClient.List(ctx, pods,
		client.InNamespace(namespace),
		client.MatchingLabels{
			"app.kubernetes.io/name":     "mcpserver",
			"app.kubernetes.io/instance": mcpServerName,
		},
	)).To(Succeed())
	Expect(pods.Items).NotTo(BeEmpty(), "expected at least one pod for MCPServer")

	// Index instead of ranging by value: PodList items are large structs.
	for i := range pods.Items {
		p := &pods.Items[i]
		Expect(p.Status.Phase).To(Equal(corev1.PodRunning),
			"pod %s should be Running, got %s", p.Name, p.Status.Phase)
		for _, status := range p.Status.ContainerStatuses {
			// A restart under a memory limit is the OOMKill signature this
			// test exists to catch.
			Expect(status.RestartCount).To(Equal(int32(0)),
				"container %s in pod %s should have 0 restarts, got %d",
				status.Name, p.Name, status.RestartCount)
			if term := status.LastTerminationState.Terminated; term != nil {
				Expect(term.Reason).NotTo(Equal("OOMKilled"),
					"container %s in pod %s was OOMKilled", status.Name, p.Name)
			}
		}
	}
}

// ssePost sends a JSON-RPC payload to the streamable-http endpoint and reads
// the response as a (possibly SSE-framed) stream, returning the HTTP status,
// the Mcp-Session-Id response header, and the first "data: " line's payload.
// An empty sessionID omits the Mcp-Session-Id request header (used for
// initialize, which establishes the session).
//
// Fixes over the original: the body is closed via defer on every path, and a
// scanner error (truncated/aborted stream) is now reported instead of being
// silently dropped and mistaken for a successful empty response.
func ssePost(httpClient *http.Client, url, sessionID string, body []byte) (int, string, string, error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return 0, "", "", fmt.Errorf("creating request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json, text/event-stream")
	if sessionID != "" {
		req.Header.Set("Mcp-Session-Id", sessionID)
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return 0, "", "", fmt.Errorf("executing request: %w", err)
	}
	defer resp.Body.Close() // release the connection on every return path

	sessHdr := resp.Header.Get("Mcp-Session-Id")
	status := resp.StatusCode

	// Read until the first SSE data line; plain JSON responses simply yield
	// no match and an empty dataLine, as before.
	var dataLine string
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if payload, ok := strings.CutPrefix(scanner.Text(), "data: "); ok {
			dataLine = payload
			break
		}
	}
	if err := scanner.Err(); err != nil {
		return status, sessHdr, dataLine, fmt.Errorf("reading response body: %w", err)
	}
	return status, sessHdr, dataLine, nil
}

// mcpRequestBody builds a JSON-RPC 2.0 message for the given method.
// A non-positive id produces a notification (no "id" field); nil params
// omits the "params" field entirely.
func mcpRequestBody(method string, id int, params map[string]any) ([]byte, error) {
	msg := make(map[string]any, 4)
	msg["jsonrpc"] = "2.0"
	msg["method"] = method
	if id > 0 {
		msg["id"] = id
	}
	if params != nil {
		msg["params"] = params
	}
	return json.Marshal(msg)
}

// tryMCPInitialize probes the proxy with a full initialize + DELETE round
// trip. A nil return means the proxy accepted an MCP initialize request,
// returned a session ID, and allowed the session to be deleted — i.e. it is
// ready for real traffic. Used inside an Eventually as a readiness gate.
//
// Fix over the original: a stream-read error while scanning for the SSE data
// line is now surfaced instead of being silently ignored, so a proxy that
// aborts mid-response no longer counts as "ready".
func tryMCPInitialize(httpClient *http.Client, baseURL string) error {
	body, err := json.Marshal(map[string]any{
		"jsonrpc": "2.0", "id": 1, "method": "initialize",
		"params": map[string]any{
			"protocolVersion": "2025-03-26",
			"capabilities":    map[string]any{},
			"clientInfo":      map[string]any{"name": "warmup-probe", "version": "1.0.0"},
		},
	})
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "application/json, text/event-stream")
	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("POST initialize: %w", err)
	}
	// Drain up to the first SSE data line so the server can finish writing,
	// then close the body before inspecting status/headers.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		if strings.HasPrefix(scanner.Text(), "data: ") {
			break
		}
	}
	scanErr := scanner.Err()
	_ = resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("initialize returned %d, want 200", resp.StatusCode)
	}
	if scanErr != nil {
		return fmt.Errorf("reading initialize response: %w", scanErr)
	}
	sessionID := resp.Header.Get("Mcp-Session-Id")
	if sessionID == "" {
		return fmt.Errorf("no Mcp-Session-Id in response")
	}
	// Tear the probe session down so the warmup loop does not itself pile
	// up sessions in the proxy before the measured run starts.
	delReq, err := http.NewRequest(http.MethodDelete, baseURL, nil)
	if err != nil {
		return err
	}
	delReq.Header.Set("Mcp-Session-Id", sessionID)
	delResp, err := httpClient.Do(delReq)
	if err != nil {
		return err
	}
	_ = delResp.Body.Close()
	return nil
}

// doMCPInitialize opens a new MCP session and returns the server-assigned
// session ID from the Mcp-Session-Id response header. The iteration number is
// embedded in the client name so individual sessions are identifiable in logs.
func doMCPInitialize(httpClient *http.Client, baseURL string, iteration int) (string, error) {
	clientName := fmt.Sprintf("memleak-test-%d", iteration)
	payload, err := mcpRequestBody("initialize", 1, map[string]any{
		"protocolVersion": "2025-03-26",
		"capabilities":    map[string]any{},
		"clientInfo":      map[string]any{"name": clientName, "version": "1.0.0"},
	})
	if err != nil {
		return "", err
	}
	status, sessionID, _, err := ssePost(httpClient, baseURL, "", payload)
	switch {
	case err != nil:
		return "", err
	case status != http.StatusOK:
		return "", fmt.Errorf("initialize returned %d, want 200", status)
	case sessionID == "":
		return "", fmt.Errorf("no Mcp-Session-Id in response")
	}
	return sessionID, nil
}

// doMCPNotifyInitialized sends the notifications/initialized notification on
// an established session; any 2xx status counts as acceptance.
func doMCPNotifyInitialized(httpClient *http.Client, baseURL, sessionID string) error {
	payload, err := mcpRequestBody("notifications/initialized", 0, nil)
	if err != nil {
		return err
	}
	switch status, _, _, err := ssePost(httpClient, baseURL, sessionID, payload); {
	case err != nil:
		return err
	case status < 200 || status >= 300:
		return fmt.Errorf("notifications/initialized returned %d", status)
	}
	return nil
}

// doMCPToolsList issues a tools/list request on the session and expects 200.
func doMCPToolsList(httpClient *http.Client, baseURL, sessionID string) error {
	payload, err := mcpRequestBody("tools/list", 2, map[string]any{})
	if err != nil {
		return err
	}
	switch status, _, _, err := ssePost(httpClient, baseURL, sessionID, payload); {
	case err != nil:
		return err
	case status != http.StatusOK:
		return fmt.Errorf("tools/list returned %d, want 200", status)
	}
	return nil
}

// doMCPToolsCallEcho invokes the yardstick "echo" tool with the given input
// on the session and expects 200.
func doMCPToolsCallEcho(httpClient *http.Client, baseURL, sessionID, input string) error {
	args := map[string]any{"input": input}
	payload, err := mcpRequestBody("tools/call", 3, map[string]any{
		"name":      "echo",
		"arguments": args,
	})
	if err != nil {
		return err
	}
	switch status, _, _, err := ssePost(httpClient, baseURL, sessionID, payload); {
	case err != nil:
		return err
	case status != http.StatusOK:
		return fmt.Errorf("tools/call echo returned %d, want 200", status)
	}
	return nil
}

// doMCPDeleteSession explicitly tears a session down via HTTP DELETE with the
// Mcp-Session-Id header; any 2xx status counts as success.
func doMCPDeleteSession(httpClient *http.Client, baseURL, sessionID string) error {
	req, err := http.NewRequest(http.MethodDelete, baseURL, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Mcp-Session-Id", sessionID)
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if ok := resp.StatusCode >= 200 && resp.StatusCode < 300; !ok {
		return fmt.Errorf("DELETE returned %d", resp.StatusCode)
	}
	return nil
}

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions