Skip to content

Commit a7d2bd2

Browse files
committed
fix(codex): wait for active native turns before resume
1 parent ce92536 commit a7d2bd2

3 files changed

Lines changed: 299 additions & 11 deletions

File tree

extensions/codex/src/app-server/run-attempt.test.ts

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3025,6 +3025,152 @@ describe("runCodexAppServerAttempt", () => {
30253025
expect(binding?.threadId).toBe("thread-existing");
30263026
});
30273027

3028+
// Scenario: the first turn/start request is rejected with an
// activeTurnNotSteerable error whose turnKind is "compact", and the compact
// turn's turn/completed notification is scheduled right behind it. The run is
// expected to send turn/start a second time and then finish normally.
it("retries turn/start after a native compact turn finishes", async () => {
  const sessionPath = path.join(tempDir, "session.jsonl");
  const workspacePath = path.join(tempDir, "workspace");
  await writeExistingBinding(sessionPath, workspacePath, { dynamicToolsFingerprint: "[]" });

  const compactRejection = "cannot steer a compact turn";
  let turnStartAttempts = 0;
  // The request handler has to reach the harness it is handed to, so the
  // harness is published through this holder once it has been created.
  const harnessHolder: { current?: ReturnType<typeof createAppServerHarness> } = {};
  const harness = createAppServerHarness(async (method) => {
    switch (method) {
      case "thread/resume":
        return threadStartResult("thread-existing");
      case "turn/start": {
        turnStartAttempts += 1;
        if (turnStartAttempts !== 1) {
          return turnStartResult("turn-1");
        }
        // First attempt only: announce the compact turn's completion from a
        // microtask, then reject this request.
        queueMicrotask(() => {
          void harnessHolder.current?.notify({
            method: "turn/completed",
            params: {
              threadId: "thread-existing",
              turnId: "compact-turn",
              turn: { id: "compact-turn", status: "completed" },
            },
          });
        });
        throw new CodexAppServerRpcError(
          {
            message: compactRejection,
            data: {
              message: compactRejection,
              codexErrorInfo: {
                activeTurnNotSteerable: { turnKind: "compact" },
              },
              additionalDetails: null,
            },
          },
          "turn/start",
        );
      }
      default:
        return {};
    }
  });
  harnessHolder.current = harness;

  const attempt = runCodexAppServerAttempt(createParams(sessionPath, workspacePath));
  // Hold until both the rejected and the retried turn/start have been recorded.
  await vi.waitFor(() => {
    const turnStarts = harness.requests.filter((request) => request.method === "turn/start");
    expect(turnStarts).toHaveLength(2);
  }, fastWait);
  await harness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
  await attempt;

  const requestedMethods = harness.requests.map((request) => request.method);
  expect(requestedMethods).toEqual([
    "thread/resume",
    "turn/start",
    "turn/start",
    "thread/unsubscribe",
  ]);
});
3089+
3090+
// Scenario: thread/resume reports the thread as active with an in-progress
// "compact-turn". No turn/start may be sent until that turn's turn/completed
// notification has been delivered.
it("waits for an already-active native turn before starting a resumed thread turn", async () => {
  const sessionPath = path.join(tempDir, "session.jsonl");
  const workspacePath = path.join(tempDir, "workspace");
  await writeExistingBinding(sessionPath, workspacePath, { dynamicToolsFingerprint: "[]" });

  const harness = createAppServerHarness(async (method) => {
    switch (method) {
      case "thread/resume": {
        const resumed = threadStartResult("thread-existing");
        return {
          ...resumed,
          thread: {
            ...resumed.thread,
            status: { type: "active", activeFlags: [] },
            turns: [{ id: "compact-turn", status: "inProgress", items: [] }],
          },
        };
      }
      case "turn/start":
        return turnStartResult("turn-1");
      default:
        return {};
    }
  });
  const requestedMethods = () => harness.requests.map((request) => request.method);
  const quietPeriodMs = 20;

  const attempt = runCodexAppServerAttempt(createParams(sessionPath, workspacePath));
  await harness.waitForMethod("thread/resume");
  // Give the run a short window in which it could (wrongly) send turn/start.
  await new Promise((settle) => {
    setTimeout(settle, quietPeriodMs);
  });
  expect(requestedMethods()).not.toContain("turn/start");

  // Completing the native turn is what should release the pending turn/start.
  await harness.notify({
    method: "turn/completed",
    params: {
      threadId: "thread-existing",
      turn: { id: "compact-turn", status: "completed" },
    },
  });
  await harness.waitForMethod("turn/start");
  await harness.completeTurn({ threadId: "thread-existing", turnId: "turn-1" });
  await attempt;

  expect(requestedMethods()).toEqual(["thread/resume", "turn/start", "thread/unsubscribe"]);
});
3136+
3137+
// Scenario: turn/start is rejected with an activeTurnNotSteerable error whose
// turnKind is "review". The run must reject with that error and the request
// log must contain exactly one turn/start.
it("does not retry turn/start for non-compact active turns", async () => {
  const sessionPath = path.join(tempDir, "session.jsonl");
  const workspacePath = path.join(tempDir, "workspace");
  await writeExistingBinding(sessionPath, workspacePath, { dynamicToolsFingerprint: "[]" });

  const reviewRejection = "cannot steer a review turn";
  const harness = createAppServerHarness(async (method) => {
    switch (method) {
      case "thread/resume":
        return threadStartResult("thread-existing");
      case "turn/start":
        throw new CodexAppServerRpcError(
          {
            message: reviewRejection,
            data: {
              message: reviewRejection,
              codexErrorInfo: {
                activeTurnNotSteerable: { turnKind: "review" },
              },
              additionalDetails: null,
            },
          },
          "turn/start",
        );
      default:
        return {};
    }
  });

  const attempt = runCodexAppServerAttempt(createParams(sessionPath, workspacePath));
  await expect(attempt).rejects.toThrow(reviewRejection);

  const requestedMethods = harness.requests.map((request) => request.method);
  expect(requestedMethods).toEqual(["thread/resume", "turn/start", "thread/unsubscribe"]);
});
3173+
30283174
it("does not leak unhandled rejections when shutdown closes before interrupt", async () => {
30293175
const unhandledRejections: unknown[] = [];
30303176
const onUnhandledRejection = (reason: unknown) => {

extensions/codex/src/app-server/run-attempt.ts

Lines changed: 139 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ import {
116116
defaultLeasedCodexAppServerClientFactory,
117117
type CodexAppServerClientFactory,
118118
} from "./client-factory.js";
119-
import { isCodexAppServerApprovalRequest, type CodexAppServerClient } from "./client.js";
119+
import {
120+
CodexAppServerRpcError,
121+
isCodexAppServerApprovalRequest,
122+
type CodexAppServerClient,
123+
} from "./client.js";
120124
import {
121125
isCodexAppServerApprovalPolicyAllowedByRequirements,
122126
isCodexSandboxExecServerEnabled,
@@ -194,15 +198,16 @@ import {
194198
assertCodexTurnStartResponse,
195199
readCodexDynamicToolCallParams,
196200
} from "./protocol-validators.js";
197-
import type {
198-
CodexSandboxPolicy,
199-
CodexTurnEnvironmentParams,
200-
CodexServerNotification,
201-
CodexDynamicToolCallParams,
202-
CodexDynamicToolCallResponse,
203-
CodexTurnStartResponse,
204-
JsonObject,
205-
JsonValue,
201+
import {
202+
isJsonObject,
203+
type CodexSandboxPolicy,
204+
type CodexTurnEnvironmentParams,
205+
type CodexServerNotification,
206+
type CodexDynamicToolCallParams,
207+
type CodexDynamicToolCallResponse,
208+
type CodexTurnStartResponse,
209+
type JsonObject,
210+
type JsonValue,
206211
} from "./protocol.js";
207212
import { releaseCodexSandboxExecServerEnvironment } from "./sandbox-exec-server.js";
208213
import {
@@ -249,6 +254,7 @@ import { createCodexUserInputBridge } from "./user-input-bridge.js";
249254

250255
const CODEX_NATIVE_HOOK_RELAY_RENEW_INTERVAL_MS = 60_000;
251256
const CODEX_APP_SERVER_PROJECTED_CHARS_PER_TOKEN = 4;
257+
const CODEX_APP_SERVER_ACTIVE_NATIVE_TURN_WAIT_TIMEOUT_MS = 30_000;
252258
const ensuredCodexWorkspaceDirs = new Set<string>();
253259

254260
function estimateCodexAppServerProjectedTurnTokens(params: {
@@ -1490,6 +1496,48 @@ export async function runCodexAppServerAttempt(
14901496
}
14911497
}
14921498
};
1499+
let activeNativeTurnCompletionWaiter:
1500+
| { matches: (notification: CodexServerNotification) => boolean; resolve: () => void }
1501+
| undefined;
1502+
/**
 * Waits for a native Codex turn on the current thread to report `turn/completed`.
 *
 * While waiting, the matcher and resolver are published through
 * `activeNativeTurnCompletionWaiter`; whoever observes a matching notification
 * calls its `resolve()` to end the wait.
 *
 * @param turnIds Optional turn ids to wait for. When omitted or empty, any
 *   `turn/completed` notification accepted by
 *   `isCodexThreadTurnCompletedNotification` for `thread.threadId` counts.
 * @returns `true` when a matching completion is already present in
 *   `pendingNotifications` or arrives during the wait; `false` when the run is
 *   (or becomes) aborted, or when the wait times out. The timeout is the smaller
 *   of `appServer.requestTimeoutMs` and
 *   `CODEX_APP_SERVER_ACTIVE_NATIVE_TURN_WAIT_TIMEOUT_MS`.
 */
const waitForActiveNativeTurnCompletion = async (
  turnIds?: readonly string[],
): Promise<boolean> => {
  const turnIdSet = turnIds?.length ? new Set(turnIds) : undefined;
  const matchesCompletion = (notification: CodexServerNotification) =>
    isCodexThreadTurnCompletedNotification(notification, thread.threadId, turnIdSet);
  // The completion may already be queued; in that case there is nothing to wait for.
  if (pendingNotifications.some((notification) => matchesCompletion(notification))) {
    return true;
  }
  // An "abort" listener added to a signal that has already fired is never
  // invoked, so without this check an already-aborted run would sit here until
  // the timeout expires before reporting `false`.
  if (runAbortController.signal.aborted) {
    return false;
  }
  return await new Promise<boolean>((resolve) => {
    let settled = false;
    const timeoutRef: { current?: ReturnType<typeof setTimeout> } = {};
    // Single exit point: whichever of completion, abort or timeout happens
    // first wins, and every registration made below is undone exactly once.
    const finish = (completedNativeTurn: boolean) => {
      if (settled) {
        return;
      }
      settled = true;
      if (timeoutRef.current) {
        clearTimeout(timeoutRef.current);
      }
      runAbortController.signal.removeEventListener("abort", abortListener);
      // Only clear the shared slot when it still belongs to this wait.
      if (activeNativeTurnCompletionWaiter?.resolve === finishComplete) {
        activeNativeTurnCompletionWaiter = undefined;
      }
      resolve(completedNativeTurn);
    };
    const finishComplete = () => finish(true);
    const abortListener = () => finish(false);
    timeoutRef.current = setTimeout(
      () => finish(false),
      Math.min(appServer.requestTimeoutMs, CODEX_APP_SERVER_ACTIVE_NATIVE_TURN_WAIT_TIMEOUT_MS),
    );
    activeNativeTurnCompletionWaiter = {
      matches: matchesCompletion,
      resolve: finishComplete,
    };
    runAbortController.signal.addEventListener("abort", abortListener, { once: true });
  });
};
14931541
const enqueueNotification = (notification: CodexServerNotification): Promise<void> => {
14941542
const projector = projectorRef.current;
14951543
const turnId = turnIdRef.current;
@@ -1512,6 +1560,11 @@ export async function runCodexAppServerAttempt(
15121560
);
15131561
}
15141562
}
1563+
if (notification.method === "turn/completed" && correlation.matchesActiveThread) {
1564+
if (activeNativeTurnCompletionWaiter?.matches(notification)) {
1565+
activeNativeTurnCompletionWaiter.resolve();
1566+
}
1567+
}
15151568
if (isCodexNotificationOutsideActiveRun(correlation)) {
15161569
return Promise.resolve();
15171570
}
@@ -1918,6 +1971,32 @@ export async function runCodexAppServerAttempt(
19181971
throwIfTurnStartAcceptedAfterAbort();
19191972
return startedTurn;
19201973
};
1974+
const activeNativeTurnIds =
1975+
thread.lifecycle.action === "resumed" ? (thread.lifecycle.activeTurnIds ?? []) : [];
1976+
if (activeNativeTurnIds.length > 0) {
1977+
// A resumed Codex thread can already be running a native compact/review turn.
1978+
// Starting an OpenClaw turn before that native turn completes can wedge the
1979+
// accepted turn behind a completion event we intentionally ignore.
1980+
embeddedAgentLog.info(
1981+
"codex app-server resumed thread has active native turn; waiting before turn/start",
1982+
{ threadId: thread.threadId, activeTurnIds: activeNativeTurnIds },
1983+
);
1984+
emitCodexAppServerEvent(params, {
1985+
stream: "codex_app_server.lifecycle",
1986+
data: {
1987+
phase: "turn_start_waiting_for_native_turn",
1988+
threadId: thread.threadId,
1989+
activeTurnIds: activeNativeTurnIds,
1990+
},
1991+
});
1992+
const nativeTurnCompleted = await waitForActiveNativeTurnCompletion(activeNativeTurnIds);
1993+
if (!nativeTurnCompleted && !runAbortController.signal.aborted) {
1994+
embeddedAgentLog.warn(
1995+
"codex app-server active native turn did not complete before turn/start wait timed out",
1996+
{ threadId: thread.threadId, activeTurnIds: activeNativeTurnIds },
1997+
);
1998+
}
1999+
}
19212000
try {
19222001
codexModelCallDiagnostics.emitStarted();
19232002
runAgentHarnessLlmInputHook({
@@ -1932,7 +2011,29 @@ export async function runCodexAppServerAttempt(
19322011
turn = await startCodexTurn();
19332012
} catch (error) {
19342013
let turnStartError = error;
2014+
if (isCodexActiveCompactTurnError(turnStartError)) {
2015+
// Codex native compaction returns before its compact turn finishes. If
2016+
// the next OpenClaw turn collides with that compact turn, wait for the
2017+
// terminal notification and retry once instead of surfacing drift.
2018+
embeddedAgentLog.info(
2019+
"codex app-server turn/start blocked by active compact turn; waiting to retry",
2020+
{ threadId: thread.threadId },
2021+
);
2022+
const compactTurnCompleted = await waitForActiveNativeTurnCompletion();
2023+
if (compactTurnCompleted && !runAbortController.signal.aborted) {
2024+
emitCodexAppServerEvent(params, {
2025+
stream: "codex_app_server.lifecycle",
2026+
data: { phase: "turn_start_retry_after_compact", threadId: thread.threadId },
2027+
});
2028+
try {
2029+
turn = await startCodexTurn();
2030+
} catch (retryError) {
2031+
turnStartError = retryError;
2032+
}
2033+
}
2034+
}
19352035
if (
2036+
turn === undefined &&
19362037
shouldUseFreshCodexThreadAfterContextEngineOverflow({
19372038
error: turnStartError,
19382039
contextEngineActive: Boolean(activeContextEngine),
@@ -2695,6 +2796,34 @@ function isCodexContextWindowError(error: unknown): boolean {
26952796
);
26962797
}
26972798

2799+
/**
 * Reports whether `error` is a `CodexAppServerRpcError` whose payload carries
 * `data.codexErrorInfo.activeTurnNotSteerable.turnKind === "compact"`.
 *
 * Every level of that path must pass `isJsonObject`; any other error class or
 * payload shape yields `false`.
 */
function isCodexActiveCompactTurnError(error: unknown): boolean {
  if (!(error instanceof CodexAppServerRpcError)) {
    return false;
  }
  const payload = error.data;
  if (!isJsonObject(payload)) {
    return false;
  }
  const errorInfo = payload.codexErrorInfo;
  if (!isJsonObject(errorInfo)) {
    return false;
  }
  const blockingTurn = errorInfo.activeTurnNotSteerable;
  if (!isJsonObject(blockingTurn)) {
    return false;
  }
  return blockingTurn.turnKind === "compact";
}
2810+
2811+
/**
 * Reports whether `notification` is a `turn/completed` event that
 * `describeCodexNotificationCorrelation` attributes to `threadId`.
 *
 * @param notification Server notification to inspect.
 * @param threadId Thread the completion must belong to.
 * @param turnIds Optional allow-list. When given, the correlated turn id
 *   (`turnId`, falling back to `nestedTurnId`) must be defined and a member of
 *   the set; when omitted, any completion on the thread matches.
 */
function isCodexThreadTurnCompletedNotification(
  notification: CodexServerNotification,
  threadId: string,
  turnIds?: ReadonlySet<string>,
): boolean {
  const isCompletion = notification.method === "turn/completed";
  if (!isCompletion) {
    return false;
  }
  const correlation = describeCodexNotificationCorrelation(notification, { threadId });
  if (!correlation.matchesActiveThread) {
    return false;
  }
  if (!turnIds) {
    return true;
  }
  const completedTurnId = correlation.turnId ?? correlation.nestedTurnId;
  return completedTurnId !== undefined && turnIds.has(completedTurnId);
}
2826+
26982827
function joinPresentSections(...sections: Array<string | undefined>): string {
26992828
return sections.filter((section): section is string => Boolean(section?.trim())).join("\n\n");
27002829
}

0 commit comments

Comments
 (0)