Skip to content

Commit b4f6928

Browse files
authored
fix(gateway): stop chat timeout fallback cascade
Fix gateway/chat timeout abort propagation so timed-out runs do not cascade through fallbacks. Preserve provider timeout errors when the gateway abort signal did not fire, and keep timeout stop reasons in async gateway agent results. Includes regression coverage for chat, follow-up, memory flush, fallback classification, and gateway agent timeout results. Fixes #83962.
1 parent b74cd69 commit b4f6928

14 files changed

Lines changed: 338 additions & 20 deletions

src/agents/agent-command.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,6 +1347,7 @@ async function agentCommandInternal(
13471347
model,
13481348
result,
13491349
}),
1350+
abortSignal: opts.abortSignal,
13501351
run: async (providerOverride, modelOverride, runOptions) => {
13511352
const isAutoFallbackPrimaryProbeCandidate =
13521353
autoFallbackPrimaryProbe &&

src/agents/model-fallback.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2246,6 +2246,63 @@ describe("runWithModelFallback", () => {
22462246
expect(run).toHaveBeenCalledTimes(1);
22472247
});
22482248

2249+
it("does not fall back when the caller abort signal timed out", async () => {
2250+
const cfg = makeCfg();
2251+
const timeoutReason = new Error("chat run timed out");
2252+
timeoutReason.name = "TimeoutError";
2253+
const controller = new AbortController();
2254+
controller.abort(timeoutReason);
2255+
const run = vi
2256+
.fn()
2257+
.mockRejectedValueOnce(
2258+
Object.assign(new Error("This operation was aborted"), { name: "AbortError" }),
2259+
)
2260+
.mockResolvedValueOnce("fallback should not run");
2261+
2262+
await expect(
2263+
runWithModelFallback({
2264+
cfg,
2265+
provider: "openai",
2266+
model: "gpt-4.1-mini",
2267+
abortSignal: controller.signal,
2268+
run,
2269+
}),
2270+
).rejects.toThrow("This operation was aborted");
2271+
2272+
expect(run).toHaveBeenCalledTimes(1);
2273+
});
2274+
2275+
it("does not fall back when a timed-out caller abort is classified from the result", async () => {
2276+
const cfg = makeProviderFallbackCfg("openai-codex");
2277+
const timeoutReason = new Error("chat run timed out");
2278+
timeoutReason.name = "TimeoutError";
2279+
const controller = new AbortController();
2280+
controller.abort(timeoutReason);
2281+
const run = vi
2282+
.fn()
2283+
.mockResolvedValueOnce({ payloads: [] })
2284+
.mockResolvedValueOnce({ payloads: [{ text: "fallback should not run" }] });
2285+
const classifyResult = vi.fn(() => ({
2286+
message: "This operation was aborted",
2287+
reason: "timeout" as const,
2288+
code: "terminal_abort",
2289+
}));
2290+
2291+
await expect(
2292+
runWithModelFallback({
2293+
cfg,
2294+
provider: "openai-codex",
2295+
model: "m1",
2296+
abortSignal: controller.signal,
2297+
run,
2298+
classifyResult,
2299+
}),
2300+
).rejects.toThrow("This operation was aborted");
2301+
2302+
expect(run).toHaveBeenCalledTimes(1);
2303+
expect(classifyResult).toHaveBeenCalledTimes(1);
2304+
});
2305+
22492306
it("appends the configured primary as a last fallback", async () => {
22502307
const cfg = makeCfg({
22512308
agents: {

src/agents/model-fallback.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,20 @@ function shouldRethrowAbort(err: unknown): boolean {
141141
return isFallbackAbortError(err) && !isTimeoutError(err);
142142
}
143143

144+
function isTerminalAbort(signal: AbortSignal | undefined): boolean {
145+
if (!signal?.aborted) {
146+
return false;
147+
}
148+
const reason = signal.reason;
149+
if (!(reason instanceof Error)) {
150+
return false;
151+
}
152+
if (reason.name === "TimeoutError") {
153+
return true;
154+
}
155+
return reason.name === "ClientDisconnectError";
156+
}
157+
144158
function createModelCandidateCollector(allowlist: Set<string> | null | undefined): {
145159
candidates: ModelCandidate[];
146160
addExplicitCandidate: (candidate: ModelCandidate) => void;
@@ -245,6 +259,7 @@ async function runFallbackCandidate<T>(params: {
245259
model: string;
246260
options?: ModelFallbackRunOptions;
247261
attribution?: FailoverAttribution;
262+
abortSignal?: AbortSignal;
248263
}): Promise<{ ok: true; result: T } | { ok: false; error: unknown }> {
249264
try {
250265
const result = params.options
@@ -261,6 +276,9 @@ async function runFallbackCandidate<T>(params: {
261276
if (isNonProviderRuntimeCoordinationError(err)) {
262277
throw err;
263278
}
279+
if (isTerminalAbort(params.abortSignal)) {
280+
throw err;
281+
}
264282
// Normalize abort-wrapped rate-limit errors (e.g. Google Vertex RESOURCE_EXHAUSTED)
265283
// so they become FailoverErrors and continue the fallback loop instead of aborting.
266284
const normalizedFailover = coerceToFailoverError(err, {
@@ -286,13 +304,15 @@ async function runFallbackAttempt<T>(params: {
286304
attempt: number;
287305
total: number;
288306
attribution?: FailoverAttribution;
307+
abortSignal?: AbortSignal;
289308
}): Promise<{ success: ModelFallbackRunResult<T> } | { error: unknown }> {
290309
const runResult = await runFallbackCandidate({
291310
run: params.run,
292311
provider: params.provider,
293312
model: params.model,
294313
options: params.options,
295314
attribution: params.attribution,
315+
abortSignal: params.abortSignal,
296316
});
297317
if (runResult.ok) {
298318
const classification = await params.classifyResult?.({
@@ -308,6 +328,9 @@ async function runFallbackAttempt<T>(params: {
308328
attribution: params.attribution,
309329
});
310330
if (classifiedError) {
331+
if (isTerminalAbort(params.abortSignal)) {
332+
throw classifiedError;
333+
}
311334
return { error: classifiedError };
312335
}
313336
return {
@@ -1059,6 +1082,7 @@ export async function runWithModelFallback<T>(
10591082
onFallbackStep?: ModelFallbackStepHandler;
10601083
classifyResult?: ModelFallbackResultClassifier<T>;
10611084
skipAuthProfileRuntime?: boolean;
1085+
abortSignal?: AbortSignal;
10621086
} & ModelManifestNormalizationContext,
10631087
): Promise<ModelFallbackRunResult<T>> {
10641088
const candidates = resolveFallbackCandidates({
@@ -1301,6 +1325,7 @@ export async function runWithModelFallback<T>(
13011325
attempt: i + 1,
13021326
total: candidates.length,
13031327
attribution: { sessionId: params.sessionId, lane: params.lane },
1328+
abortSignal: params.abortSignal,
13041329
});
13051330
if ("success" in attemptRun) {
13061331
if (i > 0 || attempts.length > 0 || attemptedDuringCooldown) {

src/auto-reply/reply/agent-runner-execution.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ async function getApplyFallbackCandidateSelectionToEntry() {
199199
type FallbackRunnerParams = {
200200
provider: string;
201201
model: string;
202+
abortSignal?: AbortSignal;
202203
run: (provider: string, model: string) => Promise<unknown>;
203204
classifyResult?: (params: {
204205
result: { payloads?: Array<{ text?: string; isError?: boolean; isReasoning?: boolean }> };
@@ -1116,6 +1117,31 @@ describe("runAgentTurnWithFallback", () => {
11161117
vi.clearAllMocks();
11171118
});
11181119

1120+
it("passes the reply abort signal to fallback orchestration and candidates", async () => {
1121+
const { replyOperation } = createMockReplyOperation();
1122+
state.runEmbeddedPiAgentMock.mockResolvedValueOnce({
1123+
payloads: [{ text: "ok" }],
1124+
meta: {},
1125+
});
1126+
1127+
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
1128+
await runAgentTurnWithFallback({
1129+
...createMinimalRunAgentTurnParams(),
1130+
replyOperation,
1131+
});
1132+
1133+
const fallbackCall = requireRecord(
1134+
state.runWithModelFallbackMock.mock.calls[0]?.[0],
1135+
"runWithModelFallback params",
1136+
);
1137+
const embeddedCall = requireRecord(
1138+
state.runEmbeddedPiAgentMock.mock.calls[0]?.[0],
1139+
"runEmbeddedPiAgent params",
1140+
);
1141+
expect(fallbackCall.abortSignal).toBe(replyOperation.abortSignal);
1142+
expect(embeddedCall.abortSignal).toBe(replyOperation.abortSignal);
1143+
});
1144+
11191145
it("rechecks queued auto fallback primary probes before running", async () => {
11201146
const { markAutoFallbackPrimaryProbe } = await import("../../agents/agent-scope.js");
11211147
const probe = {

src/auto-reply/reply/agent-runner-execution.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,6 +1870,7 @@ export async function runAgentTurnWithFallback(params: {
18701870
const onToolResult = params.opts?.onToolResult;
18711871
const outcomePlan = buildAgentRuntimeOutcomePlan();
18721872
const runLane = CommandLane.Main;
1873+
const runAbortSignal = params.replyOperation?.abortSignal ?? params.opts?.abortSignal;
18731874
let queuedUserMessagePersistedAcrossFallback = false;
18741875
let assistantErrorPersistedAcrossFallback = false;
18751876
const userTurnTranscriptRecorder =
@@ -1891,6 +1892,7 @@ export async function runAgentTurnWithFallback(params: {
18911892
runId,
18921893
sessionId: params.followupRun.run.sessionId,
18931894
lane: runLane,
1895+
abortSignal: runAbortSignal,
18941896
resolveAgentHarnessRuntimeOverride: (provider) =>
18951897
resolveSessionRuntimeOverrideForProvider({
18961898
provider,
@@ -2089,7 +2091,7 @@ export async function runAgentTurnWithFallback(params: {
20892091
agentAccountId: params.followupRun.run.agentAccountId,
20902092
senderIsOwner: params.followupRun.run.senderIsOwner,
20912093
disableTools: params.opts?.disableTools,
2092-
abortSignal: params.replyOperation?.abortSignal ?? params.opts?.abortSignal,
2094+
abortSignal: runAbortSignal,
20932095
replyOperation: params.replyOperation,
20942096
},
20952097
transformResult: (rawResult) =>
@@ -2219,7 +2221,7 @@ export async function runAgentTurnWithFallback(params: {
22192221
bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default",
22202222
images: currentTurnImages.images,
22212223
imageOrder: currentTurnImages.imageOrder,
2222-
abortSignal: params.replyOperation?.abortSignal ?? params.opts?.abortSignal,
2224+
abortSignal: runAbortSignal,
22232225
replyOperation: params.replyOperation,
22242226
blockReplyBreak: params.resolvedBlockStreamingBreak,
22252227
blockReplyChunking: params.blockReplyChunking,

src/auto-reply/reply/agent-runner-memory.test.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ type RefreshQueuedFollowupSessionParams = {
6666
type ModelFallbackParams = {
6767
provider?: string;
6868
model?: string;
69+
abortSignal?: AbortSignal;
6970
agentId?: string;
7071
sessionKey?: string;
7172
fallbacksOverride?: unknown[];
@@ -89,6 +90,7 @@ type EmbeddedPiAgentParams = {
8990
extraSystemPrompt?: string;
9091
bootstrapPromptWarningSignaturesSeen?: string[];
9192
bootstrapPromptWarningSignature?: string;
93+
abortSignal?: AbortSignal;
9294
};
9395

9496
type CompactEmbeddedPiSessionParams = {
@@ -502,6 +504,7 @@ describe("runMemoryFlushIfNeeded", () => {
502504
compactionCount: 1,
503505
};
504506

507+
const replyOperation = createReplyOperation();
505508
await runMemoryFlushIfNeeded({
506509
cfg: {
507510
agents: {
@@ -527,18 +530,20 @@ describe("runMemoryFlushIfNeeded", () => {
527530
sessionStore: { main: sessionEntry },
528531
sessionKey: "main",
529532
isHeartbeat: false,
530-
replyOperation: createReplyOperation(),
533+
replyOperation,
531534
});
532535

533536
expect(runWithModelFallbackMock).toHaveBeenCalledTimes(1);
534537
const fallbackCall = requireModelFallbackCall();
535538
expect(fallbackCall.provider).toBe("ollama");
536539
expect(fallbackCall.model).toBe("qwen3:8b");
540+
expect(fallbackCall.abortSignal).toBe(replyOperation.abortSignal);
537541
expect(fallbackCall.fallbacksOverride).toEqual([]);
538542
expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
539543
const agentCall = requireEmbeddedPiAgentCall();
540544
expect(agentCall.provider).toBe("ollama");
541545
expect(agentCall.model).toBe("qwen3:8b");
546+
expect(agentCall.abortSignal).toBe(replyOperation.abortSignal);
542547
expect(agentCall.authProfileId).toBeUndefined();
543548
expect(agentCall.authProfileIdSource).toBeUndefined();
544549
});

src/auto-reply/reply/agent-runner-memory.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,7 @@ export async function runMemoryFlushIfNeeded(params: {
11651165
runId: flushRunId,
11661166
sessionId: activeSessionEntry?.sessionId ?? params.followupRun.run.sessionId,
11671167
lane: CommandLane.Main,
1168+
abortSignal: params.replyOperation.abortSignal,
11681169
resolveAgentHarnessRuntimeOverride: (provider) =>
11691170
resolveMemoryFlushRuntimeOverrideForProvider({
11701171
provider,

src/auto-reply/reply/followup-runner.test.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,7 +1089,7 @@ describe("createFollowupRunner runtime config", () => {
10891089
expect(typing.markDispatchIdle).toHaveBeenCalledTimes(1);
10901090
});
10911091

1092-
it("passes queued room-event abort signals into followup agent runs", async () => {
1092+
it("passes the admitted reply abort signal into followup fallback and agent runs", async () => {
10931093
const abortController = new AbortController();
10941094
runEmbeddedPiAgentMock.mockResolvedValueOnce({
10951095
payloads: [],
@@ -1113,8 +1113,14 @@ describe("createFollowupRunner runtime config", () => {
11131113
}),
11141114
);
11151115

1116+
const fallbackCall = requireLastMockCallArg(
1117+
runWithModelFallbackMock,
1118+
"run with model fallback",
1119+
);
11161120
const call = requireLastMockCallArg(runEmbeddedPiAgentMock, "run embedded pi agent");
1117-
expect(call.abortSignal).toBe(abortController.signal);
1121+
expect(fallbackCall.abortSignal).toBeInstanceOf(AbortSignal);
1122+
expect(fallbackCall.abortSignal).not.toBe(abortController.signal);
1123+
expect(call.abortSignal).toBe(fallbackCall.abortSignal);
11181124
});
11191125

11201126
it("does not inherit source abort signals for queued user followups", async () => {
@@ -1142,8 +1148,14 @@ describe("createFollowupRunner runtime config", () => {
11421148
}),
11431149
);
11441150

1151+
const fallbackCall = requireLastMockCallArg(
1152+
runWithModelFallbackMock,
1153+
"run with model fallback",
1154+
);
11451155
const call = requireLastMockCallArg(runEmbeddedPiAgentMock, "run embedded pi agent");
1146-
expect(call.abortSignal).toBeUndefined();
1156+
expect(fallbackCall.abortSignal).toBeInstanceOf(AbortSignal);
1157+
expect(fallbackCall.abortSignal).not.toBe(sourceAbortController.signal);
1158+
expect(call.abortSignal).toBe(fallbackCall.abortSignal);
11471159
});
11481160

11491161
it("keeps queued delivery correlations active during followup agent runs", async () => {

src/auto-reply/reply/followup-runner.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ export function createFollowupRunner(params: {
624624
fallbackProvider = run.provider;
625625
fallbackModel = run.model;
626626
replyOperation.setPhase("running");
627+
const runAbortSignal = replyOperation.abortSignal;
627628
let pendingDeferredCliTerminal:
628629
| {
629630
provider: string;
@@ -639,6 +640,7 @@ export function createFollowupRunner(params: {
639640
...resolveModelFallbackOptions(run, runtimeConfig),
640641
cfg: runtimeConfig,
641642
runId,
643+
abortSignal: runAbortSignal,
642644
resolveAgentHarnessRuntimeOverride: (provider) =>
643645
resolveSessionRuntimeOverrideForProvider({
644646
provider,
@@ -764,7 +766,7 @@ export function createFollowupRunner(params: {
764766
}),
765767
agentAccountId: run.agentAccountId,
766768
disableTools: opts?.disableTools,
767-
abortSignal: queued.abortSignal,
769+
abortSignal: runAbortSignal,
768770
},
769771
transformResult: (rawResult) =>
770772
isRoomEventCliRun && rawResult.meta.agentMeta
@@ -850,7 +852,7 @@ export function createFollowupRunner(params: {
850852
bashElevated: run.bashElevated,
851853
timeoutMs: run.timeoutMs,
852854
runId,
853-
abortSignal: queued.abortSignal,
855+
abortSignal: runAbortSignal,
854856
images: queuedImages,
855857
imageOrder: queuedImageOrder,
856858
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,

src/cli/command-secret-gateway.test.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,11 @@ describe("resolveCommandSecretRefsViaGateway", () => {
111111
function setSingleSecretTargetDeps(params: {
112112
path: string;
113113
pathSegments: readonly string[];
114-
resolveManifestContractOwnerPluginId?: (params: {
115-
contract: string;
116-
value: string;
117-
}) => string | undefined;
114+
resolveManifestContractOwnerPluginId?: NonNullable<
115+
Parameters<
116+
typeof commandSecretGatewayTesting.setDepsForTest
117+
>[0]["resolveManifestContractOwnerPluginId"]
118+
>;
118119
}): () => void {
119120
const deps: Parameters<typeof commandSecretGatewayTesting.setDepsForTest>[0] = {
120121
analyzeCommandSecretAssignmentsFromSnapshot: ({ inactiveRefPaths, resolvedConfig }) => {

0 commit comments

Comments
 (0)