Skip to content

Commit 21c5f8d

Browse files
committed
fix(codex): keep run lane timeout progress-aware
1 parent a641a27 commit 21c5f8d

7 files changed

Lines changed: 134 additions & 6 deletions

File tree

extensions/codex/src/app-server/run-attempt.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1604,6 +1604,8 @@ describe("runCodexAppServerAttempt", () => {
16041604
path.join(tempDir, "workspace"),
16051605
);
16061606
params.timeoutMs = 100;
1607+
const onRunProgress = vi.fn();
1608+
params.onRunProgress = onRunProgress;
16071609

16081610
const run = runCodexAppServerAttempt(params, {
16091611
turnCompletionIdleTimeoutMs: 300,
@@ -1649,6 +1651,11 @@ describe("runCodexAppServerAttempt", () => {
16491651
expect(result.timedOut).toBe(false);
16501652
expect(result.promptError).toBeNull();
16511653
expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(false);
1654+
const progressReasons = onRunProgress.mock.calls.map(([info]) => info.reason);
1655+
expect(progressReasons).toContain("turn:start");
1656+
expect(
1657+
progressReasons.filter((reason) => reason === "notification:rawResponseItem/completed"),
1658+
).toHaveLength(2);
16521659
});
16531660

16541661
it("does not count non-turn app-server requests as turn attempt progress", async () => {
@@ -1659,6 +1666,8 @@ describe("runCodexAppServerAttempt", () => {
16591666
path.join(tempDir, "workspace"),
16601667
);
16611668
params.timeoutMs = 100;
1669+
const onRunProgress = vi.fn();
1670+
params.onRunProgress = onRunProgress;
16621671

16631672
const run = runCodexAppServerAttempt(params, {
16641673
turnCompletionIdleTimeoutMs: 500,
@@ -1689,6 +1698,7 @@ describe("runCodexAppServerAttempt", () => {
16891698
expect(warnData?.timeoutMs).toBe(100);
16901699
expect(warnData?.lastActivityReason).toBe("turn:start");
16911700
expect(harness.request.mock.calls.some(([method]) => method === "turn/interrupt")).toBe(true);
1701+
expect(onRunProgress.mock.calls.map(([info]) => info.reason)).toEqual(["turn:start"]);
16921702
});
16931703

16941704
it("keeps the turn attempt timeout armed while non-turn requests are pending", async () => {

extensions/codex/src/app-server/run-attempt.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,6 +1353,12 @@ export async function runCodexAppServerAttempt(
13531353
turnAttemptLastProgressReason = reason;
13541354
turnAttemptLastProgressDetails = options.details;
13551355
renewNativeHookRelayForTurnProgress();
1356+
params.onRunProgress?.({
1357+
reason,
1358+
provider: params.provider,
1359+
model: params.modelId,
1360+
backend: "codex-app-server",
1361+
});
13561362
}
13571363
emitTrustedDiagnosticEvent({
13581364
type: "run.progress",

src/agents/pi-embedded-runner/run.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,18 @@ export async function runEmbeddedPiAgent(
379379
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
380380
const globalLane = resolveGlobalLane(params.lane);
381381
const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs);
382+
let laneTaskProgressAtMs = Date.now();
383+
const noteLaneTaskProgress = () => {
384+
laneTaskProgressAtMs = Date.now();
385+
};
382386
const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) =>
383-
withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs);
387+
withEmbeddedRunLaneTimeout(
388+
{
389+
...opts,
390+
taskTimeoutProgressAtMs: () => laneTaskProgressAtMs,
391+
},
392+
laneTaskTimeoutMs,
393+
);
384394
const enqueueGlobal = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
385395
params.enqueue
386396
? params.enqueue(task, withLaneTimeout(opts))
@@ -429,8 +439,15 @@ export async function runEmbeddedPiAgent(
429439
"phase"
430440
>,
431441
) => {
442+
noteLaneTaskProgress();
432443
params.onExecutionPhase?.({ phase, ...extra });
433444
};
445+
const notifyRunProgress = (
446+
info: Parameters<NonNullable<RunEmbeddedPiAgentParams["onRunProgress"]>>[0],
447+
) => {
448+
noteLaneTaskProgress();
449+
params.onRunProgress?.(info);
450+
};
434451
const emitStartupStageSummary = (phase: string) => {
435452
const summary = startupStages.snapshot();
436453
const shouldWarn = shouldWarnEmbeddedRunStageSummary(summary);
@@ -1370,6 +1387,7 @@ export async function runEmbeddedPiAgent(
13701387
legacyBeforeAgentStartResult,
13711388
thinkLevel,
13721389
onToolOutcome: observePostCompactionToolOutcome,
1390+
onRunProgress: notifyRunProgress,
13731391
fastMode: params.fastMode,
13741392
verboseLevel: params.verboseLevel,
13751393
reasoningLevel: params.reasoningLevel,

src/agents/pi-embedded-runner/run/params.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,12 @@ export type RunEmbeddedPiAgentParams = {
177177
itemId?: string;
178178
firstModelCallStarted?: boolean;
179179
}) => void;
180+
onRunProgress?: (info: {
181+
reason: string;
182+
provider?: string;
183+
model?: string;
184+
backend?: string;
185+
}) => void;
180186
replyOperation?: ReplyOperation;
181187
shouldEmitToolResult?: () => boolean;
182188
shouldEmitToolOutput?: () => boolean;

src/process/command-queue.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,72 @@ describe("command queue", () => {
391391
}
392392
});
393393

394+
it("task timeout renews from progress timestamps", async () => {
395+
const lane = `timeout-progress-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
396+
setCommandLaneConcurrency(lane, 1);
397+
398+
vi.useFakeTimers();
399+
try {
400+
let progressAtMs = Date.now();
401+
const blocker = createDeferred();
402+
const first = enqueueCommandInLane(
403+
lane,
404+
async () => {
405+
await blocker.promise;
406+
return "first";
407+
},
408+
{
409+
taskTimeoutMs: 25,
410+
taskTimeoutProgressAtMs: () => progressAtMs,
411+
},
412+
);
413+
let secondRan = false;
414+
const second = enqueueCommandInLane(lane, async () => {
415+
secondRan = true;
416+
return "second";
417+
});
418+
419+
await vi.advanceTimersByTimeAsync(20);
420+
progressAtMs = Date.now();
421+
await vi.advanceTimersByTimeAsync(20);
422+
expect(secondRan).toBe(false);
423+
424+
blocker.resolve();
425+
await expect(first).resolves.toBe("first");
426+
await expect(second).resolves.toBe("second");
427+
expect(secondRan).toBe(true);
428+
} finally {
429+
vi.useRealTimers();
430+
}
431+
});
432+
433+
it("task timeout falls back when progress timestamp callback throws", async () => {
434+
const lane = `timeout-progress-throw-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
435+
setCommandLaneConcurrency(lane, 1);
436+
437+
vi.useFakeTimers();
438+
try {
439+
const first = enqueueCommandInLane(lane, async () => new Promise<never>(() => {}), {
440+
taskTimeoutMs: 25,
441+
taskTimeoutProgressAtMs: () => {
442+
throw new Error("progress failed");
443+
},
444+
});
445+
const firstRejected = expect(first).rejects.toBeInstanceOf(CommandLaneTaskTimeoutError);
446+
447+
await vi.advanceTimersByTimeAsync(25);
448+
await firstRejected;
449+
450+
expect(
451+
diagnosticMocks.diag.warn.mock.calls.some(([message]) =>
452+
String(message).includes("lane task timeout progress callback failed"),
453+
),
454+
).toBe(true);
455+
} finally {
456+
vi.useRealTimers();
457+
}
458+
});
459+
394460
it("keeps work queued while a lane has zero concurrency and drains after resume", async () => {
395461
const lane = `suspended-lane-${Date.now()}-${Math.random().toString(16).slice(2)}`;
396462
setCommandLaneConcurrency(lane, 0);

src/process/command-queue.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type QueueEntry = {
6363
enqueuedAt: number;
6464
warnAfterMs: number;
6565
taskTimeoutMs?: number;
66+
taskTimeoutProgressAtMs?: () => number | undefined;
6667
onWait?: (waitMs: number, queuedAhead: number) => void;
6768
};
6869

@@ -210,14 +211,33 @@ async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise<unkno
210211
return await taskPromise;
211212
}
212213

214+
const startedAtMs = Date.now();
215+
const readLastProgressAtMs = () => {
216+
let value: number | undefined;
217+
try {
218+
value = entry.taskTimeoutProgressAtMs?.();
219+
} catch (err) {
220+
diag.warn(`lane task timeout progress callback failed: lane=${lane} error="${String(err)}"`);
221+
}
222+
return typeof value === "number" && Number.isFinite(value) && value > 0
223+
? Math.max(startedAtMs, Math.floor(value))
224+
: startedAtMs;
225+
};
213226
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
214227
let timedOut = false;
215228
const timeoutPromise = new Promise<never>((_, reject) => {
216-
timeoutHandle = setTimeout(() => {
217-
timedOut = true;
218-
reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
219-
}, taskTimeoutMs);
220-
timeoutHandle.unref?.();
229+
const armTimeout = () => {
230+
const elapsedMs = Math.max(0, Date.now() - readLastProgressAtMs());
231+
const remainingMs = taskTimeoutMs - elapsedMs;
232+
if (remainingMs <= 0) {
233+
timedOut = true;
234+
reject(new CommandLaneTaskTimeoutError(lane, taskTimeoutMs));
235+
return;
236+
}
237+
timeoutHandle = setTimeout(armTimeout, remainingMs);
238+
timeoutHandle.unref?.();
239+
};
240+
armTimeout();
221241
});
222242

223243
try {
@@ -349,6 +369,7 @@ export function enqueueCommandInLane<T>(
349369
enqueuedAt: Date.now(),
350370
warnAfterMs,
351371
taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs),
372+
taskTimeoutProgressAtMs: opts?.taskTimeoutProgressAtMs,
352373
onWait: opts?.onWait,
353374
});
354375
logLaneEnqueue(cleaned, getLaneDepth(state));

src/process/command-queue.types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export type CommandQueueEnqueueOptions = {
22
warnAfterMs?: number;
33
onWait?: (waitMs: number, queuedAhead: number) => void;
44
taskTimeoutMs?: number;
5+
taskTimeoutProgressAtMs?: () => number | undefined;
56
};
67

78
export type CommandQueueEnqueueFn = <T>(

0 commit comments

Comments
 (0)