Skip to content

Commit ab0a633

Browse files
authored
fix: tolerate missing streamed response content type
Fixes a regression in the OpenAI-compatible stream transport: a valid ChatGPT Codex HTTP 200 stream could arrive without a `content-type` header and be rejected before the OpenAI SDK consumed it.

Prepared head SHA: 0d7f8ab
Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com>
1 parent d4523cb commit ab0a633

2 files changed

Lines changed: 276 additions & 4 deletions

File tree

src/agents/provider-transport-fetch.test.ts

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,41 @@ function latestTrustedEnvProxyParams(): Record<string, unknown> {
113113
return params;
114114
}
115115

116+
/** Wraps one string in a single-chunk byte stream, for use as a mocked response body. */
function responseStreamText(text: string): ReadableStream<Uint8Array> {
  const singleChunk = [text];
  return responseStreamChunks(singleChunk);
}
119+
120+
/**
 * Builds a byte stream that delivers every entry of `chunks` as its own
 * UTF-8 encoded chunk, in order, and then closes.
 *
 * All chunks are queued while the stream is being constructed, so readers
 * never wait on the source.
 */
function responseStreamChunks(chunks: string[]): ReadableStream<Uint8Array> {
  const utf8 = new TextEncoder();
  const start = (controller: ReadableStreamDefaultController<Uint8Array>): void => {
    for (let index = 0; index < chunks.length; index += 1) {
      controller.enqueue(utf8.encode(chunks[index]));
    }
    controller.close();
  };
  return new ReadableStream({ start });
}
131+
132+
/**
 * Creates a byte stream that emits `text` as its first chunk and then stays
 * open until the returned `close` handle is invoked.
 *
 * Lets a test model a server that has sent data but not yet ended the response.
 *
 * @param text - Payload delivered (UTF-8 encoded) as the stream's only chunk.
 * @returns `stream`, the open stream, and `close`, which ends it. `close` is
 *   safe to call more than once and after the consumer has cancelled the
 *   stream; in both cases it is a no-op rather than a thrown `TypeError`.
 */
