Skip to content

Commit b082204

Browse files
committed
refactor(agents): append text turns asynchronously
1 parent a93ce36 commit b082204

5 files changed

Lines changed: 142 additions & 47 deletions

File tree

src/agents/command/attempt-execution.cli.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,23 @@ async function readSessionMessages(sessionFile: string) {
7373
);
7474
}
7575

76+
async function readSessionFileEntries(sessionFile: string) {
77+
const raw = await fs.readFile(sessionFile, "utf-8");
78+
return raw
79+
.split(/\r?\n/)
80+
.filter(Boolean)
81+
.map(
82+
(line) =>
83+
JSON.parse(line) as {
84+
type?: string;
85+
id?: string;
86+
parentId?: string | null;
87+
cwd?: string;
88+
message?: { role?: string };
89+
},
90+
);
91+
}
92+
7693
describe("CLI attempt execution", () => {
7794
let tmpDir: string;
7895
let storePath: string;
@@ -374,6 +391,17 @@ describe("CLI attempt execution", () => {
374391

375392
const sessionFile = updatedEntry?.sessionFile;
376393
expect(sessionFile).toBeTruthy();
394+
const entries = await readSessionFileEntries(sessionFile!);
395+
expect(entries[0]).toMatchObject({
396+
type: "session",
397+
id: sessionEntry.sessionId,
398+
cwd: tmpDir,
399+
});
400+
expect(entries[1]).toMatchObject({ type: "message", parentId: null });
401+
expect(entries[2]).toMatchObject({
402+
type: "message",
403+
parentId: entries[1]?.id,
404+
});
377405
const messages = await readSessionMessages(sessionFile!);
378406
expect(messages).toHaveLength(2);
379407
expect(messages[0]).toMatchObject({

src/agents/command/attempt-execution.ts

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import fs from "node:fs/promises";
2-
import { SessionManager } from "@mariozechner/pi-coding-agent";
31
import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js";
42
import type { ThinkLevel, VerboseLevel } from "../../auto-reply/thinking.js";
3+
import { appendSessionTranscriptMessage } from "../../config/sessions/transcript-append.js";
54
import { resolveSessionTranscriptFile } from "../../config/sessions/transcript.js";
65
import type { SessionEntry } from "../../config/sessions/types.js";
76
import type { OpenClawConfig } from "../../config/types.openclaw.js";
@@ -20,9 +19,9 @@ import { FailoverError } from "../failover-error.js";
2019
import { resolveAgentHarnessPolicy } from "../harness/selection.js";
2120
import { isCliRuntimeAlias, resolveCliRuntimeExecutionProvider } from "../model-runtime-aliases.js";
2221
import { isCliProvider } from "../model-selection.js";
23-
import { prepareSessionManagerForRun } from "../pi-embedded-runner/session-manager-init.js";
2422
import { runEmbeddedPiAgent, type EmbeddedPiRunResult } from "../pi-embedded.js";
2523
import { buildAgentRuntimeAuthPlan } from "../runtime-plan/auth.js";
24+
import { acquireSessionWriteLock } from "../session-write-lock.js";
2625
import { buildWorkspaceSkillSnapshot } from "../skills.js";
2726
import { buildUsageWithNoCost } from "../stream-message-shared.js";
2827
import {
@@ -194,38 +193,44 @@ async function persistTextTurnTranscript(
194193
agentId: params.sessionAgentId,
195194
threadId: params.threadId,
196195
});
197-
const hadSessionFile = await fs
198-
.access(sessionFile)
199-
.then(() => true)
200-
.catch(() => false);
201-
const sessionManager = SessionManager.open(sessionFile);
202-
await prepareSessionManagerForRun({
203-
sessionManager,
196+
const lock = await acquireSessionWriteLock({
204197
sessionFile,
205-
hadSessionFile,
206-
sessionId: params.sessionId,
207-
cwd: params.sessionCwd,
198+
timeoutMs: 10_000,
199+
allowReentrant: true,
208200
});
201+
try {
202+
if (promptText) {
203+
await appendSessionTranscriptMessage({
204+
transcriptPath: sessionFile,
205+
sessionId: params.sessionId,
206+
cwd: params.sessionCwd,
207+
message: {
208+
role: "user",
209+
content: promptText,
210+
timestamp: Date.now(),
211+
},
212+
});
213+
}
209214

210-
if (promptText) {
211-
sessionManager.appendMessage({
212-
role: "user",
213-
content: promptText,
214-
timestamp: Date.now(),
215-
});
216-
}
217-
218-
if (replyText) {
219-
sessionManager.appendMessage({
220-
role: "assistant",
221-
content: [{ type: "text", text: replyText }],
222-
api: params.assistant.api,
223-
provider: params.assistant.provider,
224-
model: params.assistant.model,
225-
usage: resolveTranscriptUsage(params.assistant.usage),
226-
stopReason: "stop",
227-
timestamp: Date.now(),
228-
});
215+
if (replyText) {
216+
await appendSessionTranscriptMessage({
217+
transcriptPath: sessionFile,
218+
sessionId: params.sessionId,
219+
cwd: params.sessionCwd,
220+
message: {
221+
role: "assistant",
222+
content: [{ type: "text", text: replyText }],
223+
api: params.assistant.api,
224+
provider: params.assistant.provider,
225+
model: params.assistant.model,
226+
usage: resolveTranscriptUsage(params.assistant.usage),
227+
stopReason: "stop",
228+
timestamp: Date.now(),
229+
},
230+
});
231+
}
232+
} finally {
233+
await lock.release();
229234
}
230235

231236
emitSessionTranscriptUpdate(sessionFile);

src/agents/openclaw-tools.sessions.test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,8 @@ describe("sessions tools", () => {
299299
params: {
300300
activeMinutes: undefined,
301301
agentId: "main",
302+
includeDerivedTitles: false,
303+
includeLastMessage: false,
302304
includeGlobal: true,
303305
includeUnknown: true,
304306
label: "mailbox",
@@ -382,8 +384,8 @@ describe("sessions tools", () => {
382384
callGatewayMock.mockImplementation(async (opts: unknown) => {
383385
const request = opts as { method?: string; params?: Record<string, unknown> };
384386
if (request.method === "sessions.list") {
385-
expect(request.params?.includeDerivedTitles).toBeUndefined();
386-
expect(request.params?.includeLastMessage).toBeUndefined();
387+
expect(request.params?.includeDerivedTitles).toBe(false);
388+
expect(request.params?.includeLastMessage).toBe(false);
387389
return {
388390
path: storePath,
389391
sessions: [

src/agents/tools/sessions-list-tool.ts

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ import {
66
resolveSessionFilePathOptions,
77
resolveStorePath,
88
} from "../../config/sessions.js";
9+
import type { SessionEntry } from "../../config/sessions/types.js";
910
import type { OpenClawConfig } from "../../config/types.openclaw.js";
1011
import { callGateway } from "../../gateway/call.js";
11-
import { deriveSessionTitle } from "../../gateway/session-utils.js";
12+
import {
13+
deriveSessionTitle,
14+
readSessionTitleFieldsFromTranscriptAsync,
15+
} from "../../gateway/session-utils.js";
1216
import { resolveAgentIdFromSessionKey } from "../../routing/session-key.js";
1317
import { normalizeOptionalLowercaseString, readStringValue } from "../../shared/string-coerce.js";
1418
import {
@@ -45,6 +49,8 @@ const SessionsListToolSchema = Type.Object({
4549

4650
type GatewayCaller = typeof callGateway;
4751

52+
const SESSIONS_LIST_TRANSCRIPT_FIELD_ROWS = 100;
53+
4854
function readSessionRunStatus(value: unknown): SessionRunStatus | undefined {
4955
return value === "running" ||
5056
value === "done" ||
@@ -109,6 +115,8 @@ export function createSessionsListTool(opts?: {
109115
const includeDerivedTitles = params.includeDerivedTitles === true;
110116
const includeLastMessage = params.includeLastMessage === true;
111117
const gatewayCall = opts?.callGateway ?? callGateway;
118+
const a2aPolicy = createAgentToAgentPolicy(cfg);
119+
const hydrateTranscriptFieldsAfterFiltering = includeDerivedTitles || includeLastMessage;
112120

113121
const list = await gatewayCall<{ sessions: Array<SessionListRow>; path: string }>({
114122
method: "sessions.list",
@@ -118,8 +126,8 @@ export function createSessionsListTool(opts?: {
118126
label,
119127
agentId,
120128
search,
121-
includeDerivedTitles,
122-
includeLastMessage,
129+
includeDerivedTitles: false,
130+
includeLastMessage: false,
123131
includeGlobal: !restrictToSpawned,
124132
includeUnknown: !restrictToSpawned,
125133
spawnedBy: restrictToSpawned ? effectiveRequesterKey : undefined,
@@ -128,7 +136,6 @@ export function createSessionsListTool(opts?: {
128136

129137
const sessions = Array.isArray(list?.sessions) ? list.sessions : [];
130138
const storePath = typeof list?.path === "string" ? list.path : undefined;
131-
const a2aPolicy = createAgentToAgentPolicy(cfg);
132139
const visibilityGuard = await createSessionVisibilityGuard({
133140
action: "list",
134141
requesterSessionKey: effectiveRequesterKey,
@@ -137,6 +144,13 @@ export function createSessionsListTool(opts?: {
137144
});
138145
const rows: SessionListRow[] = [];
139146
const historyTargets: Array<{ row: SessionListRow; resolvedKey: string }> = [];
147+
const titleTargets: Array<{
148+
row: SessionListRow;
149+
titleEntry: SessionEntry;
150+
sessionId: string;
151+
sessionFile?: string;
152+
agentId: string;
153+
}> = [];
140154

141155
for (const entry of sessions) {
142156
if (!entry || typeof entry !== "object") {
@@ -310,17 +324,24 @@ export function createSessionsListTool(opts?: {
310324
lastAccountId,
311325
transcriptPath,
312326
};
313-
if (sessionId && includeDerivedTitles && !row.derivedTitle) {
314-
row.derivedTitle = deriveSessionTitle(
315-
{
327+
if (
328+
sessionId &&
329+
hydrateTranscriptFieldsAfterFiltering &&
330+
titleTargets.length < SESSIONS_LIST_TRANSCRIPT_FIELD_ROWS
331+
) {
332+
titleTargets.push({
333+
row,
334+
titleEntry: {
316335
sessionId,
317336
displayName: row.displayName,
318337
label: row.label,
319338
subject: readStringValue((entry as { subject?: unknown }).subject),
320339
updatedAt: typeof row.updatedAt === "number" ? row.updatedAt : 0,
321340
},
322-
undefined,
323-
);
341+
sessionId,
342+
...(sessionFile ? { sessionFile } : {}),
343+
agentId: resolvedAgentId,
344+
});
324345
}
325346
if (messageLimit > 0) {
326347
const resolvedKey = resolveInternalSessionKey({
@@ -333,6 +354,37 @@ export function createSessionsListTool(opts?: {
333354
rows.push(row);
334355
}
335356

357+
if (titleTargets.length > 0) {
358+
const maxConcurrent = Math.min(4, titleTargets.length);
359+
let index = 0;
360+
const worker = async () => {
361+
while (true) {
362+
const next = index;
363+
index += 1;
364+
if (next >= titleTargets.length) {
365+
return;
366+
}
367+
const target = titleTargets[next];
368+
const fields = await readSessionTitleFieldsFromTranscriptAsync(
369+
target.sessionId,
370+
storePath,
371+
target.sessionFile,
372+
target.agentId,
373+
);
374+
if (includeDerivedTitles && !target.row.derivedTitle) {
375+
target.row.derivedTitle = deriveSessionTitle(
376+
target.titleEntry,
377+
fields.firstUserMessage,
378+
);
379+
}
380+
if (includeLastMessage && fields.lastMessagePreview) {
381+
target.row.lastMessagePreview = fields.lastMessagePreview;
382+
}
383+
}
384+
};
385+
await Promise.all(Array.from({ length: maxConcurrent }, () => worker()));
386+
}
387+
336388
if (messageLimit > 0 && historyTargets.length > 0) {
337389
const maxConcurrent = Math.min(4, historyTargets.length);
338390
let index = 0;

src/config/sessions/transcript-append.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ async function migrateLinearTranscriptToParentLinked(transcriptPath: string): Pr
158158
return result;
159159
}
160160

161-
async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
161+
async function ensureTranscriptHeader(
162+
transcriptPath: string,
163+
params: { sessionId?: string; cwd?: string } = {},
164+
): Promise<void> {
162165
const stat = await fs.stat(transcriptPath).catch(() => null);
163166
if (stat?.isFile() && stat.size > 0) {
164167
return;
@@ -167,9 +170,9 @@ async function ensureTranscriptHeader(transcriptPath: string): Promise<void> {
167170
const header = {
168171
type: "session",
169172
version: CURRENT_SESSION_VERSION,
170-
id: randomUUID(),
173+
id: params.sessionId ?? randomUUID(),
171174
timestamp: new Date().toISOString(),
172-
cwd: process.cwd(),
175+
cwd: params.cwd ?? process.cwd(),
173176
};
174177
await fs.writeFile(transcriptPath, `${JSON.stringify(header)}\n`, {
175178
encoding: "utf-8",
@@ -182,6 +185,8 @@ export async function appendSessionTranscriptMessage(params: {
182185
transcriptPath: string;
183186
message: unknown;
184187
now?: number;
188+
sessionId?: string;
189+
cwd?: string;
185190
useRawWhenLinear?: boolean;
186191
}): Promise<{ messageId: string }> {
187192
const lock = await acquireSessionWriteLock({
@@ -192,7 +197,10 @@ export async function appendSessionTranscriptMessage(params: {
192197
try {
193198
const now = params.now ?? Date.now();
194199
const messageId = randomUUID();
195-
await ensureTranscriptHeader(params.transcriptPath);
200+
await ensureTranscriptHeader(params.transcriptPath, {
201+
...(params.sessionId ? { sessionId: params.sessionId } : {}),
202+
...(params.cwd ? { cwd: params.cwd } : {}),
203+
});
196204
const stat = await fs.stat(params.transcriptPath).catch(() => null);
197205
let leafInfo: TranscriptLeafInfo = await readTranscriptLeafInfo(params.transcriptPath).catch(
198206
() => ({

0 commit comments

Comments
 (0)