Skip to content

Commit 119a01c

Browse files
authored
fix(webchat): stabilize live transcript run state
Stabilize WebChat transcript/run-state truth for Codex and selected-session observers. Summary: - Mirror Codex inbound prompts at turn start without duplicating suppressed persisted prompts. - Deliver hidden external-channel live chat/tool/agent updates only to exact selected-session subscribers. - Repair Control UI selected-session subscription state, alias-aware run adoption, and accumulated stream dedupe. - Add focused Codex, gateway/session-event, and Control UI regression coverage. Verification: - Current-head CI: 101 green, 0 pending; stale canceled entries are superseded automation from prior force-pushed heads. - Local focused Vitest shards passed: Codex app-server 2 files / 233 tests, gateway/session 4 files / 116 tests, UI 7 files / 238 tests. - `node scripts/run-tsgo.mjs -p test/tsconfig/tsconfig.core.test.json --incremental --tsBuildInfoFile .artifacts/tsgo-cache/core-test.tsbuildinfo` - `node --import tsx scripts/check-no-extension-test-core-imports.ts` - `git diff --check origin/main..HEAD` Closes #83528. Closes #82611. Refs #83949.
1 parent 95d1b39 commit 119a01c

24 files changed

Lines changed: 798 additions & 54 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ Docs: https://docs.openclaw.ai
434434

435435
### Fixes
436436

437+
- Control UI/WebChat: keep selected external-channel sessions live by mirroring Codex prompts at turn start, streaming hidden runs only to exact selected-session subscribers, and deduplicating accumulated stream snapshots around tool cards. Fixes #83528, #82611, refs #83949. Thanks @BunsDev.
437438
- CLI/tasks: include stale-running task maintenance decisions in `openclaw tasks maintenance --json` so retained and reconcile candidates explain backing-session, cron, CLI, and wedged-subagent state. (#84691) Thanks @efpiva.
438439
- Codex app-server: keep system-prompt reports working when bootstrap hooks provide workspace files with only a path and content, so hook-supplied SOUL/IDENTITY/TOOLS/USER context still reports injected characters correctly. (#84736) Thanks @JARVIS-Glasses.
439440
- Providers/MiniMax music: stop advertising `durationSeconds` control and remove prompt-injected duration hints, so `music_generate` reports MiniMax duration as an unsupported override instead of suggesting MiniMax can enforce track length. Fixes #84508. Thanks @neeravmakwana.

extensions/codex/src/app-server/run-attempt.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2407,6 +2407,74 @@ describe("runCodexAppServerAttempt", () => {
24072407
]);
24082408
});
24092409

