Skip to content

Commit 4407c31

Browse files
committed
fix: honor openai sse transport for agent turns
1 parent d419bcf commit 4407c31

3 files changed

Lines changed: 97 additions & 35 deletions

File tree

src/agents/pi-embedded-runner/run/attempt.spawn-workspace.websocket.test.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { describe, expect, it } from "vitest";
2-
import { shouldUseOpenAIWebSocketTransport } from "./attempt.thread-helpers.js";
2+
import {
3+
shouldUseOpenAIWebSocketTransport,
4+
shouldUseOpenAIWebSocketTransportForAttempt,
5+
} from "./attempt.thread-helpers.js";
36

47
describe("openai websocket transport selection", () => {
58
it("accepts direct OpenAI Responses endpoints", () => {
@@ -76,4 +79,24 @@ describe("openai websocket transport selection", () => {
7679
}),
7780
).toBe(false);
7881
});
82+
83+
it("honors prepared SSE transport params before selecting websocket", () => {
84+
expect(
85+
shouldUseOpenAIWebSocketTransportForAttempt({
86+
provider: "openai",
87+
modelApi: "openai-responses",
88+
modelBaseUrl: "https://api.openai.com/v1",
89+
effectiveExtraParams: { transport: "sse" },
90+
}),
91+
).toBe(false);
92+
93+
expect(
94+
shouldUseOpenAIWebSocketTransportForAttempt({
95+
provider: "openai",
96+
modelApi: "openai-responses",
97+
modelBaseUrl: "https://api.openai.com/v1",
98+
effectiveExtraParams: { transport: "auto" },
99+
}),
100+
).toBe(true);
101+
});
79102
});

