Skip to content

Commit 4760ad3

Browse files
committed
fix(cli): restore codex review resume flow
Persist the Codex thread id as a provider resume token so reject/resume reconnects to the same Codex conversation instead of passing the daemon session id to the SDK. Also update the daemon smoke test to wait for async review-session cleanup and guard empty task cleanup arrays under set -u.
1 parent 4738a08 commit 4760ad3

7 files changed

Lines changed: 165 additions & 20 deletions

File tree

packages/cli/src/daemon/resumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
*/
88

99
import { existsSync } from "node:fs";
10-
import type { AgentInfo } from "../agent/systemPrompt.js";
1110
import { AgentClient, type ApiClient } from "../client/index.js";
1211
import { getCredentials } from "../config.js";
1312
import { createLogger } from "../logger.js";
@@ -75,6 +74,7 @@ export async function resumeSession(session: SessionFile, message: string, clien
7574
provider,
7675
taskId,
7776
sessionId: session.sessionId,
77+
resumeToken: session.providerResumeToken,
7878
cwd: workspace.cwd,
7979
taskContext: message,
8080
agentClient,

packages/cli/src/daemon/runtimePool.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export interface SpawnRequest {
4444
provider: AgentProvider;
4545
taskId: string;
4646
sessionId: string;
47+
resumeToken?: string;
4748
cwd: string;
4849
taskContext: string;
4950
agentClient: AgentClient;
@@ -118,6 +119,7 @@ export class RuntimePool {
118119
const handle: AgentHandle = await providerExecute(provider.name, () =>
119120
provider.execute({
120121
sessionId,
122+
resumeToken: req.resumeToken,
121123
cwd: req.cwd,
122124
env: { ...(process.env as Record<string, string>), ...req.agentEnv },
123125
taskContext: req.taskContext,
@@ -431,6 +433,7 @@ async function finalize(agent: AgentProcess, opts: { crashed: boolean; error?: u
431433
// - in_progress + no reject → release (agent forgot to submit review)
432434
// - done/cancelled → cleanup
433435
const taskInReview = agent.resultReceived && !opts.crashed;
436+
const providerResumeToken = agent.handle.getResumeToken?.();
434437

435438
const event: SessionEvent = classifyIteratorEnd({
436439
resultReceived: agent.resultReceived,
@@ -440,7 +443,7 @@ async function finalize(agent: AgentProcess, opts: { crashed: boolean; error?: u
440443
transient,
441444
});
442445

443-
const next = await sessions.applyEvent(sessionId, event).catch((e) => {
446+
const next = await sessions.applyEvent(sessionId, event, providerResumeToken ? { providerResumeToken } : undefined).catch((e) => {
444447
logger.error(`State transition failed for ${sessionId}: ${errMessage(e)}`);
445448
return null;
446449
});

packages/cli/src/providers/codex.ts

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { execSync } from "node:child_process";
12
import { readFileSync } from "node:fs";
23
import { homedir } from "node:os";
34
import { join } from "node:path";
@@ -44,6 +45,27 @@ function calcCost(model: string, inputTokens: number, cachedInputTokens: number,
4445
return (inputTokens * price.input + cachedInputTokens * price.cached_input + outputTokens * price.output) / 1_000_000;
4546
}
4647

48+
function resolveCodexPath(): string | undefined {
49+
try {
50+
const path = execSync("which codex", { encoding: "utf-8", stdio: ["ignore", "pipe", "ignore"] }).trim();
51+
return path || undefined;
52+
} catch {
53+
return undefined;
54+
}
55+
}
56+
57+
function resolveCodexModel(opts: ExecuteOpts): string | undefined {
58+
if (!opts.model) {
59+
return readAccessToken() ? undefined : "o3";
60+
}
61+
if (readAccessToken() && !opts.env.OPENAI_API_KEY) {
62+
// ChatGPT-backed Codex accounts reject explicit model overrides like "o3".
63+
// Let the CLI choose the account-compatible default model.
64+
return undefined;
65+
}
66+
return opts.model;
67+
}
68+
4769
/** Map a single Codex thread event to an AgentEvent (or null to skip). */
4870
export function mapThreadEvent(event: ThreadEvent, model = "o3"): AgentEvent | null {
4971
switch (event.type) {
@@ -174,23 +196,25 @@ export const codexProvider: AgentProvider = {
174196
label: "Codex CLI",
175197

176198
async execute(opts: ExecuteOpts): Promise<AgentHandle> {
177-
const model = opts.model ?? "o3";
199+
const model = resolveCodexModel(opts) ?? "o3";
200+
let resumeToken: string | undefined = opts.resumeToken;
178201

179-
const codex = new Codex({ env: opts.env });
202+
const codex = new Codex({ env: opts.env, codexPathOverride: resolveCodexPath() });
180203
const threadOpts = {
181-
model: opts.model,
204+
model: resolveCodexModel(opts),
182205
workingDirectory: opts.cwd,
183206
sandboxMode: "danger-full-access" as const,
184207
approvalPolicy: "never" as const,
185208
};
186-
const thread = opts.resume ? codex.resumeThread(opts.sessionId, threadOpts) : codex.startThread(threadOpts);
209+
const thread = opts.resume ? codex.resumeThread(opts.resumeToken ?? opts.sessionId, threadOpts) : codex.startThread(threadOpts);
187210

188211
const abortController = new AbortController();
189212
const streamed = await thread.runStreamed(opts.taskContext, { signal: abortController.signal });
190213

191214
const events = (async function* () {
192215
const turnOpen = { value: false };
193216
for await (const event of streamed.events) {
217+
if (event.type === "thread.started") resumeToken = event.thread_id;
194218
yield* mapThreadEventStream(event, model, turnOpen);
195219
}
196220
})();
@@ -206,6 +230,9 @@ export const codexProvider: AgentProvider = {
206230
async send() {
207231
throw new Error("Codex multi-turn send not implemented");
208232
},
233+
getResumeToken() {
234+
return resumeToken;
235+
},
209236
};
210237
},
211238

packages/cli/src/providers/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ export type { AgentEvent, AgentRuntime, ContentBlock, UsageInfo, UsageWindow };
44

55
export interface ExecuteOpts {
66
sessionId: string;
7+
resumeToken?: string;
78
cwd: string;
89
env: Record<string, string>;
910
taskContext: string;
@@ -28,6 +29,7 @@ export interface AgentHandle {
2829
events: AsyncIterable<AgentEvent>;
2930
abort(): Promise<void>;
3031
send(message: string): Promise<void>;
32+
getResumeToken?(): string | undefined;
3133
}
3234

3335
export interface AgentProvider {

packages/cli/src/session/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export interface SessionFile {
3434
workspace?: WorkspaceInfo;
3535
status?: SessionStatus;
3636
model?: string;
37+
providerResumeToken?: string;
3738
gpgSubkeyId?: string | null;
3839
agentUsername?: string;
3940
agentName?: string;

packages/cli/tests/providers.test.ts

Lines changed: 106 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// @vitest-environment node
2-
import { describe, expect, it, vi } from "vitest";
2+
import { beforeEach, describe, expect, it, vi } from "vitest";
33

44
// Mock node:fs so readFileSync never touches disk (gemini reads system prompt file, codex reads auth)
55
vi.mock("node:fs", () => ({
@@ -8,6 +8,12 @@ vi.mock("node:fs", () => ({
88
}),
99
}));
1010

11+
vi.mock("node:child_process", () => ({
12+
execSync: vi.fn().mockImplementation(() => {
13+
throw new Error("missing");
14+
}),
15+
}));
16+
1117
// Mock logger to suppress output
1218
vi.mock("../src/logger.js", () => ({
1319
createLogger: () => ({
@@ -62,6 +68,18 @@ import {
6268
import { getAvailableProviders, getProvider, registerProvider } from "../src/providers/registry.js";
6369
import type { AgentProvider } from "../src/providers/types.js";
6470

71+
beforeEach(async () => {
72+
const fsModule = await import("node:fs");
73+
vi.mocked(fsModule.readFileSync).mockImplementation(() => {
74+
throw new Error("ENOENT");
75+
});
76+
77+
const childProcess = await import("node:child_process");
78+
vi.mocked(childProcess.execSync).mockImplementation(() => {
79+
throw new Error("missing");
80+
});
81+
});
82+
6583
// ---------------------------------------------------------------------------
6684
// mapSDKMessage — rate_limit_event
6785
// ---------------------------------------------------------------------------
@@ -591,6 +609,8 @@ describe("codexProvider.execute — handle shape", () => {
591609
describe("codexProvider.execute — thread selection", () => {
592610
it("calls startThread when resume is false or absent", async () => {
593611
const { Codex } = await import("@openai/codex-sdk");
612+
const childProcess = await import("node:child_process");
613+
vi.mocked(childProcess.execSync).mockReturnValueOnce("/opt/homebrew/bin/codex\n" as any);
594614
const startThreadSpy = vi.fn().mockReturnValue({
595615
runStreamed: vi.fn().mockResolvedValue({ events: (async function* () {})() }),
596616
});
@@ -603,10 +623,15 @@ describe("codexProvider.execute — thread selection", () => {
603623
);
604624
await codexProvider.execute({ sessionId: "s1", cwd: "/tmp", env: {}, taskContext: "ctx" });
605625
expect(startThreadSpy).toHaveBeenCalledOnce();
626+
expect(vi.mocked(Codex)).toHaveBeenCalledWith(expect.objectContaining({ codexPathOverride: "/opt/homebrew/bin/codex" }));
606627
});
607628

608629
it("calls resumeThread when resume is true", async () => {
609630
const { Codex } = await import("@openai/codex-sdk");
631+
const childProcess = await import("node:child_process");
632+
vi.mocked(childProcess.execSync).mockImplementationOnce(() => {
633+
throw new Error("missing");
634+
});
610635
const resumeThreadSpy = vi.fn().mockReturnValue({
611636
runStreamed: vi.fn().mockResolvedValue({ events: (async function* () {})() }),
612637
});
@@ -617,8 +642,63 @@ describe("codexProvider.execute — thread selection", () => {
617642
resumeThread: resumeThreadSpy,
618643
}) as any,
619644
);
620-
await codexProvider.execute({ sessionId: "sess-77", cwd: "/tmp", env: {}, taskContext: "ctx", resume: true });
621-
expect(resumeThreadSpy).toHaveBeenCalledWith("sess-77", expect.any(Object));
645+
await codexProvider.execute({
646+
sessionId: "sess-77",
647+
resumeToken: "codex-thread-1",
648+
cwd: "/tmp",
649+
env: {},
650+
taskContext: "ctx",
651+
resume: true,
652+
});
653+
expect(resumeThreadSpy).toHaveBeenCalledWith("codex-thread-1", expect.any(Object));
654+
});
655+
656+
it("omits explicit model for ChatGPT-backed Codex sessions", async () => {
657+
const { Codex } = await import("@openai/codex-sdk");
658+
const fsModule = await import("node:fs");
659+
const childProcess = await import("node:child_process");
660+
vi.mocked(childProcess.execSync).mockReturnValueOnce("/opt/homebrew/bin/codex\n" as any);
661+
vi.mocked(fsModule.readFileSync).mockReturnValue(JSON.stringify({ tokens: { access_token: "chatgpt-token" } }) as any);
662+
const startThreadSpy = vi.fn().mockReturnValue({
663+
runStreamed: vi.fn().mockResolvedValue({ events: (async function* () {})() }),
664+
});
665+
vi.mocked(Codex).mockImplementationOnce(
666+
() =>
667+
({
668+
startThread: startThreadSpy,
669+
resumeThread: vi.fn(),
670+
}) as any,
671+
);
672+
673+
await codexProvider.execute({ sessionId: "s1", cwd: "/tmp", env: {}, taskContext: "ctx", model: "o3" });
674+
675+
expect(startThreadSpy).toHaveBeenCalledWith(expect.objectContaining({ model: undefined }));
676+
});
677+
678+
it("keeps explicit model when using API-key-backed Codex sessions", async () => {
679+
const { Codex } = await import("@openai/codex-sdk");
680+
const childProcess = await import("node:child_process");
681+
vi.mocked(childProcess.execSync).mockReturnValueOnce("/opt/homebrew/bin/codex\n" as any);
682+
const startThreadSpy = vi.fn().mockReturnValue({
683+
runStreamed: vi.fn().mockResolvedValue({ events: (async function* () {})() }),
684+
});
685+
vi.mocked(Codex).mockImplementationOnce(
686+
() =>
687+
({
688+
startThread: startThreadSpy,
689+
resumeThread: vi.fn(),
690+
}) as any,
691+
);
692+
693+
await codexProvider.execute({
694+
sessionId: "s1",
695+
cwd: "/tmp",
696+
env: { OPENAI_API_KEY: "sk-test" },
697+
taskContext: "ctx",
698+
model: "o3",
699+
});
700+
701+
expect(startThreadSpy).toHaveBeenCalledWith(expect.objectContaining({ model: "o3" }));
622702
});
623703

624704
it("events yields mapped AgentEvents from SDK thread events", async () => {
@@ -646,6 +726,29 @@ describe("codexProvider.execute — thread selection", () => {
646726
expect(events[1]).toEqual({ type: "block.done", block: { type: "text", text: "codex message" } });
647727
});
648728

729+
it("captures the Codex thread id as a resume token from thread.started", async () => {
730+
const { Codex } = await import("@openai/codex-sdk");
731+
vi.mocked(Codex).mockImplementationOnce(
732+
() =>
733+
({
734+
startThread: vi.fn().mockReturnValue({
735+
runStreamed: vi.fn().mockResolvedValue({
736+
events: (async function* () {
737+
yield { type: "thread.started", thread_id: "thread-123" } as any;
738+
yield { type: "turn.completed", usage: {} } as any;
739+
})(),
740+
}),
741+
}),
742+
resumeThread: vi.fn(),
743+
}) as any,
744+
);
745+
const handle = await codexProvider.execute({ sessionId: "s1", cwd: "/tmp", env: {}, taskContext: "ctx" });
746+
for await (const _ev of handle.events) {
747+
// Drain the stream so thread.started is observed.
748+
}
749+
expect(handle.getResumeToken?.()).toBe("thread-123");
750+
});
751+
649752
it("send() throws not-implemented error", async () => {
650753
const { Codex } = await import("@openai/codex-sdk");
651754
const runStreamedSpy = vi.fn().mockResolvedValue({ events: (async function* () {})() });

scripts/daemon-smoke-test.sh

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,19 @@ task_session_exists() {
5757
| xargs grep -l "\"taskId\": *\"$task_id\"" 2>/dev/null | head -1
5858
}
5959

60+
wait_session_cleanup() {
61+
local task_id="$1" timeout_secs="${2:-120}"
62+
local elapsed=0
63+
while [ "$elapsed" -lt "$timeout_secs" ]; do
64+
if [ -z "$(task_session_exists "$task_id")" ]; then
65+
return 0
66+
fi
67+
sleep 2
68+
elapsed=$((elapsed + 2))
69+
done
70+
return 1
71+
}
72+
6073
pass() { echo " PASS: $1"; PASS=$((PASS + 1)); }
6174
fail() { echo " FAIL: $1"; FAIL=$((FAIL + 1)); }
6275

@@ -122,20 +135,17 @@ echo ""
122135
echo "[Test 3/4] Complete — mark task done, verify cleanup"
123136
ak task complete "$T1" >/dev/null 2>&1
124137

125-
# Give daemon a poll cycle to clean up
126-
sleep 15
127-
128138
STATUS_AFTER_COMPLETE=$(task_status "$T1")
129139
if [ "$STATUS_AFTER_COMPLETE" = "done" ]; then
130140
pass "task is done"
131141
else
132142
fail "expected done, got: $STATUS_AFTER_COMPLETE"
133143
fi
134144

135-
if [ -z "$(task_session_exists "$T1")" ]; then
145+
if wait_session_cleanup "$T1" 120; then
136146
pass "session cleaned up after completion"
137147
else
138-
fail "session still exists after completion"
148+
fail "session still exists after completion timeout"
139149
fi
140150
echo ""
141151

@@ -156,9 +166,6 @@ fi
156166
sleep 3
157167
ak task cancel "$T4" >/dev/null 2>&1
158168

159-
# Give daemon a poll cycle to detect and kill
160-
sleep 12
161-
162169
STATUS_AFTER_CANCEL=$(task_status "$T4")
163170
if [ "$STATUS_AFTER_CANCEL" = "cancelled" ]; then
164171
pass "task is cancelled"
@@ -184,9 +191,11 @@ echo " Failed: $FAIL"
184191
echo "==============================="
185192

186193
# Cleanup test tasks
187-
for tid in "${TASKS[@]}"; do
188-
ak task cancel "$tid" >/dev/null 2>&1 || true
189-
done
194+
if [ "${#TASKS[@]}" -gt 0 ]; then
195+
for tid in "${TASKS[@]}"; do
196+
ak task cancel "$tid" >/dev/null 2>&1 || true
197+
done
198+
fi
190199

191200
if [ "$FAIL" -gt 0 ]; then
192201
exit 1

0 commit comments

Comments
 (0)