Skip to content

Commit ee0a371

Browse files
CaptainTimon authored and committed
fix(agents): dedupe transcript rewrite replay
1 parent a1ac559 commit ee0a371

3 files changed

Lines changed: 238 additions & 17 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -160,6 +160,7 @@ Docs: https://docs.openclaw.ai
160160

161161
### Fixes
162162

163+
- Agents/transcript: dedupe exact replayed transcript entries during overflow-recovery rewrites so cloned user prompts, compactions, and bootstrap context records do not keep amplifying session JSONL growth. Refs #69208 and fixes #66443. Thanks @BradGroux.
163164
- Gateway/watch: leave `OPENCLAW_TRACE_SYNC_IO` disabled by default in `pnpm gateway:watch:raw` so watch mode avoids noisy Node sync-I/O stack traces unless explicitly requested.
164165
- Providers: preserve non-OK `text/event-stream` response bodies so provider HTTP errors keep their JSON detail instead of collapsing to generic streaming failures. Fixes #78180.
165166
- Gateway/auth: make explicit `trusted-proxy` mode fail closed instead of accepting local password fallback credentials after trusted-proxy identity checks fail. Fixes #78684.

src/agents/pi-embedded-runner/transcript-rewrite.test.ts

Lines changed: 154 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -271,6 +271,115 @@ describe("rewriteTranscriptEntriesInSessionManager", () => {
271271
content: [{ type: "text", text: "summarized" }],
272272
});
273273
});
274+
275+
it("dedupes exact replayed user payloads when rewriting a polluted branch", () => {
276+
const sessionManager = SessionManager.inMemory();
277+
const heartbeatPrompt = "Read HEARTBEAT.md if it exists (workspace context)...";
278+
const replayedUserMessage = asAppendMessage({
279+
role: "user",
280+
content: heartbeatPrompt,
281+
timestamp: 1,
282+
});
283+
const entryIds = appendSessionMessages(sessionManager, [
284+
replayedUserMessage,
285+
replayedUserMessage,
286+
replayedUserMessage,
287+
asAppendMessage({
288+
role: "assistant",
289+
content: createTextContent("ack"),
290+
timestamp: 2,
291+
}),
292+
]);
293+
294+
const result = rewriteTranscriptEntriesInSessionManager({
295+
sessionManager,
296+
replacements: [
297+
{
298+
entryId: entryIds[0],
299+
message: replayedUserMessage as AgentMessage,
300+
},
301+
],
302+
});
303+
304+
expect(result.changed).toBe(true);
305+
const branchMessages = getBranchMessages(sessionManager);
306+
expect(branchMessages.map((message) => message.role)).toEqual(["user", "assistant"]);
307+
expect(branchMessages[0]).toMatchObject({
308+
role: "user",
309+
content: heartbeatPrompt,
310+
timestamp: 1,
311+
});
312+
});
313+
314+
it("preserves repeated user text when message timestamps differ", () => {
315+
const sessionManager = SessionManager.inMemory();
316+
const entryIds = appendSessionMessages(sessionManager, [
317+
asAppendMessage({ role: "user", content: "yes", timestamp: 1 }),
318+
asAppendMessage({ role: "user", content: "yes", timestamp: 2 }),
319+
asAppendMessage({
320+
role: "assistant",
321+
content: createTextContent("ok"),
322+
timestamp: 3,
323+
}),
324+
]);
325+
326+
const result = rewriteTranscriptEntriesInSessionManager({
327+
sessionManager,
328+
replacements: [
329+
{
330+
entryId: entryIds[0],
331+
message: { role: "user", content: "yes", timestamp: 1 } as AgentMessage,
332+
},
333+
],
334+
});
335+
336+
expect(result.changed).toBe(true);
337+
expect(getBranchMessages(sessionManager).map((message) => message.role)).toEqual([
338+
"user",
339+
"user",
340+
"assistant",
341+
]);
342+
});
343+
344+
it("dedupes replayed bootstrap custom entries and cloned compactions", () => {
345+
const sessionManager = SessionManager.inMemory();
346+
const entryIds = appendSessionMessages(sessionManager, [
347+
asAppendMessage({ role: "user", content: "start", timestamp: 1 }),
348+
asAppendMessage({
349+
role: "assistant",
350+
content: createTextContent("kept"),
351+
timestamp: 2,
352+
}),
353+
]);
354+
sessionManager.appendCustomEntry("openclaw:bootstrap-context:full", { hash: "same" });
355+
sessionManager.appendCustomEntry("openclaw:bootstrap-context:full", { hash: "same" });
356+
const firstCompactionId = sessionManager.appendCompaction("summary", entryIds[1], 9_000, {
357+
readFiles: ["HEARTBEAT.md"],
358+
});
359+
sessionManager.appendCompaction("summary", firstCompactionId, 9_000, {
360+
readFiles: ["HEARTBEAT.md"],
361+
});
362+
363+
const result = rewriteTranscriptEntriesInSessionManager({
364+
sessionManager,
365+
replacements: [
366+
{
367+
entryId: entryIds[0],
368+
message: { role: "user", content: "start", timestamp: 1 } as AgentMessage,
369+
},
370+
],
371+
});
372+
373+
expect(result.changed).toBe(true);
374+
const branch = sessionManager.getBranch();
375+
expect(
376+
branch.filter(
377+
(entry) =>
378+
entry.type === "custom" && entry.customType === "openclaw:bootstrap-context:full",
379+
),
380+
).toHaveLength(1);
381+
expect(branch.filter((entry) => entry.type === "compaction")).toHaveLength(1);
382+
});
274383
});
275384

