Skip to content

Commit 689dda5

Browse files
juliusmarmingecodex
andcommitted
Persist thread runtime mode and handle mode-switch session restarts
- Add `thread.runtime-mode.set` -> `thread.runtime-mode-set` flow in decider/reactors/projectors - Persist runtime mode on thread/session projections with new DB migrations - Update provider and Codex integration tests to verify thread continuity across mode changes Co-authored-by: codex <codex@users.noreply.github.com>
1 parent 0a67f9d commit 689dda5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+743
-173
lines changed

apps/server/integration/OrchestrationEngineHarness.integration.ts

Lines changed: 46 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import { ProviderUnsupportedError } from "../src/provider/Errors.ts";
3737
import { ProviderAdapterRegistry } from "../src/provider/Services/ProviderAdapterRegistry.ts";
3838
import { ProviderSessionDirectoryLive } from "../src/provider/Layers/ProviderSessionDirectory.ts";
3939
import { makeProviderServiceLive } from "../src/provider/Layers/ProviderService.ts";
40+
import { makeCodexAdapterLive } from "../src/provider/Layers/CodexAdapter.ts";
41+
import { CodexAdapter } from "../src/provider/Services/CodexAdapter.ts";
4042
import { ProviderService } from "../src/provider/Services/ProviderService.ts";
4143
import { CheckpointReactorLive } from "../src/orchestration/Layers/CheckpointReactor.ts";
4244
import { OrchestrationEngineLive } from "../src/orchestration/Layers/OrchestrationEngine.ts";
@@ -187,6 +189,7 @@ export interface OrchestrationIntegrationHarness {
187189

188190
interface MakeOrchestrationIntegrationHarnessOptions {
189191
readonly provider?: "codex" | "claudeCode";
192+
readonly realCodex?: boolean;
190193
}
191194

192195
export const makeOrchestrationIntegrationHarness = (
@@ -195,10 +198,21 @@ export const makeOrchestrationIntegrationHarness = (
195198
Effect.gen(function* () {
196199
const sleep = (ms: number) => Effect.sleep(ms);
197200
const provider = options?.provider ?? "codex";
198-
const adapterHarness = yield* makeTestProviderAdapterHarness({
199-
provider,
200-
});
201-
201+
const useRealCodex = options?.realCodex === true;
202+
const adapterHarness = useRealCodex
203+
? null
204+
: yield* makeTestProviderAdapterHarness({
205+
provider,
206+
});
207+
const fakeRegistry = adapterHarness
208+
? Layer.succeed(ProviderAdapterRegistry, {
209+
getByProvider: (resolvedProvider) =>
210+
resolvedProvider === adapterHarness.provider
211+
? Effect.succeed(adapterHarness.adapter)
212+
: Effect.fail(new ProviderUnsupportedError({ provider: resolvedProvider })),
213+
listProviders: () => Effect.succeed([adapterHarness.provider]),
214+
} as typeof ProviderAdapterRegistry.Service)
215+
: null;
202216
const rootDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-orchestration-integration-"));
203217
const workspaceDir = path.join(rootDir, "workspace");
204218
const stateDir = path.join(rootDir, "state");
@@ -207,14 +221,6 @@ export const makeOrchestrationIntegrationHarness = (
207221
fs.mkdirSync(stateDir, { recursive: true });
208222
initializeGitWorkspace(workspaceDir);
209223

210-
const registry: typeof ProviderAdapterRegistry.Service = {
211-
getByProvider: (provider) =>
212-
provider === adapterHarness.provider
213-
? Effect.succeed(adapterHarness.adapter)
214-
: Effect.fail(new ProviderUnsupportedError({ provider })),
215-
listProviders: () => Effect.succeed([adapterHarness.provider]),
216-
};
217-
218224
const persistenceLayer = makeSqlitePersistenceLive(dbPath);
219225
const orchestrationLayer = OrchestrationEngineLive.pipe(
220226
Layer.provide(OrchestrationProjectionPipelineLive),
@@ -224,10 +230,33 @@ export const makeOrchestrationIntegrationHarness = (
224230
const providerSessionDirectoryLayer = ProviderSessionDirectoryLive.pipe(
225231
Layer.provide(ProviderSessionRuntimeRepositoryLive),
226232
);
227-
const providerLayer = makeProviderServiceLive().pipe(
228-
Layer.provide(providerSessionDirectoryLayer),
229-
Layer.provide(Layer.succeed(ProviderAdapterRegistry, registry)),
233+
const realCodexRegistry = Layer.effect(
234+
ProviderAdapterRegistry,
235+
Effect.gen(function* () {
236+
const codexAdapter = yield* CodexAdapter;
237+
return {
238+
getByProvider: (resolvedProvider) =>
239+
resolvedProvider === "codex"
240+
? Effect.succeed(codexAdapter)
241+
: Effect.fail(new ProviderUnsupportedError({ provider: resolvedProvider })),
242+
listProviders: () => Effect.succeed(["codex"] as const),
243+
} as typeof ProviderAdapterRegistry.Service;
244+
}),
245+
).pipe(
246+
Layer.provide(makeCodexAdapterLive()),
247+
Layer.provideMerge(ServerConfig.layerTest(workspaceDir, stateDir)),
248+
Layer.provideMerge(NodeServices.layer),
249+
Layer.provideMerge(providerSessionDirectoryLayer),
230250
);
251+
const providerLayer = useRealCodex
252+
? makeProviderServiceLive().pipe(
253+
Layer.provide(providerSessionDirectoryLayer),
254+
Layer.provide(realCodexRegistry),
255+
)
256+
: makeProviderServiceLive().pipe(
257+
Layer.provide(providerSessionDirectoryLayer),
258+
Layer.provide(fakeRegistry!),
259+
);
231260

232261
const runtimeServicesLayer = Layer.mergeAll(
233262
orchestrationLayer,
@@ -407,8 +436,8 @@ export const makeOrchestrationIntegrationHarness = (
407436
return {
408437
rootDir,
409438
workspaceDir,
410-
dbPath,
411-
adapterHarness,
439+
dbPath,
440+
adapterHarness: adapterHarness as TestProviderAdapterHarness,
412441
engine,
413442
snapshotQuery,
414443
providerService,

apps/server/integration/TestProviderAdapter.integration.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
264264
sessionId,
265265
provider,
266266
status: "ready",
267+
runtimeMode: input.runtimeMode,
267268
threadId,
268269
cwd: input.cwd,
269270
resumeCursor: input.resumeCursor ?? { sessionId },

apps/server/integration/orchestrationEngine.integration.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ function withHarness<A, E>(
9797
);
9898
}
9999

100+
function withRealCodexHarness<A, E>(
101+
use: (harness: OrchestrationIntegrationHarness) => Effect.Effect<A, E>,
102+
) {
103+
return Effect.acquireUseRelease(
104+
makeOrchestrationIntegrationHarness({ provider: "codex", realCodex: true }),
105+
use,
106+
(harness) => harness.dispose,
107+
);
108+
}
109+
100110
const seedProjectAndThread = (harness: OrchestrationIntegrationHarness) =>
101111
Effect.gen(function* () {
102112
const createdAt = nowIso();
@@ -118,6 +128,7 @@ const seedProjectAndThread = (harness: OrchestrationIntegrationHarness) =>
118128
projectId: PROJECT_ID,
119129
title: "Integration Thread",
120130
model: "gpt-5-codex",
131+
runtimeMode: "approval-required",
121132
branch: null,
122133
worktreePath: harness.workspaceDir,
123134
createdAt,
@@ -216,6 +227,97 @@ it.live("runs a single turn end-to-end and persists checkpoint state in sqlite +
216227
),
217228
);
218229

230+
it.live.skipIf(!process.env.CODEX_BINARY_PATH)(
231+
"keeps the same Codex provider thread across runtime mode switches",
232+
() =>
233+
withRealCodexHarness((harness) =>
234+
Effect.gen(function* () {
235+
const createdAt = nowIso();
236+
237+
yield* harness.engine.dispatch({
238+
type: "project.create",
239+
commandId: CommandId.makeUnsafe("cmd-project-create-real-codex"),
240+
projectId: PROJECT_ID,
241+
title: "Integration Project",
242+
workspaceRoot: harness.workspaceDir,
243+
defaultModel: "gpt-5.3-codex",
244+
createdAt,
245+
});
246+
247+
yield* harness.engine.dispatch({
248+
type: "thread.create",
249+
commandId: CommandId.makeUnsafe("cmd-thread-create-real-codex"),
250+
threadId: THREAD_ID,
251+
projectId: PROJECT_ID,
252+
title: "Integration Thread",
253+
model: "gpt-5.3-codex",
254+
runtimeMode: "full-access",
255+
branch: null,
256+
worktreePath: harness.workspaceDir,
257+
createdAt,
258+
});
259+
260+
yield* harness.engine.dispatch({
261+
type: "thread.turn.start",
262+
commandId: CommandId.makeUnsafe("cmd-turn-start-real-codex-1"),
263+
threadId: THREAD_ID,
264+
message: {
265+
messageId: asMessageId("msg-real-codex-1"),
266+
role: "user",
267+
text: "Reply with exactly ALPHA.",
268+
attachments: [],
269+
},
270+
runtimeMode: "full-access",
271+
createdAt: nowIso(),
272+
});
273+
274+
const firstThread = yield* harness.waitForThread(
275+
THREAD_ID,
276+
(entry) =>
277+
entry.session?.status === "ready" &&
278+
entry.session.providerName === "codex" &&
279+
entry.session.providerThreadId !== null &&
280+
entry.messages.some(
281+
(message) => message.role === "assistant" && message.streaming === false,
282+
),
283+
180_000,
284+
);
285+
286+
const originalProviderThreadId = firstThread.session?.providerThreadId;
287+
assert.isNotNull(originalProviderThreadId);
288+
289+
yield* harness.engine.dispatch({
290+
type: "thread.turn.start",
291+
commandId: CommandId.makeUnsafe("cmd-turn-start-real-codex-2"),
292+
threadId: THREAD_ID,
293+
message: {
294+
messageId: asMessageId("msg-real-codex-2"),
295+
role: "user",
296+
text: "Reply with exactly BETA.",
297+
attachments: [],
298+
},
299+
runtimeMode: "approval-required",
300+
createdAt: nowIso(),
301+
});
302+
303+
const secondThread = yield* harness.waitForThread(
304+
THREAD_ID,
305+
(entry) =>
306+
entry.session?.status === "ready" &&
307+
entry.session.providerName === "codex" &&
308+
entry.session.providerThreadId !== null &&
309+
entry.session.runtimeMode === "approval-required" &&
310+
entry.messages.some(
311+
(message) => message.role === "assistant" && message.text.includes("BETA"),
312+
),
313+
180_000,
314+
);
315+
316+
assert.equal(secondThread.session?.providerThreadId, originalProviderThreadId);
317+
}),
318+
),
319+
);
320+
219321
it.live("runs multi-turn file edits and persists checkpoint diffs", () =>
220322
withHarness((harness) =>
221323
Effect.gen(function* () {

apps/server/integration/providerService.integration.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ it.effect("replays typed runtime fixture events", () =>
122122
{
123123
provider: "codex",
124124
cwd: fixture.cwd,
125+
runtimeMode: "full-access",
125126
},
126127
);
127128
assert.equal((session.threadId ?? "").length > 0, true);
@@ -155,6 +156,7 @@ it.effect("replays file-changing fixture turn events", () =>
155156
{
156157
provider: "codex",
157158
cwd: fixture.cwd,
159+
runtimeMode: "full-access",
158160
},
159161
);
160162
assert.equal((session.threadId ?? "").length > 0, true);
@@ -192,6 +194,7 @@ it.effect("runs multi-turn tool/approval flow", () =>
192194
{
193195
provider: "codex",
194196
cwd: fixture.cwd,
197+
runtimeMode: "full-access",
195198
},
196199
);
197200
assert.equal((session.threadId ?? "").length > 0, true);
@@ -244,6 +247,7 @@ it.effect("rolls back provider conversation state only", () =>
244247
{
245248
provider: "codex",
246249
cwd: fixture.cwd,
250+
runtimeMode: "full-access",
247251
},
248252
);
249253
assert.equal((session.threadId ?? "").length > 0, true);

apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ function makeSnapshot(input: {
4343
projectId: input.projectId,
4444
title: "Thread",
4545
model: "gpt-5-codex",
46+
runtimeMode: "full-access",
4647
branch: null,
4748
worktreePath: input.worktreePath,
4849
latestTurn: {

apps/server/src/codexAppServerManager.test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import { describe, expect, it, vi } from "vitest";
2+
import { randomUUID } from "node:crypto";
3+
import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
4+
import os from "node:os";
5+
import path from "node:path";
26
import { ProviderSessionId } from "@t3tools/contracts";
37

48
import {
@@ -165,6 +169,7 @@ describe("startSession", () => {
165169
await expect(
166170
manager.startSession({
167171
provider: "codex",
172+
runtimeMode: "full-access",
168173
}),
169174
).rejects.toThrow("cwd missing");
170175
expect(events).toHaveLength(1);
@@ -347,3 +352,84 @@ describe("thread checkpoint control", () => {
347352
});
348353
});
349354
});
355+
356+
describe.skipIf(!process.env.CODEX_BINARY_PATH)("startSession live Codex resume", () => {
357+
it(
358+
"keeps prior thread history when resuming with a changed runtime mode",
359+
async () => {
360+
const workspaceDir = mkdtempSync(path.join(os.tmpdir(), "codex-live-resume-"));
361+
writeFileSync(path.join(workspaceDir, "README.md"), "hello\n", "utf8");
362+
363+
const manager = new CodexAppServerManager();
364+
365+
try {
366+
const firstSession = await manager.startSession({
367+
provider: "codex",
368+
cwd: workspaceDir,
369+
runtimeMode: "full-access",
370+
providerOptions: {
371+
codex: {
372+
binaryPath: process.env.CODEX_BINARY_PATH,
373+
...(process.env.CODEX_HOME_PATH
374+
? { homePath: process.env.CODEX_HOME_PATH }
375+
: {}),
376+
},
377+
},
378+
});
379+
380+
const firstTurn = await manager.sendTurn({
381+
sessionId: firstSession.sessionId,
382+
input: `Reply with exactly the word ALPHA ${randomUUID()}`,
383+
});
384+
385+
expect(firstTurn.threadId).toBe(firstSession.threadId);
386+
387+
await vi.waitFor(async () => {
388+
const snapshot = await manager.readThread(firstSession.sessionId);
389+
expect(snapshot.turns.length).toBeGreaterThan(0);
390+
}, { timeout: 120_000, interval: 1_000 });
391+
392+
const firstSnapshot = await manager.readThread(firstSession.sessionId);
393+
const originalThreadId = firstSnapshot.threadId;
394+
const originalTurnCount = firstSnapshot.turns.length;
395+
396+
manager.stopSession(firstSession.sessionId);
397+
398+
const resumedSession = await manager.startSession({
399+
provider: "codex",
400+
cwd: workspaceDir,
401+
runtimeMode: "approval-required",
402+
resumeCursor: firstSession.resumeCursor,
403+
providerOptions: {
404+
codex: {
405+
binaryPath: process.env.CODEX_BINARY_PATH,
406+
...(process.env.CODEX_HOME_PATH
407+
? { homePath: process.env.CODEX_HOME_PATH }
408+
: {}),
409+
},
410+
},
411+
});
412+
413+
expect(resumedSession.threadId).toBe(originalThreadId);
414+
415+
const resumedSnapshotBeforeTurn = await manager.readThread(resumedSession.sessionId);
416+
expect(resumedSnapshotBeforeTurn.threadId).toBe(originalThreadId);
417+
expect(resumedSnapshotBeforeTurn.turns.length).toBeGreaterThanOrEqual(originalTurnCount);
418+
419+
await manager.sendTurn({
420+
sessionId: resumedSession.sessionId,
421+
input: `Reply with exactly the word BETA ${randomUUID()}`,
422+
});
423+
424+
await vi.waitFor(async () => {
425+
const snapshot = await manager.readThread(resumedSession.sessionId);
426+
expect(snapshot.turns.length).toBeGreaterThan(originalTurnCount);
427+
}, { timeout: 120_000, interval: 1_000 });
428+
} finally {
429+
manager.stopAll();
430+
rmSync(workspaceDir, { recursive: true, force: true });
431+
}
432+
},
433+
180_000,
434+
);
435+
});

apps/server/src/codexAppServerManager.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ export class CodexAppServerManager extends EventEmitter<CodexAppServerManagerEve
204204
sessionId,
205205
provider: "codex",
206206
status: "connecting",
207+
runtimeMode: input.runtimeMode,
207208
model: normalizeCodexModelSlug(input.model),
208209
cwd: resolvedCwd,
209210
createdAt: now,

0 commit comments

Comments
 (0)