2410+
it("mirrors the Codex prompt into the transcript when the turn starts", async () => {
2411+
const sessionFile = path.join(tempDir, "session-early-prompt.jsonl");
2412+
const workspaceDir = path.join(tempDir, "workspace-early-prompt");
2413+
const harness = createStartedThreadHarness();
2414+
const params = createParams(sessionFile, workspaceDir);
2415+
params.prompt = "external channel prompt";
2416+
2417+
const run = runCodexAppServerAttempt(params);
2418+
await harness.waitForMethod("turn/start");
2419+
await vi.waitFor(async () => {
2420+
const raw = await fs.readFile(sessionFile, "utf8");
2421+
expect(raw).toContain('"role":"user"');
2422+
expect(raw).toContain('"content":"external channel prompt"');
2423+
expect(raw).toContain('"idempotencyKey":"codex-app-server:thread-1:turn-1:prompt"');
2424+
});
2425+
2426+
const rawBeforeCompletion = await fs.readFile(sessionFile, "utf8");
2427+
expect(rawBeforeCompletion).not.toContain('"role":"assistant"');
2428+
2429+
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
2430+
await run;
2431+
2432+
const rawAfterCompletion = await fs.readFile(sessionFile, "utf8");
2433+
expect(rawAfterCompletion.match(/"role":"user"/gu)).toHaveLength(1);
2434+
});
2435+
2436+
it("does not mirror the Codex prompt early when user message persistence is suppressed", async () => {
2437+
const sessionFile = path.join(tempDir, "session-suppressed-early-prompt.jsonl");
2438+
const workspaceDir = path.join(tempDir, "workspace-suppressed-early-prompt");
2439+
const harness = createStartedThreadHarness();
2440+
const params = createParams(sessionFile, workspaceDir);
2441+
params.prompt = "already persisted prompt";
2442+
params.suppressNextUserMessagePersistence = true;
2443+
const readTranscript = async () =>
2444+
fs.readFile(sessionFile, "utf8").catch((error) => {
2445+
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
2446+
return "";
2447+
}
2448+
throw error;
2449+
});
2450+
2451+
const run = runCodexAppServerAttempt(params);
2452+
await harness.waitForMethod("turn/start");
2453+
await expect(
2454+
vi.waitFor(
2455+
async () => {
2456+
const raw = await readTranscript();
2457+
expect(raw).toContain("already persisted prompt");
2458+
},
2459+
{ interval: 1, timeout: 100 },
2460+
),
2461+
).rejects.toThrow();
2462+
const rawBeforeCompletion = await readTranscript();
2463+
expect(rawBeforeCompletion).not.toContain("already persisted prompt");
2464+
expect(rawBeforeCompletion).not.toContain(
2465+
'"idempotencyKey":"codex-app-server:thread-1:turn-1:prompt"',
2466+
);
2467+
2468+
await harness.completeTurn({ threadId: "thread-1", turnId: "turn-1" });
2469+
await run;
2470+
2471+
const rawAfterCompletion = await readTranscript();
2472+
expect(rawAfterCompletion).not.toContain("already persisted prompt");
2473+
expect(rawAfterCompletion).not.toContain(
2474+
'"idempotencyKey":"codex-app-server:thread-1:turn-1:prompt"',
2475+
);
2476+
});
2477+
24102478
it("accepts turn completions scoped by nested turn thread id", async () => {
24112479
const harness = createStartedThreadHarness();
24122480
const params = createParams(

extensions/codex/src/app-server/run-attempt.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ import {
195195
recordCodexTrajectoryContext,
196196
} from "./trajectory.js";
197197
import {
198+
attachCodexMirrorIdentity,
198199
buildCodexUserPromptMessage,
199200
mirrorCodexAppServerTranscript,
200201
} from "./transcript-mirror.js";
@@ -3147,6 +3148,13 @@ export async function runCodexAppServerAttempt(
31473148
abort: () => runAbortController.abort("aborted"),
31483149
};
31493150
setActiveEmbeddedRun(params.sessionId, handle, params.sessionKey);
3151+
void mirrorPromptAtTurnStartBestEffort({
3152+
params,
3153+
agentId: sessionAgentId,
3154+
sessionKey: sandboxSessionKey,
3155+
threadId: thread.threadId,
3156+
turnId: activeTurnId,
3157+
});
31503158
turnAttemptIdleWatchArmed = true;
31513159
turnTerminalIdleWatchArmed = true;
31523160
touchTurnCompletionActivity("turn:start", { attemptProgress: true });
@@ -5717,6 +5725,35 @@ async function mirrorTranscriptBestEffort(params: {
57175725
}
57185726
}
57195727

5728+
async function mirrorPromptAtTurnStartBestEffort(params: {
5729+
params: EmbeddedRunAttemptParams;
5730+
agentId?: string;
5731+
sessionKey?: string;
5732+
threadId: string;
5733+
turnId: string;
5734+
}): Promise<void> {
5735+
if (params.params.suppressNextUserMessagePersistence) {
5736+
return;
5737+
}
5738+
try {
5739+
await mirrorCodexAppServerTranscript({
5740+
sessionFile: params.params.sessionFile,
5741+
agentId: params.agentId,
5742+
sessionKey: params.sessionKey,
5743+
messages: [
5744+
attachCodexMirrorIdentity(
5745+
buildCodexUserPromptMessage(params.params),
5746+
`${params.turnId}:prompt`,
5747+
),
5748+
],
5749+
idempotencyScope: `codex-app-server:${params.threadId}`,
5750+
config: params.params.config,
5751+
});
5752+
} catch (error) {
5753+
embeddedAgentLog.warn("failed to mirror codex app-server prompt at turn start", { error });
5754+
}
5755+
}
5756+
57205757
function isNonEmptyString(value: unknown): value is string {
57215758
return typeof value === "string" && value.length > 0;
57225759
}

extensions/codex/src/app-server/transcript-mirror.test.ts

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,19 @@ import {
1313
makeAgentAssistantMessage,
1414
makeAgentUserMessage,
1515
} from "openclaw/plugin-sdk/test-fixtures";
16-
import { afterEach, describe, expect, it } from "vitest";
16+
import { afterEach, describe, expect, it, vi } from "vitest";
1717
import { attachCodexMirrorIdentity, mirrorCodexAppServerTranscript } from "./transcript-mirror.js";
1818

19+
const emitSessionTranscriptUpdateMock = vi.hoisted(() => vi.fn());
20+
21+
vi.mock("openclaw/plugin-sdk/agent-harness-runtime", async (importOriginal) => {
22+
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/agent-harness-runtime")>();
23+
return {
24+
...actual,
25+
emitSessionTranscriptUpdate: emitSessionTranscriptUpdateMock,
26+
};
27+
});
28+
1929
type MirroredAgentMessage = Extract<AgentMessage, { role: "user" | "assistant" | "toolResult" }>;
2030

2131
// Mirrors transcript-mirror.ts's fallback fingerprint exactly so test
@@ -29,6 +39,7 @@ const tempDirs: string[] = [];
2939

3040
afterEach(async () => {
3141
resetGlobalHookRunner();
42+
emitSessionTranscriptUpdateMock.mockReset();
3243
for (const dir of tempDirs.splice(0)) {
3344
await fs.rm(dir, { recursive: true, force: true });
3445
}
@@ -105,6 +116,79 @@ describe("mirrorCodexAppServerTranscript", () => {
105116
);
106117
});
107118

119+
it("emits message-bearing updates for newly appended mirrored messages only", async () => {
120+
const sessionFile = await createTempSessionFile();
121+
const userMessage = attachCodexMirrorIdentity(
122+
makeAgentUserMessage({
123+
content: [{ type: "text", text: "show me live" }],
124+
timestamp: Date.now(),
125+
}),
126+
"turn-1:prompt",
127+
);
128+
129+
await mirrorCodexAppServerTranscript({
130+
sessionFile,
131+
sessionKey: "agent:main:main",
132+
messages: [userMessage],
133+
idempotencyScope: "codex-app-server:thread-1",
134+
});
135+
await mirrorCodexAppServerTranscript({
136+
sessionFile,
137+
sessionKey: "agent:main:main",
138+
messages: [userMessage],
139+
idempotencyScope: "codex-app-server:thread-1",
140+
});
141+
142+
const updates = emitSessionTranscriptUpdateMock.mock.calls.map(
143+
([update]) => update as Record<string, unknown>,
144+
);
145+
expect(updates).toHaveLength(1);
146+
expect(updates[0]?.sessionFile).toBe(sessionFile);
147+
expect(updates[0]?.sessionKey).toBe("agent:main:main");
148+
expect(updates[0]?.messageId).toEqual(expect.any(String));
149+
expect(updates[0]?.message).toMatchObject({
150+
role: "user",
151+
content: [{ type: "text", text: "show me live" }],
152+
idempotencyKey: "codex-app-server:thread-1:turn-1:prompt",
153+
});
154+
expect(updates[0]?.messageSeq).toBe(1);
155+
});
156+
157+
it("emits stable sequence numbers for multi-message mirror batches", async () => {
158+
const sessionFile = await createTempSessionFile();
159+
160+
await mirrorCodexAppServerTranscript({
161+
sessionFile,
162+
sessionKey: "agent:main:main",
163+
messages: [
164+
attachCodexMirrorIdentity(
165+
makeAgentUserMessage({
166+
content: [{ type: "text", text: "first" }],
167+
timestamp: Date.now(),
168+
}),
169+
"turn-1:prompt",
170+
),
171+
attachCodexMirrorIdentity(
172+
makeAgentAssistantMessage({
173+
content: [{ type: "text", text: "second" }],
174+
timestamp: Date.now() + 1,
175+
}),
176+
"turn-1:assistant",
177+
),
178+
],
179+
idempotencyScope: "codex-app-server:thread-1",
180+
});
181+
182+
const updates = emitSessionTranscriptUpdateMock.mock.calls.map(
183+
([update]) => update as Record<string, unknown>,
184+
);
185+
expect(updates.map((update) => update.messageSeq)).toEqual([1, 2]);
186+
expect(updates.map((update) => (update.message as { role?: string }).role)).toEqual([
187+
"user",
188+
"assistant",
189+
]);
190+
});
191+
108192
it("creates the transcript directory on first mirror", async () => {
109193
const root = await makeRoot("openclaw-codex-transcript-missing-dir-");
110194
const sessionFile = path.join(root, "nested", "sessions", "session.jsonl");

extensions/codex/src/app-server/transcript-mirror.ts

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,17 @@ export async function mirrorCodexAppServerTranscript(params: {
130130
sessionFile: params.sessionFile,
131131
...resolveSessionWriteLockOptions(params.config),
132132
});
133+
const appendedUpdates: Array<{ messageId: string; message: AgentMessage; messageSeq: number }> =
134+
[];
133135
try {
134-
const existingIdempotencyKeys = await readTranscriptIdempotencyKeys(params.sessionFile);
136+
const mirrorState = await readTranscriptMirrorState(params.sessionFile);
137+
let nextMessageSeq = mirrorState.messageCount;
135138
for (const message of messages) {
136139
const dedupeIdentity = buildMirrorDedupeIdentity(message);
137140
const idempotencyKey = params.idempotencyScope
138141
? `${params.idempotencyScope}:${dedupeIdentity}`
139142
: undefined;
140-
if (idempotencyKey && existingIdempotencyKeys.has(idempotencyKey)) {
143+
if (idempotencyKey && mirrorState.idempotencyKeys.has(idempotencyKey)) {
141144
continue;
142145
}
143146
const transcriptMessage = {
@@ -160,49 +163,61 @@ export async function mirrorCodexAppServerTranscript(params: {
160163
}
161164
: nextMessage
162165
) as AgentMessage;
163-
await appendSessionTranscriptMessage({
166+
const { messageId, message: appendedMessage } = await appendSessionTranscriptMessage({
164167
transcriptPath: params.sessionFile,
165168
message: messageToAppend,
166169
config: params.config,
167170
});
171+
nextMessageSeq += 1;
172+
appendedUpdates.push({ messageId, message: appendedMessage, messageSeq: nextMessageSeq });
168173
if (idempotencyKey) {
169-
existingIdempotencyKeys.add(idempotencyKey);
174+
mirrorState.idempotencyKeys.add(idempotencyKey);
170175
}
171176
}
172177
} finally {
173178
await lock.release();
174179
}
175180

176-
if (params.sessionKey) {
177-
emitSessionTranscriptUpdate({ sessionFile: params.sessionFile, sessionKey: params.sessionKey });
178-
} else {
179-
emitSessionTranscriptUpdate(params.sessionFile);
181+
for (const update of appendedUpdates) {
182+
emitSessionTranscriptUpdate({
183+
sessionFile: params.sessionFile,
184+
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
185+
message: update.message,
186+
messageId: update.messageId,
187+
messageSeq: update.messageSeq,
188+
});
180189
}
181190
}
182191

183-
async function readTranscriptIdempotencyKeys(sessionFile: string): Promise<Set<string>> {
184-
const keys = new Set<string>();
192+
async function readTranscriptMirrorState(
193+
sessionFile: string,
194+
): Promise<{ idempotencyKeys: Set<string>; messageCount: number }> {
195+
const idempotencyKeys = new Set<string>();
196+
let messageCount = 0;
185197
let raw: string;
186198
try {
187199
raw = await fs.readFile(sessionFile, "utf8");
188200
} catch (error) {
189201
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
190202
throw error;
191203
}
192-
return keys;
204+
return { idempotencyKeys, messageCount };
193205
}
194206
for (const line of raw.split(/\r?\n/)) {
195207
if (!line.trim()) {
196208
continue;
197209
}
198210
try {
199211
const parsed = JSON.parse(line) as { message?: { idempotencyKey?: unknown } };
212+
if ((parsed as { type?: unknown }).type === "message") {
213+
messageCount += 1;
214+
}
200215
if (typeof parsed.message?.idempotencyKey === "string") {
201-
keys.add(parsed.message.idempotencyKey);
216+
idempotencyKeys.add(parsed.message.idempotencyKey);
202217
}
203218
} catch {
204219
continue;
205220
}
206221
}
207-
return keys;
222+
return { idempotencyKeys, messageCount };
208223
}

0 commit comments

Comments
 (0)