src/agents/pi-embedded-runner/run/attempt.thread-helpers.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,29 @@ export function shouldUseOpenAIWebSocketTransport(params: {
5555
return endpointClass === "default" || endpointClass === "openai-public";
5656
}
5757

58+
function hasExplicitSseTransport(sources: Array<Record<string, unknown> | undefined>): boolean {
59+
return sources.some((source) => {
60+
const transport = typeof source?.transport === "string" ? source.transport : "";
61+
return transport.trim().toLowerCase() === "sse";
62+
});
63+
}
64+
65+
export function shouldUseOpenAIWebSocketTransportForAttempt(params: {
66+
provider: string;
67+
modelApi?: string | null;
68+
modelBaseUrl?: string | null;
69+
streamParams?: Record<string, unknown>;
70+
effectiveExtraParams?: Record<string, unknown>;
71+
modelParams?: Record<string, unknown>;
72+
}): boolean {
73+
if (
74+
hasExplicitSseTransport([params.streamParams, params.effectiveExtraParams, params.modelParams])
75+
) {
76+
return false;
77+
}
78+
return shouldUseOpenAIWebSocketTransport(params);
79+
}
80+
5881
function shouldAppendAttemptCacheTtl(params: {
5982
timedOutDuringCompaction: boolean;
6083
compactionOccurredThisAttempt: boolean;

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ import {
172172
applyExtraParamsToAgent,
173173
resolveAgentTransportOverride,
174174
resolveExplicitSettingsTransport,
175+
resolveExtraParams,
176+
resolvePreparedExtraParams,
175177
} from "../extra-params.js";
176178
import { prepareGooglePromptCacheStreamFn } from "../google-prompt-cache.js";
177179
import { getDmHistoryLimitFromSessionKey, limitHistoryTurns } from "../history.js";
@@ -292,7 +294,7 @@ import {
292294
composeSystemPromptWithHookContext,
293295
resolveAttemptSpawnWorkspaceDir,
294296
shouldPersistCompletedBootstrapTurn,
295-
shouldUseOpenAIWebSocketTransport,
297+
shouldUseOpenAIWebSocketTransportForAttempt,
296298
} from "./attempt.thread-helpers.js";
297299
import {
298300
shouldRepairMalformedToolCallArguments,
@@ -1769,25 +1771,57 @@ export async function runEmbeddedAttempt(
17691771
const defaultSessionStreamFn = resolveEmbeddedAgentBaseStreamFn({
17701772
session: activeSession,
17711773
});
1774+
const resolvedTransport = resolveExplicitSettingsTransport({
1775+
settingsManager,
1776+
sessionTransport: activeSession.agent.transport,
1777+
});
1778+
const streamExtraParamsOverride = {
1779+
...params.streamParams,
1780+
fastMode: params.fastMode,
1781+
};
1782+
const preparedRuntimeExtraParams = params.runtimePlan?.transport.resolveExtraParams({
1783+
extraParamsOverride: streamExtraParamsOverride,
1784+
thinkingLevel: params.thinkLevel,
1785+
agentId: sessionAgentId,
1786+
workspaceDir: effectiveWorkspace,
1787+
model: params.model,
1788+
resolvedTransport,
1789+
});
1790+
const resolvedExtraParams = resolveExtraParams({
1791+
cfg: params.config,
1792+
provider: params.provider,
1793+
modelId: params.modelId,
1794+
agentId: sessionAgentId,
1795+
});
1796+
const effectiveExtraParams =
1797+
preparedRuntimeExtraParams ??
1798+
resolvePreparedExtraParams({
1799+
cfg: params.config,
1800+
provider: params.provider,
1801+
modelId: params.modelId,
1802+
extraParamsOverride: streamExtraParamsOverride,
1803+
thinkingLevel: params.thinkLevel,
1804+
agentId: sessionAgentId,
1805+
agentDir,
1806+
workspaceDir: effectiveWorkspace,
1807+
resolvedExtraParams,
1808+
model: params.model,
1809+
resolvedTransport,
1810+
});
17721811
const providerStreamFn = registerProviderStreamForModel({
17731812
model: params.model,
17741813
cfg: params.config,
17751814
agentDir,
17761815
workspaceDir: effectiveWorkspace,
17771816
});
1778-
const hasExplicitSseTransport = [
1779-
(params.streamParams as { transport?: unknown } | undefined)?.transport,
1780-
(params.model as { params?: { transport?: unknown } }).params?.transport,
1781-
]
1782-
.map((value) => (typeof value === "string" ? value.trim().toLowerCase() : ""))
1783-
.includes("sse");
1784-
const shouldUseWebSocketTransport =
1785-
!hasExplicitSseTransport &&
1786-
shouldUseOpenAIWebSocketTransport({
1787-
provider: params.provider,
1788-
modelApi: params.model.api,
1789-
modelBaseUrl: params.model.baseUrl,
1790-
});
1817+
const shouldUseWebSocketTransport = shouldUseOpenAIWebSocketTransportForAttempt({
1818+
provider: params.provider,
1819+
modelApi: params.model.api,
1820+
modelBaseUrl: params.model.baseUrl,
1821+
streamParams: params.streamParams,
1822+
effectiveExtraParams,
1823+
modelParams: (params.model as { params?: Record<string, unknown> }).params,
1824+
});
17911825
const wsApiKey = shouldUseWebSocketTransport
17921826
? await resolveEmbeddedAgentApiKey({
17931827
provider: params.provider,
@@ -1832,23 +1866,7 @@ export async function runEmbeddedAttempt(
18321866
});
18331867
}
18341868

1835-
const resolvedTransport = resolveExplicitSettingsTransport({
1836-
settingsManager,
1837-
sessionTransport: activeSession.agent.transport,
1838-
});
1839-
const streamExtraParamsOverride = {
1840-
...params.streamParams,
1841-
fastMode: params.fastMode,
1842-
};
1843-
const preparedRuntimeExtraParams = params.runtimePlan?.transport.resolveExtraParams({
1844-
extraParamsOverride: streamExtraParamsOverride,
1845-
thinkingLevel: params.thinkLevel,
1846-
agentId: sessionAgentId,
1847-
workspaceDir: effectiveWorkspace,
1848-
model: params.model,
1849-
resolvedTransport,
1850-
});
1851-
const { effectiveExtraParams } = applyExtraParamsToAgent(
1869+
applyExtraParamsToAgent(
18521870
activeSession.agent,
18531871
params.config,
18541872
params.provider,
@@ -1860,9 +1878,7 @@ export async function runEmbeddedAttempt(
18601878
params.model,
18611879
agentDir,
18621880
resolvedTransport,
1863-
preparedRuntimeExtraParams
1864-
? { preparedExtraParams: preparedRuntimeExtraParams }
1865-
: undefined,
1881+
{ preparedExtraParams: effectiveExtraParams },
18661882
);
18671883
const effectivePromptCacheRetention = resolveCacheRetention(
18681884
effectiveExtraParams,

0 commit comments

Comments
 (0)