276385
describe("rewriteTranscriptEntriesInSessionFile", () => {
@@ -344,4 +453,49 @@ describe("rewriteTranscriptEntriesInSessionFile", () => {
344453
openSpy.mockRestore();
345454
}
346455
});
456+
457+
it("persists only one copy of exact replayed user payloads during file rewrite", async () => {
458+
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-transcript-rewrite-"));
459+
const sessionManager = SessionManager.create(dir, dir);
460+
const replayedUserMessage = asAppendMessage({
461+
role: "user",
462+
content: "Read HEARTBEAT.md if it exists (workspace context)...",
463+
timestamp: 1,
464+
});
465+
const entryIds = appendSessionMessages(sessionManager, [
466+
replayedUserMessage,
467+
replayedUserMessage,
468+
replayedUserMessage,
469+
asAppendMessage({
470+
role: "assistant",
471+
content: createTextContent("ack"),
472+
timestamp: 2,
473+
}),
474+
]);
475+
const sessionFile = sessionManager.getSessionFile();
476+
expect(sessionFile).toBeTruthy();
477+
if (!sessionFile) {
478+
throw new Error("expected persisted session file");
479+
}
480+
481+
const result = await rewriteTranscriptEntriesInSessionFile({
482+
sessionFile,
483+
sessionKey: "agent:main:main:heartbeat",
484+
request: {
485+
replacements: [
486+
{
487+
entryId: entryIds[0],
488+
message: replayedUserMessage as AgentMessage,
489+
},
490+
],
491+
},
492+
});
493+
494+
expect(result.changed).toBe(true);
495+
const rewrittenSession = SessionManager.open(sessionFile);
496+
expect(getBranchMessages(rewrittenSession).map((message) => message.role)).toEqual([
497+
"user",
498+
"assistant",
499+
]);
500+
});
347501
});

src/agents/pi-embedded-runner/transcript-rewrite.ts

