Skip to content

Commit 25b069a

Browse files
committed
refactor(gateway): split MCP loopback transport helpers
1 parent f856aae commit 25b069a

6 files changed

Lines changed: 423 additions & 307 deletions

File tree

src/gateway/mcp-http.handlers.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import crypto from "node:crypto";
2+
import {
3+
MCP_LOOPBACK_SERVER_NAME,
4+
MCP_LOOPBACK_SERVER_VERSION,
5+
MCP_LOOPBACK_SUPPORTED_PROTOCOL_VERSIONS,
6+
jsonRpcError,
7+
jsonRpcResult,
8+
type JsonRpcRequest,
9+
} from "./mcp-http.protocol.js";
10+
import type { McpLoopbackTool, McpToolSchemaEntry } from "./mcp-http.schema.js";
11+
12+
type McpTextContent = {
13+
type: "text";
14+
text: string;
15+
};
16+
17+
function normalizeToolCallContent(result: unknown): McpTextContent[] {
18+
const content = (result as { content?: unknown })?.content;
19+
if (Array.isArray(content)) {
20+
return content.map((block: { type?: string; text?: string }) => ({
21+
type: (block.type ?? "text") as "text",
22+
text: block.text ?? (typeof block === "string" ? block : JSON.stringify(block)),
23+
}));
24+
}
25+
return [
26+
{
27+
type: "text",
28+
text: typeof result === "string" ? result : JSON.stringify(result),
29+
},
30+
];
31+
}
32+
33+
export async function handleMcpJsonRpc(params: {
34+
message: JsonRpcRequest;
35+
tools: McpLoopbackTool[];
36+
toolSchema: McpToolSchemaEntry[];
37+
}): Promise<object | null> {
38+
const { id, method, params: methodParams } = params.message;
39+
40+
switch (method) {
41+
case "initialize": {
42+
const clientVersion = (methodParams?.protocolVersion as string) ?? "";
43+
const negotiated =
44+
MCP_LOOPBACK_SUPPORTED_PROTOCOL_VERSIONS.find((version) => version === clientVersion) ??
45+
MCP_LOOPBACK_SUPPORTED_PROTOCOL_VERSIONS[0];
46+
return jsonRpcResult(id, {
47+
protocolVersion: negotiated,
48+
capabilities: { tools: {} },
49+
serverInfo: {
50+
name: MCP_LOOPBACK_SERVER_NAME,
51+
version: MCP_LOOPBACK_SERVER_VERSION,
52+
},
53+
});
54+
}
55+
case "notifications/initialized":
56+
case "notifications/cancelled":
57+
return null;
58+
case "tools/list":
59+
return jsonRpcResult(id, { tools: params.toolSchema });
60+
case "tools/call": {
61+
const toolName = methodParams?.name as string;
62+
const toolArgs = (methodParams?.arguments ?? {}) as Record<string, unknown>;
63+
const tool = params.tools.find((candidate) => candidate.name === toolName);
64+
if (!tool) {
65+
return jsonRpcResult(id, {
66+
content: [{ type: "text", text: `Tool not available: ${toolName}` }],
67+
isError: true,
68+
});
69+
}
70+
const toolCallId = `mcp-${crypto.randomUUID()}`;
71+
try {
72+
// oxlint-disable-next-line typescript/no-explicit-any
73+
const result = await (tool as any).execute(toolCallId, toolArgs);
74+
return jsonRpcResult(id, {
75+
content: normalizeToolCallContent(result),
76+
isError: false,
77+
});
78+
} catch (error) {
79+
const message = error instanceof Error ? error.message : String(error);
80+
return jsonRpcResult(id, {
81+
content: [{ type: "text", text: message || "tool execution failed" }],
82+
isError: true,
83+
});
84+
}
85+
}
86+
default:
87+
return jsonRpcError(id, -32601, `Method not found: ${method}`);
88+
}
89+
}