function openResponseStreamText(text: string): {
  close: () => void;
  stream: ReadableStream<Uint8Array>;
} {
  const encoder = new TextEncoder();
  let streamController: ReadableStreamDefaultController<Uint8Array> | undefined;
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      streamController = controller;
      controller.enqueue(encoder.encode(text));
    },
    cancel() {
      // The consumer gave up on the stream; closing the controller afterwards
      // would throw, so forget it.
      streamController = undefined;
    },
  });
  return {
    close() {
      // Detach before closing so a repeated call cannot close twice.
      const controller = streamController;
      streamController = undefined;
      controller?.close();
    },
    stream,
  };
}
150+
116151
describe("buildGuardedModelFetch", () => {
117152
beforeEach(() => {
118153
managedStreamCleanupRegistrations.length = 0;
@@ -203,6 +238,164 @@ describe("buildGuardedModelFetch", () => {
203238
expect(release).toHaveBeenCalled();
204239
});
205240

241+
it("allows missing content-type when streamed OpenAI-compatible responses contain SSE", async () => {
242+
fetchWithSsrFGuardMock.mockResolvedValue({
243+
response: new Response(responseStreamText('data: {"ok": true}\n\ndata: [DONE]\n\n')),
244+
finalUrl: "https://chatgpt.com/backend-api/codex/responses",
245+
release: vi.fn(async () => undefined),
246+
});
247+
const model = {
248+
id: "gpt-5.5",
249+
provider: "openai",
250+
api: "openclaw-openai-responses-transport",
251+
baseUrl: "https://chatgpt.com/backend-api/codex",
252+
} as unknown as Model<"openai-responses">;
253+
254+
const response = await buildGuardedModelFetch(model)(
255+
"https://chatgpt.com/backend-api/codex/responses",
256+
{
257+
method: "POST",
258+
headers: { "content-type": "application/json" },
259+
body: JSON.stringify({ model: "gpt-5.5", stream: true }),
260+
},
261+
);
262+
const items = [];
263+
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
264+
items.push(item);
265+
}
266+
267+
expect(items).toEqual([{ ok: true }]);
268+
});
269+
270+
it("returns promptly for missing content-type SSE streams that remain open", async () => {
271+
const source = openResponseStreamText('data: {"ok": true}\n\n');
272+
fetchWithSsrFGuardMock.mockResolvedValue({
273+
response: new Response(source.stream),
274+
finalUrl: "https://chatgpt.com/backend-api/codex/responses",
275+
release: vi.fn(async () => undefined),
276+
});
277+
const model = {
278+
id: "gpt-5.5",
279+
provider: "openai",
280+
api: "openclaw-openai-responses-transport",
281+
baseUrl: "https://chatgpt.com/backend-api/codex",
282+
} as unknown as Model<"openai-responses">;
283+
284+
const responsePromise = buildGuardedModelFetch(model)(
285+
"https://chatgpt.com/backend-api/codex/responses",
286+
{
287+
method: "POST",
288+
headers: { "content-type": "application/json" },
289+
body: JSON.stringify({ model: "gpt-5.5", stream: true }),
290+
},
291+
);
292+
const timeout = Symbol("timeout");
293+
const result = await Promise.race<Response | typeof timeout>([
294+
responsePromise,
295+
new Promise<typeof timeout>((resolve) => {
296+
setTimeout(() => resolve(timeout), 100);
297+
}),
298+
]);
299+
source.close();
300+
301+
expect(result).not.toBe(timeout);
302+
const response = result as Response;
303+
const items = [];
304+
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
305+
items.push(item);
306+
}
307+
308+
expect(items).toEqual([{ ok: true }]);
309+
});
310+
311+
it("allows missing content-type when the SSE prefix is split across chunks", async () => {
312+
fetchWithSsrFGuardMock.mockResolvedValue({
313+
response: new Response(responseStreamChunks(["d", "ata", ': {"ok": true}\n\n'])),
314+
finalUrl: "https://chatgpt.com/backend-api/codex/responses",
315+
release: vi.fn(async () => undefined),
316+
});
317+
const model = {
318+
id: "gpt-5.5",
319+
provider: "openai",
320+
api: "openclaw-openai-responses-transport",
321+
baseUrl: "https://chatgpt.com/backend-api/codex",
322+
} as unknown as Model<"openai-responses">;
323+
324+
const response = await buildGuardedModelFetch(model)(
325+
"https://chatgpt.com/backend-api/codex/responses",
326+
{
327+
method: "POST",
328+
headers: { "content-type": "application/json" },
329+
body: JSON.stringify({ model: "gpt-5.5", stream: true }),
330+
},
331+
);
332+
const items = [];
333+
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
334+
items.push(item);
335+
}
336+
337+
expect(items).toEqual([{ ok: true }]);
338+
});
339+
340+
it("synthesizes SSE for missing content-type JSON returned to streaming SDK requests", async () => {
341+
fetchWithSsrFGuardMock.mockResolvedValue({
342+
response: new Response(responseStreamText('{"ok": true}')),
343+
finalUrl: "https://chatgpt.com/backend-api/codex/responses",
344+
release: vi.fn(async () => undefined),
345+
});
346+
const model = {
347+
id: "gpt-5.5",
348+
provider: "openai",
349+
api: "openclaw-openai-responses-transport",
350+
baseUrl: "https://chatgpt.com/backend-api/codex",
351+
} as unknown as Model<"openai-responses">;
352+
353+
const response = await buildGuardedModelFetch(model)(
354+
"https://chatgpt.com/backend-api/codex/responses",
355+
{
356+
method: "POST",
357+
headers: { "content-type": "application/json" },
358+
body: JSON.stringify({ model: "gpt-5.5", stream: true }),
359+
},
360+
);
361+
const items = [];
362+
for await (const item of Stream.fromSSEResponse(response, new AbortController())) {
363+
items.push(item);
364+
}
365+
366+
expect(response.headers.get("content-type")).toContain("text/event-stream");
367+
expect(items).toEqual([{ ok: true }]);
368+
});
369+
370+
it("rejects missing content-type streamed OpenAI-compatible responses with HTML bodies", async () => {
371+
const release = vi.fn(async () => undefined);
372+
const model = {
373+
id: "private-model",
374+
provider: "custom-openai",
375+
api: "openai-completions",
376+
baseUrl: "https://proxy.example.com",
377+
} as unknown as Model<"openai-completions">;
378+
fetchWithSsrFGuardMock.mockResolvedValue({
379+
response: new Response(responseStreamText("<html>not the API</html>")),
380+
finalUrl: "https://proxy.example.com/chat/completions",
381+
release,
382+
});
383+
384+
await expect(
385+
buildGuardedModelFetch(model)("https://proxy.example.com/chat/completions", {
386+
method: "POST",
387+
headers: { "content-type": "application/json" },
388+
body: JSON.stringify({ model: "private-model", stream: true }),
389+
}),
390+
).rejects.toMatchObject({
391+
name: "ProviderHttpError",
392+
status: 200,
393+
code: "invalid_provider_content_type",
394+
errorType: "invalid_response",
395+
});
396+
expect(release).toHaveBeenCalled();
397+
});
398+
206399
it("ensures configured local services before the model request", async () => {
207400
const release = vi.fn();
208401
ensureModelProviderLocalServiceMock.mockResolvedValue({ release });

src/agents/provider-transport-fetch.ts

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import {
4343
} from "./provider-request-config.js";
4444

4545
const DEFAULT_MAX_SDK_RETRY_WAIT_SECONDS = 60;
46+
// Upper bound (2 KiB) on how many response-body bytes are decoded while
// sniffing the body kind of a streamed response.
const OPENAI_SDK_STREAM_CONTENT_SNIFF_BYTES = 2 * 1024;
4647
const log = createSubsystemLogger("provider-transport-fetch");
4748
const BLOCKED_EXACT_ORIGIN_TRUST_HOSTNAME_LABELS = new Set(["instance-data"]);
4849
const PLAIN_DECIMAL_NUMBER_RE = /^\d+(?:\.\d+)?$/;
@@ -233,15 +234,93 @@ function isOpenAISdkStreamContentType(contentType: string): boolean {
233234
return /\btext\/event-stream\b/i.test(contentType) || isJsonContentType(contentType);
234235
}
235236

236-
async function assertOpenAISdkStreamContentType(params: {
237+
/**
 * Body kinds reported by the stream-body sniffer. "unknown" means the
 * inspected prefix was empty or matched none of the other kinds.
 */
type OpenAISdkStreamBodyKind = "html" | "json" | "sse" | "unknown";
238+
239+
/**
 * Guesses the body kind of a streamed response from the text decoded so far.
 *
 * A leading BOM and leading whitespace are ignored for the first-character
 * and SSE-field checks. The checks run in a fixed order: markup, JSON, an SSE
 * comment or field line, and finally a complete SSE event containing readable
 * data.
 *
 * @param text - Decoded prefix of the response body.
 * @returns The detected kind, or "unknown" when the prefix is empty or
 *   matches none of the known shapes (for example because it is too short).
 */
function classifyOpenAISdkStreamBodyPrefix(text: string): OpenAISdkStreamBodyKind {
  const body = text.replace(/^\uFEFF/u, "").trimStart();
  if (body.length === 0) {
    return "unknown";
  }

  switch (body.charAt(0)) {
    case "<":
      return "html";
    case "{":
    case "[":
      return "json";
    default:
      break;
  }

  // An SSE comment line (":") or a known field name followed by ":" or a line end.
  const startsWithSseLine = /^(?::|(?:data|event|id|retry)(?::|\r?\n|\r))/u.test(body);
  if (startsWithSseLine) {
    return "sse";
  }

  // Fall back to looking for a finished event in the untrimmed text.
  const eventEnd = findSseEventBoundary(text);
  if (!eventEnd) {
    return "unknown";
  }
  return hasReadableSseData(text.slice(0, eventEnd.index)) ? "sse" : "unknown";
}
259+
260+
/**
 * Sniffs the body kind of a response by reading from a clone of it, leaving
 * the passed-in response's own body unread.
 *
 * Chunks are decoded incrementally and re-classified after each one; the
 * first verdict other than "unknown" wins. Reading stops once
 * OPENAI_SDK_STREAM_CONTENT_SNIFF_BYTES bytes have been inspected or the
 * stream ends, at which point the accumulated text is classified one last
 * time. The clone's reader is always cancelled on the way out (best effort).
 *
 * @param response - Response whose body should be inspected.
 * @returns The detected kind, or "unknown" when the response has no body.
 */
async function classifyOpenAISdkStreamBody(response: Response): Promise<OpenAISdkStreamBodyKind> {
  const sniffReader = response.clone().body?.getReader();
  if (sniffReader === undefined) {
    return "unknown";
  }

  const utf8 = new TextDecoder();
  let prefix = "";
  let sniffedBytes = 0;
  try {
    for (;;) {
      if (sniffedBytes >= OPENAI_SDK_STREAM_CONTENT_SNIFF_BYTES) {
        break;
      }
      const next = await sniffReader.read();
      if (next.done) {
        break;
      }
      const bytes = next.value;
      if (!bytes || bytes.byteLength === 0) {
        continue;
      }
      // Never decode past the sniff budget, even if the chunk is larger.
      const budget = OPENAI_SDK_STREAM_CONTENT_SNIFF_BYTES - sniffedBytes;
      const slice = bytes.byteLength > budget ? bytes.subarray(0, budget) : bytes;
      sniffedBytes += slice.byteLength;
      prefix += utf8.decode(slice, { stream: true });
      const verdict = classifyOpenAISdkStreamBodyPrefix(prefix);
      if (verdict !== "unknown") {
        return verdict;
      }
    }
    // Flush any bytes the streaming decoder is still holding back.
    prefix += utf8.decode();
    return classifyOpenAISdkStreamBodyPrefix(prefix);
  } finally {
    void sniffReader.cancel().catch(() => undefined);
  }
}
293+
294+
/**
 * Rebuilds a response around the same body with its `content-type` header
 * replaced.
 *
 * Status, status text and all other headers are carried over. The headers
 * are copied first, so the passed-in response's header set is left untouched.
 *
 * @param response - Response whose body and metadata are reused.
 * @param contentType - Value to set for the `content-type` header.
 * @returns A new Response carrying the overridden header.
 */
function withOpenAISdkStreamContentType(response: Response, contentType: string): Response {
  const { body, status, statusText } = response;
  const headers = new Headers(response.headers);
  headers.set("content-type", contentType);
  return new Response(body, { status, statusText, headers });
}
303+
304+
async function normalizeOpenAISdkStreamContentType(params: {
237305
response: Response;
238306
model: Model;
239307
release: () => Promise<void>;
240308
localServiceLease?: ProviderLocalServiceLease;
241-
}): Promise<void> {
309+
}): Promise<Response> {
242310
const contentType = params.response.headers.get("content-type") ?? "";
243311
if (!params.response.ok || !params.response.body || isOpenAISdkStreamContentType(contentType)) {
244-
return;
312+
return params.response;
313+
}
314+
if (!contentType.trim()) {
315+
// ChatGPT Codex can stream valid SSE with no content-type header. Sniff a
316+
// clone so the SDK still receives the original body once we normalize it.
317+
const kind = await classifyOpenAISdkStreamBody(params.response).catch(() => "unknown" as const);
318+
if (kind === "sse") {
319+
return withOpenAISdkStreamContentType(params.response, "text/event-stream; charset=utf-8");
320+
}
321+
if (kind === "json") {
322+
return withOpenAISdkStreamContentType(params.response, "application/json; charset=utf-8");
323+
}
245324
}
246325
const body = await readResponseTextLimited(params.response).catch(() => "");
247326
await params.release().catch(() => undefined);
@@ -760,7 +839,7 @@ export function buildGuardedModelFetch(
760839
});
761840
}
762841
if (synthesizeJsonAsSse && options?.sanitizeSse !== false) {
763-
await assertOpenAISdkStreamContentType({
842+
response = await normalizeOpenAISdkStreamContentType({
764843
response,
765844
model,
766845
release: result.release,

0 commit comments

Comments
 (0)