Lines changed: 83 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import { createHash } from "node:crypto";
12
import type { AgentMessage } from "@mariozechner/pi-agent-core";
23
import { SessionManager } from "@mariozechner/pi-coding-agent";
34
import type {
@@ -27,6 +28,40 @@ function estimateMessageBytes(message: AgentMessage): number {
2728
return Buffer.byteLength(JSON.stringify(message), "utf8");
2829
}
2930

31+
/**
 * Returns the hex-encoded SHA-256 digest of `value` serialized with `JSON.stringify`.
 *
 * `null` and `undefined` are both hashed as the JSON text `null`; the `?? null`
 * fallback exists because `JSON.stringify(undefined)` yields `undefined` rather
 * than a string. Serialization follows property insertion order, so two objects
 * with the same fields in a different order produce different digests.
 *
 * @param value - Any JSON-serializable value.
 * @returns A lowercase hex SHA-256 digest string.
 */
function hashJson(value: unknown): string {
32+
return createHash("sha256")
33+
.update(JSON.stringify(value ?? null))
34+
.digest("hex");
35+
}
36+
37+
/**
 * Builds a content-based identity key for a branch entry so that exact replays
 * of the same entry can be recognized during a transcript rewrite.
 *
 * The key is prefixed with the entry type and derived only from the fields
 * listed below; any other field on the entry (such as its `id`) does not
 * participate, so two entries that differ only in those fields share a key:
 * - `message`: hash of the whole `entry.message` object.
 * - `compaction`: `tokensBefore` plus a hash of `summary`, `details`, `fromHook`.
 * - `custom`: `customType` plus a hash of `data`.
 * - `custom_message`: `customType` plus a hash of `content`, `details`, `display`.
 *
 * @param entry - The branch entry to fingerprint.
 * @returns The identity key, or `null` for any other entry type. The rewrite
 *   loops treat `null` as "no identity" and always re-append such entries.
 */
function computeBranchEntryReplayIdentity(entry: SessionBranchEntry): string | null {
38+
if (entry.type === "message") {
39+
return `message:${hashJson(entry.message)}`;
40+
}
41+
if (entry.type === "compaction") {
42+
return `compaction:${entry.tokensBefore}:${hashJson({
43+
summary: entry.summary,
44+
details: entry.details,
45+
fromHook: entry.fromHook,
46+
})}`;
47+
}
48+
if (entry.type === "custom") {
49+
return `custom:${entry.customType}:${hashJson(entry.data)}`;
50+
}
51+
if (entry.type === "custom_message") {
52+
return `custom_message:${entry.customType}:${hashJson({
53+
content: entry.content,
54+
details: entry.details,
55+
display: entry.display,
56+
})}`;
57+
}
58+
// Entry types without a content-based identity are never deduplicated.
return null;
59+
}
60+
61+
/**
 * Builds the replay identity key for a bare message.
 *
 * The result has the same `message:<sha256>` shape that
 * `computeBranchEntryReplayIdentity` produces for `message` entries, so a
 * replacement message and a branch message entry with identical JSON content
 * map to the same key.
 *
 * @param message - The message to fingerprint.
 * @returns The `message:`-prefixed identity key.
 */
function computeMessageReplayIdentity(message: AgentMessage): string {
62+
return `message:${hashJson(message)}`;
63+
}
64+
3065
function remapEntryId(
3166
entryId: string | null | undefined,
3267
rewrittenEntryIds: ReadonlyMap<string, string>,
@@ -227,19 +262,35 @@ export function rewriteTranscriptEntriesInSessionManager(params: {
227262
// re-running persistence hooks or size truncation on replayed messages.
228263
const appendMessage = getRawSessionAppendMessage(params.sessionManager);
229264
const rewrittenEntryIds = new Map<string, string>();
265+
const emittedReplayIdentities = new Map<string, string>();
230266
for (let index = matchedIndices[0]; index < branch.length; index++) {
231267
const entry = branch[index];
232268
const replacement = entry.type === "message" ? replacementsById.get(entry.id) : undefined;
233-
const newEntryId =
234-
replacement === undefined
235-
? appendBranchEntry({
236-
sessionManager: params.sessionManager,
237-
entry,
238-
rewrittenEntryIds,
239-
appendMessage,
240-
})
241-
: appendMessage(replacement as Parameters<typeof params.sessionManager.appendMessage>[0]);
269+
if (replacement === undefined) {
270+
const replayIdentity = computeBranchEntryReplayIdentity(entry);
271+
const existingEntryId =
272+
replayIdentity === null ? undefined : emittedReplayIdentities.get(replayIdentity);
273+
if (existingEntryId !== undefined) {
274+
rewrittenEntryIds.set(entry.id, existingEntryId);
275+
continue;
276+
}
277+
const newEntryId = appendBranchEntry({
278+
sessionManager: params.sessionManager,
279+
entry,
280+
rewrittenEntryIds,
281+
appendMessage,
282+
});
283+
rewrittenEntryIds.set(entry.id, newEntryId);
284+
if (replayIdentity !== null) {
285+
emittedReplayIdentities.set(replayIdentity, newEntryId);
286+
}
287+
continue;
288+
}
289+
const newEntryId = appendMessage(
290+
replacement as Parameters<typeof params.sessionManager.appendMessage>[0],
291+
);
242292
rewrittenEntryIds.set(entry.id, newEntryId);
293+
emittedReplayIdentities.set(computeMessageReplayIdentity(replacement), newEntryId);
243294
}
244295

245296
return {
@@ -328,19 +379,34 @@ export function rewriteTranscriptEntriesInState(params: {
328379

329380
const appendedEntries: SessionBranchEntry[] = [];
330381
const rewrittenEntryIds = new Map<string, string>();
382+
const emittedReplayIdentities = new Map<string, string>();
331383
for (let index = matchedIndices[0]; index < branch.length; index++) {
332384
const entry = branch[index];
333385
const replacement = entry.type === "message" ? replacementsById.get(entry.id) : undefined;
334-
const newEntry =
335-
replacement === undefined
336-
? appendTranscriptStateBranchEntry({
337-
state: params.state,
338-
entry,
339-
rewrittenEntryIds,
340-
})
341-
: params.state.appendMessage(replacement);
386+
if (replacement === undefined) {
387+
const replayIdentity = computeBranchEntryReplayIdentity(entry);
388+
const existingEntryId =
389+
replayIdentity === null ? undefined : emittedReplayIdentities.get(replayIdentity);
390+
if (existingEntryId !== undefined) {
391+
rewrittenEntryIds.set(entry.id, existingEntryId);
392+
continue;
393+
}
394+
const newEntry = appendTranscriptStateBranchEntry({
395+
state: params.state,
396+
entry,
397+
rewrittenEntryIds,
398+
});
399+
rewrittenEntryIds.set(entry.id, newEntry.id);
400+
appendedEntries.push(newEntry);
401+
if (replayIdentity !== null) {
402+
emittedReplayIdentities.set(replayIdentity, newEntry.id);
403+
}
404+
continue;
405+
}
406+
const newEntry = params.state.appendMessage(replacement);
342407
rewrittenEntryIds.set(entry.id, newEntry.id);
343408
appendedEntries.push(newEntry);
409+
emittedReplayIdentities.set(computeMessageReplayIdentity(replacement), newEntry.id);
344410
}
345411

346412
return {

0 commit comments

Comments
 (0)