Skip to content

Commit f2405d0

Browse files
committed
fix(codex): unsubscribe bound threads after turns
1 parent 081e295 commit f2405d0

2 files changed

Lines changed: 132 additions & 10 deletions

File tree

extensions/codex/src/conversation-binding.test.ts

Lines changed: 103 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,9 @@ describe("codex conversation binding", () => {
524524
});
525525
return { turn: { id: "turn-new" } };
526526
}
527+
if (method === "thread/unsubscribe") {
528+
return {};
529+
}
527530
throw new Error(`unexpected method: ${method}`);
528531
}),
529532
addNotificationHandler: vi.fn((handler) => {
@@ -565,21 +568,23 @@ describe("codex conversation binding", () => {
565568
expect(result).toEqual({ handled: true, reply: { text: "Recovered" } });
566569
expect(requests.map((request) => request.method)).toEqual([
567570
"turn/start",
571+
"thread/unsubscribe",
568572
"thread/start",
569573
"turn/start",
574+
"thread/unsubscribe",
570575
]);
571576
const sharedClientParams = mockCallArg(sharedClientMocks.getSharedCodexAppServerClient) as {
572577
authProfileId?: unknown;
573578
};
574579
expect(sharedClientParams?.authProfileId).toBe("work");
575-
expect(requests[1]?.params.model).toBe("gpt-5.4-mini");
576-
expect(requests[1]?.params.approvalPolicy).toBe("on-request");
577-
expect(requests[1]?.params.sandbox).toBe("workspace-write");
578-
expect(requests[1]?.params.serviceTier).toBe("priority");
579-
expect(requests[1]?.params).not.toHaveProperty("modelProvider");
580-
expect(requests[2]?.params.threadId).toBe("thread-new");
580+
expect(requests[2]?.params.model).toBe("gpt-5.4-mini");
581581
expect(requests[2]?.params.approvalPolicy).toBe("on-request");
582+
expect(requests[2]?.params.sandbox).toBe("workspace-write");
582583
expect(requests[2]?.params.serviceTier).toBe("priority");
584+
expect(requests[2]?.params).not.toHaveProperty("modelProvider");
585+
expect(requests[3]?.params.threadId).toBe("thread-new");
586+
expect(requests[3]?.params.approvalPolicy).toBe("on-request");
587+
expect(requests[3]?.params.serviceTier).toBe("priority");
583588
const savedBinding = JSON.parse(
584589
await fs.readFile(`${sessionFile}.codex-app-server.json`, "utf8"),
585590
);
@@ -622,6 +627,9 @@ describe("codex conversation binding", () => {
622627
});
623628
return { turn: { id: "turn-new" } };
624629
}
630+
if (method === "thread/unsubscribe") {
631+
return {};
632+
}
625633
throw new Error(`unexpected method: ${method}`);
626634
}),
627635
addNotificationHandler: vi.fn((handler) => {
@@ -661,9 +669,97 @@ describe("codex conversation binding", () => {
661669
);
662670

663671
expect(result).toEqual({ handled: true, reply: { text: "Recovered fresh" } });
664-
expect(requests.map((request) => request.method)).toEqual(["thread/start", "turn/start"]);
672+
expect(requests.map((request) => request.method)).toEqual([
673+
"thread/start",
674+
"turn/start",
675+
"thread/unsubscribe",
676+
]);
665677
expect(requests[1]?.params.threadId).toBe("thread-new");
666678
expect(requests[1]?.params.personality).toBe("none");
679+
expect(requests[2]?.params.threadId).toBe("thread-new");
680+
const savedBinding = JSON.parse(
681+
await fs.readFile(`${sessionFile}.codex-app-server.json`, "utf8"),
682+
);
683+
expect(savedBinding.threadId).toBe("thread-new");
684+
});
685+
686+
it("unsubscribes the recovered bound thread when retry turn start rejects", async () => {
687+
const sessionFile = path.join(tempDir, "session.jsonl");
688+
await fs.writeFile(
689+
`${sessionFile}.codex-app-server.json`,
690+
JSON.stringify({
691+
schemaVersion: 1,
692+
threadId: "thread-old",
693+
cwd: tempDir,
694+
}),
695+
);
696+
const requests: Array<{ method: string; params: Record<string, unknown> }> = [];
697+
sharedClientMocks.getSharedCodexAppServerClient.mockResolvedValue({
698+
request: vi.fn(async (method: string, requestParams: Record<string, unknown>) => {
699+
requests.push({ method, params: requestParams });
700+
if (method === "turn/start" && requestParams.threadId === "thread-old") {
701+
throw new Error("thread not found: thread-old");
702+
}
703+
if (method === "thread/start") {
704+
return {
705+
thread: { id: "thread-new", sessionId: "session-1", cwd: tempDir },
706+
model: "gpt-5.4-mini",
707+
};
708+
}
709+
if (method === "turn/start" && requestParams.threadId === "thread-new") {
710+
throw new Error("retry failed after recovery");
711+
}
712+
if (method === "thread/unsubscribe") {
713+
return {};
714+
}
715+
throw new Error(`unexpected method: ${method}`);
716+
}),
717+
addNotificationHandler: vi.fn(() => () => undefined),
718+
addRequestHandler: vi.fn(() => () => undefined),
719+
});
720+
721+
const result = await handleCodexConversationInboundClaim(
722+
{
723+
content: "hi again",
724+
bodyForAgent: "hi again",
725+
channel: "telegram",
726+
isGroup: false,
727+
commandAuthorized: true,
728+
},
729+
{
730+
channelId: "telegram",
731+
pluginBinding: {
732+
bindingId: "binding-1",
733+
pluginId: "codex",
734+
pluginRoot: tempDir,
735+
channel: "telegram",
736+
accountId: "default",
737+
conversationId: "5185575566",
738+
boundAt: Date.now(),
739+
data: {
740+
kind: "codex-app-server-session",
741+
version: 1,
742+
sessionFile,
743+
workspaceDir: tempDir,
744+
},
745+
},
746+
},
747+
{ timeoutMs: 50 },
748+
);
749+
750+
expect(result).toEqual({
751+
handled: true,
752+
reply: { text: "Codex app-server turn failed: retry failed after recovery" },
753+
});
754+
expect(requests.map((request) => request.method)).toEqual([
755+
"turn/start",
756+
"thread/unsubscribe",
757+
"thread/start",
758+
"turn/start",
759+
"thread/unsubscribe",
760+
]);
761+
expect(requests[1]?.params.threadId).toBe("thread-old");
762+
expect(requests[4]?.params.threadId).toBe("thread-new");
667763
const savedBinding = JSON.parse(
668764
await fs.readFile(`${sessionFile}.codex-app-server.json`, "utf8"),
669765
);

extensions/codex/src/conversation-binding.ts

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { formatErrorMessage } from "openclaw/plugin-sdk/agent-harness-runtime";
1+
import { embeddedAgentLog, formatErrorMessage } from "openclaw/plugin-sdk/agent-harness-runtime";
22
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
33
import type {
44
PluginConversationBindingResolvedEvent,
@@ -49,6 +49,7 @@ import { buildCodexConversationTurnInput } from "./conversation-turn-input.js";
4949
import { resumeCodexCliSessionOnNode } from "./node-cli-sessions.js";
5050

5151
const DEFAULT_BOUND_TURN_TIMEOUT_MS = 20 * 60_000;
52+
const BOUND_TURN_UNSUBSCRIBE_TIMEOUT_MS = 5_000;
5253

5354
export {
5455
createCodexCliNodeConversationBindingData,
@@ -496,8 +497,33 @@ async function runBoundTurn(params: {
496497
},
497498
};
498499
} finally {
499-
notificationCleanup();
500-
requestCleanup();
500+
try {
501+
await unsubscribeBoundCodexThreadBestEffort(client, {
502+
threadId,
503+
timeoutMs: BOUND_TURN_UNSUBSCRIBE_TIMEOUT_MS,
504+
});
505+
} finally {
506+
notificationCleanup();
507+
requestCleanup();
508+
}
509+
}
510+
}
511+
512+
async function unsubscribeBoundCodexThreadBestEffort(
513+
client: Awaited<ReturnType<typeof getSharedCodexAppServerClient>>,
514+
params: { threadId: string; timeoutMs: number },
515+
): Promise<void> {
516+
try {
517+
await client.request(
518+
"thread/unsubscribe",
519+
{ threadId: params.threadId },
520+
{ timeoutMs: params.timeoutMs },
521+
);
522+
} catch (error) {
523+
embeddedAgentLog.debug("codex conversation bound thread unsubscribe cleanup failed", {
524+
threadId: params.threadId,
525+
error,
526+
});
501527
}
502528
}
503529

0 commit comments

Comments
 (0)