src/gateway/mcp-http.protocol.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
export const MCP_LOOPBACK_SERVER_NAME = "openclaw";
2+
export const MCP_LOOPBACK_SERVER_VERSION = "0.1.0";
3+
export const MCP_LOOPBACK_SUPPORTED_PROTOCOL_VERSIONS = ["2025-03-26", "2024-11-05"] as const;
4+
5+
export type JsonRpcId = string | number | null | undefined;
6+
7+
export type JsonRpcRequest = {
8+
jsonrpc: "2.0";
9+
id?: JsonRpcId;
10+
method: string;
11+
params?: Record<string, unknown>;
12+
};
13+
14+
export function jsonRpcResult(id: JsonRpcId, result: unknown) {
15+
return { jsonrpc: "2.0" as const, id: id ?? null, result };
16+
}
17+
18+
export function jsonRpcError(id: JsonRpcId, code: number, message: string) {
19+
return { jsonrpc: "2.0" as const, id: id ?? null, error: { code, message } };
20+
}

src/gateway/mcp-http.request.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import type { IncomingMessage, ServerResponse } from "node:http";
2+
import { loadConfig } from "../config/config.js";
3+
import { resolveMainSessionKey } from "../config/sessions.js";
4+
import { normalizeMessageChannel } from "../utils/message-channel.js";
5+
import { getHeader } from "./http-utils.js";
6+
7+
const MAX_MCP_BODY_BYTES = 1_048_576;
8+
9+
export type McpRequestContext = {
10+
sessionKey: string;
11+
messageProvider: string | undefined;
12+
accountId: string | undefined;
13+
};
14+
15+
function resolveScopedSessionKey(
16+
cfg: ReturnType<typeof loadConfig>,
17+
rawSessionKey: string | undefined,
18+
): string {
19+
const trimmed = rawSessionKey?.trim();
20+
return !trimmed || trimmed === "main" ? resolveMainSessionKey(cfg) : trimmed;
21+
}
22+
23+
export function validateMcpLoopbackRequest(params: {
24+
req: IncomingMessage;
25+
res: ServerResponse;
26+
token: string;
27+
}): boolean {
28+
let url: URL;
29+
try {
30+
url = new URL(params.req.url ?? "/", `http://${params.req.headers.host ?? "localhost"}`);
31+
} catch {
32+
params.res.writeHead(400, { "Content-Type": "application/json" });
33+
params.res.end(JSON.stringify({ error: "bad_request" }));
34+
return false;
35+
}
36+
37+
if (params.req.method === "GET" && url.pathname.startsWith("/.well-known/")) {
38+
params.res.writeHead(404);
39+
params.res.end();
40+
return false;
41+
}
42+
43+
if (url.pathname !== "/mcp") {
44+
params.res.writeHead(404, { "Content-Type": "application/json" });
45+
params.res.end(JSON.stringify({ error: "not_found" }));
46+
return false;
47+
}
48+
49+
if (params.req.method !== "POST") {
50+
params.res.writeHead(405, { Allow: "POST" });
51+
params.res.end();
52+
return false;
53+
}
54+
55+
const authHeader = getHeader(params.req, "authorization") ?? "";
56+
if (authHeader !== `Bearer ${params.token}`) {
57+
params.res.writeHead(401, { "Content-Type": "application/json" });
58+
params.res.end(JSON.stringify({ error: "unauthorized" }));
59+
return false;
60+
}
61+
62+
const contentType = getHeader(params.req, "content-type") ?? "";
63+
if (!contentType.startsWith("application/json")) {
64+
params.res.writeHead(415, { "Content-Type": "application/json" });
65+
params.res.end(JSON.stringify({ error: "unsupported_media_type" }));
66+
return false;
67+
}
68+
69+
return true;
70+
}
71+
72+
export async function readMcpHttpBody(req: IncomingMessage): Promise<string> {
73+
return await new Promise((resolve, reject) => {
74+
const chunks: Buffer[] = [];
75+
let received = 0;
76+
req.on("data", (chunk: Buffer) => {
77+
received += chunk.length;
78+
if (received > MAX_MCP_BODY_BYTES) {
79+
req.destroy();
80+
reject(new Error(`Request body exceeds ${MAX_MCP_BODY_BYTES} bytes`));
81+
return;
82+
}
83+
chunks.push(chunk);
84+
});
85+
req.on("end", () => resolve(Buffer.concat(chunks).toString("utf-8")));
86+
req.on("error", reject);
87+
});
88+
}
89+
90+
export function resolveMcpRequestContext(
91+
req: IncomingMessage,
92+
cfg: ReturnType<typeof loadConfig>,
93+
): McpRequestContext {
94+
return {
95+
sessionKey: resolveScopedSessionKey(cfg, getHeader(req, "x-session-key")),
96+
messageProvider:
97+
normalizeMessageChannel(getHeader(req, "x-openclaw-message-channel")) ?? undefined,
98+
accountId: getHeader(req, "x-openclaw-account-id")?.trim() || undefined,
99+
};
100+
}

src/gateway/mcp-http.runtime.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import { loadConfig } from "../config/config.js";
2+
import {
3+
buildMcpToolSchema,
4+
type McpLoopbackTool,
5+
type McpToolSchemaEntry,
6+
} from "./mcp-http.schema.js";
7+
import { resolveGatewayScopedTools } from "./tool-resolution.js";
8+
9+
const TOOL_CACHE_TTL_MS = 30_000;
10+
const NATIVE_TOOL_EXCLUDE = new Set(["read", "write", "edit", "apply_patch", "exec", "process"]);
11+
12+
export type McpLoopbackRuntime = {
13+
port: number;
14+
token: string;
15+
};
16+
17+
type CachedScopedTools = {
18+
tools: McpLoopbackTool[];
19+
toolSchema: McpToolSchemaEntry[];
20+
configRef: ReturnType<typeof loadConfig>;
21+
time: number;
22+
};
23+
24+
let activeRuntime: McpLoopbackRuntime | undefined;
25+
26+
export class McpLoopbackToolCache {
27+
#entries = new Map<string, CachedScopedTools>();
28+
29+
resolve(params: {
30+
cfg: ReturnType<typeof loadConfig>;
31+
sessionKey: string;
32+
messageProvider: string | undefined;
33+
accountId: string | undefined;
34+
}): CachedScopedTools {
35+
const cacheKey = [params.sessionKey, params.messageProvider ?? "", params.accountId ?? ""].join(
36+
"\u0000",
37+
);
38+
const now = Date.now();
39+
const cached = this.#entries.get(cacheKey);
40+
if (cached && cached.configRef === params.cfg && now - cached.time < TOOL_CACHE_TTL_MS) {
41+
return cached;
42+
}
43+
44+
const next = resolveGatewayScopedTools({
45+
cfg: params.cfg,
46+
sessionKey: params.sessionKey,
47+
messageProvider: params.messageProvider,
48+
accountId: params.accountId,
49+
excludeToolNames: NATIVE_TOOL_EXCLUDE,
50+
});
51+
const nextEntry: CachedScopedTools = {
52+
tools: next.tools,
53+
toolSchema: buildMcpToolSchema(next.tools),
54+
configRef: params.cfg,
55+
time: now,
56+
};
57+
this.#entries.set(cacheKey, nextEntry);
58+
for (const [key, entry] of this.#entries) {
59+
if (now - entry.time >= TOOL_CACHE_TTL_MS) {
60+
this.#entries.delete(key);
61+
}
62+
}
63+
return nextEntry;
64+
}
65+
}
66+
67+
export function getActiveMcpLoopbackRuntime(): McpLoopbackRuntime | undefined {
68+
return activeRuntime ? { ...activeRuntime } : undefined;
69+
}
70+
71+
export function setActiveMcpLoopbackRuntime(runtime: McpLoopbackRuntime): void {
72+
activeRuntime = { ...runtime };
73+
}
74+
75+
export function clearActiveMcpLoopbackRuntime(token: string): void {
76+
if (activeRuntime?.token === token) {
77+
activeRuntime = undefined;
78+
}
79+
}
80+
81+
export function createMcpLoopbackServerConfig(port: number) {
82+
return {
83+
mcpServers: {
84+
openclaw: {
85+
type: "http",
86+
url: `http://127.0.0.1:${port}/mcp`,
87+
headers: {
88+
Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}",
89+
"x-session-key": "${OPENCLAW_MCP_SESSION_KEY}",
90+
"x-openclaw-agent-id": "${OPENCLAW_MCP_AGENT_ID}",
91+
"x-openclaw-account-id": "${OPENCLAW_MCP_ACCOUNT_ID}",
92+
"x-openclaw-message-channel": "${OPENCLAW_MCP_MESSAGE_CHANNEL}",
93+
},
94+
},
95+
},
96+
};
97+
}

src/gateway/mcp-http.schema.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { logWarn } from "../logger.js";
2+
import { resolveGatewayScopedTools } from "./tool-resolution.js";
3+
4+
export type McpLoopbackTool = ReturnType<typeof resolveGatewayScopedTools>["tools"][number];
5+
6+
export type McpToolSchemaEntry = {
7+
name: string;
8+
description: string | undefined;
9+
inputSchema: Record<string, unknown>;
10+
};
11+
12+
function flattenUnionSchema(raw: Record<string, unknown>): Record<string, unknown> {
13+
const variants = (raw.anyOf ?? raw.oneOf) as Record<string, unknown>[] | undefined;
14+
if (!Array.isArray(variants) || variants.length === 0) {
15+
return raw;
16+
}
17+
const mergedProps: Record<string, unknown> = {};
18+
const requiredSets: Set<string>[] = [];
19+
for (const variant of variants) {
20+
const props = variant.properties as Record<string, unknown> | undefined;
21+
if (props) {
22+
for (const [key, schema] of Object.entries(props)) {
23+
if (!(key in mergedProps)) {
24+
mergedProps[key] = schema;
25+
continue;
26+
}
27+
const existing = mergedProps[key] as Record<string, unknown>;
28+
const incoming = schema as Record<string, unknown>;
29+
if (Array.isArray(existing.enum) && Array.isArray(incoming.enum)) {
30+
mergedProps[key] = {
31+
...existing,
32+
enum: [...new Set([...(existing.enum as unknown[]), ...(incoming.enum as unknown[])])],
33+
};
34+
continue;
35+
}
36+
if ("const" in existing && "const" in incoming && existing.const !== incoming.const) {
37+
const merged: Record<string, unknown> = {
38+
...existing,
39+
enum: [existing.const, incoming.const],
40+
};
41+
delete merged.const;
42+
mergedProps[key] = merged;
43+
continue;
44+
}
45+
logWarn(
46+
`mcp loopback: conflicting schema definitions for "${key}", keeping the first variant`,
47+
);
48+
}
49+
}
50+
requiredSets.push(
51+
new Set(Array.isArray(variant.required) ? (variant.required as string[]) : []),
52+
);
53+
}
54+
const required =
55+
requiredSets.length > 0
56+
? [
57+
...requiredSets.reduce(
58+
(left, right) => new Set([...left].filter((key) => right.has(key))),
59+
),
60+
]
61+
: [];
62+
const { anyOf: _anyOf, oneOf: _oneOf, ...rest } = raw;
63+
return { ...rest, type: "object", properties: mergedProps, required };
64+
}
65+
66+
export function buildMcpToolSchema(tools: McpLoopbackTool[]): McpToolSchemaEntry[] {
67+
return tools.map((tool) => {
68+
let raw =
69+
tool.parameters && typeof tool.parameters === "object"
70+
? { ...(tool.parameters as Record<string, unknown>) }
71+
: {};
72+
if (raw.anyOf || raw.oneOf) {
73+
raw = flattenUnionSchema(raw);
74+
}
75+
if (raw.type !== "object") {
76+
raw.type = "object";
77+
if (!raw.properties) {
78+
raw.properties = {};
79+
}
80+
}
81+
return {
82+
name: tool.name,
83+
description: tool.description,
84+
inputSchema: raw,
85+
};
86+
});
87+
}

0 commit comments

Comments
 (0)