Skip to content

Commit 751d5b7

Browse files
authored
feat: add context engine transcript maintenance (#51191)
Merged via squash. Prepared head SHA: b42a3c2 Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman
1 parent 6526074 commit 751d5b7

20 files changed

Lines changed: 1305 additions & 107 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Docs: https://docs.openclaw.ai
5252
- Docs/plugins: add the community DingTalk plugin listing to the docs catalog. (#29913) Thanks @sliverp.
5353
- Docs/plugins: add the community QQbot plugin listing to the docs catalog. (#29898) Thanks @sliverp.
5454
- Plugins/context engines: pass the embedded runner `modelId` into context-engine `assemble()` so plugins can adapt context formatting per model. (#47437) thanks @jscianna.
55+
- Plugins/context engines: add transcript maintenance rewrites for context engines, preserve active-branch transcript metadata during rewrites, and harden overflow-recovery truncation to rewrite sessions under the normal session write lock. (#51191) Thanks @jalehman.
5556

5657
### Fixes
5758

src/agents/pi-embedded-runner/compact.hooks.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,36 @@ describe("compactEmbeddedPiSession hooks (ownsCompaction engine)", () => {
623623
}
624624
});
625625

626+
it("runs maintain after successful compaction with a transcript rewrite helper", async () => {
627+
const maintain = vi.fn(async (_params?: unknown) => ({
628+
changed: false,
629+
bytesFreed: 0,
630+
rewrittenEntries: 0,
631+
}));
632+
resolveContextEngineMock.mockResolvedValue({
633+
info: { ownsCompaction: true },
634+
compact: contextEngineCompactMock,
635+
maintain,
636+
} as never);
637+
638+
const result = await compactEmbeddedPiSession(wrappedCompactionArgs());
639+
640+
expect(result.ok).toBe(true);
641+
expect(maintain).toHaveBeenCalledWith(
642+
expect.objectContaining({
643+
sessionKey: TEST_SESSION_KEY,
644+
sessionFile: TEST_SESSION_FILE,
645+
runtimeContext: expect.objectContaining({
646+
workspaceDir: TEST_WORKSPACE_DIR,
647+
}),
648+
}),
649+
);
650+
const runtimeContext = (
651+
maintain.mock.calls[0]?.[0] as { runtimeContext?: Record<string, unknown> } | undefined
652+
)?.runtimeContext;
653+
expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function");
654+
});
655+
626656
it("does not fire after_compaction when compaction fails", async () => {
627657
hookRunner.hasHooks.mockReturnValue(true);
628658
const sync = vi.fn(async () => {});

src/agents/pi-embedded-runner/compact.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ import {
8383
compactWithSafetyTimeout,
8484
resolveCompactionTimeoutMs,
8585
} from "./compaction-safety-timeout.js";
86+
import { runContextEngineMaintenance } from "./context-engine-maintenance.js";
8687
import { buildEmbeddedExtensionFactories } from "./extensions.js";
8788
import {
8889
logToolSchemasForGoogle,
@@ -1226,6 +1227,16 @@ export async function compactEmbeddedPiSession(
12261227
force: params.trigger === "manual",
12271228
runtimeContext: params as Record<string, unknown>,
12281229
});
1230+
if (result.ok && result.compacted) {
1231+
await runContextEngineMaintenance({
1232+
contextEngine,
1233+
sessionId: params.sessionId,
1234+
sessionKey: params.sessionKey,
1235+
sessionFile: params.sessionFile,
1236+
reason: "compaction",
1237+
runtimeContext: params as Record<string, unknown>,
1238+
});
1239+
}
12291240
if (engineOwnsCompaction && result.ok && result.compacted) {
12301241
await runPostCompactionSideEffects({
12311242
config: params.config,
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import { beforeEach, describe, expect, it, vi } from "vitest";
2+
3+
const rewriteTranscriptEntriesInSessionManagerMock = vi.fn((_params?: unknown) => ({
4+
changed: true,
5+
bytesFreed: 77,
6+
rewrittenEntries: 1,
7+
}));
8+
const rewriteTranscriptEntriesInSessionFileMock = vi.fn(async (_params?: unknown) => ({
9+
changed: true,
10+
bytesFreed: 123,
11+
rewrittenEntries: 2,
12+
}));
13+
14+
vi.mock("./transcript-rewrite.js", () => ({
15+
rewriteTranscriptEntriesInSessionManager: (params: unknown) =>
16+
rewriteTranscriptEntriesInSessionManagerMock(params),
17+
rewriteTranscriptEntriesInSessionFile: (params: unknown) =>
18+
rewriteTranscriptEntriesInSessionFileMock(params),
19+
}));
20+
21+
import {
22+
buildContextEngineMaintenanceRuntimeContext,
23+
runContextEngineMaintenance,
24+
} from "./context-engine-maintenance.js";
25+
26+
describe("buildContextEngineMaintenanceRuntimeContext", () => {
27+
beforeEach(() => {
28+
rewriteTranscriptEntriesInSessionManagerMock.mockClear();
29+
rewriteTranscriptEntriesInSessionFileMock.mockClear();
30+
});
31+
32+
it("adds a transcript rewrite helper that targets the current session file", async () => {
33+
const runtimeContext = buildContextEngineMaintenanceRuntimeContext({
34+
sessionId: "session-1",
35+
sessionKey: "agent:main:session-1",
36+
sessionFile: "/tmp/session.jsonl",
37+
runtimeContext: { workspaceDir: "/tmp/workspace" },
38+
});
39+
40+
expect(runtimeContext.workspaceDir).toBe("/tmp/workspace");
41+
expect(typeof runtimeContext.rewriteTranscriptEntries).toBe("function");
42+
43+
const result = await runtimeContext.rewriteTranscriptEntries?.({
44+
replacements: [
45+
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
46+
],
47+
});
48+
49+
expect(result).toEqual({
50+
changed: true,
51+
bytesFreed: 123,
52+
rewrittenEntries: 2,
53+
});
54+
expect(rewriteTranscriptEntriesInSessionFileMock).toHaveBeenCalledWith({
55+
sessionFile: "/tmp/session.jsonl",
56+
sessionId: "session-1",
57+
sessionKey: "agent:main:session-1",
58+
request: {
59+
replacements: [
60+
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
61+
],
62+
},
63+
});
64+
});
65+
66+
it("reuses the active session manager when one is provided", async () => {
67+
const sessionManager = { appendMessage: vi.fn() } as unknown as Parameters<
68+
typeof buildContextEngineMaintenanceRuntimeContext
69+
>[0]["sessionManager"];
70+
const runtimeContext = buildContextEngineMaintenanceRuntimeContext({
71+
sessionId: "session-1",
72+
sessionKey: "agent:main:session-1",
73+
sessionFile: "/tmp/session.jsonl",
74+
sessionManager,
75+
});
76+
77+
const result = await runtimeContext.rewriteTranscriptEntries?.({
78+
replacements: [
79+
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
80+
],
81+
});
82+
83+
expect(result).toEqual({
84+
changed: true,
85+
bytesFreed: 77,
86+
rewrittenEntries: 1,
87+
});
88+
expect(rewriteTranscriptEntriesInSessionManagerMock).toHaveBeenCalledWith({
89+
sessionManager,
90+
replacements: [
91+
{ entryId: "entry-1", message: { role: "user", content: "hi", timestamp: 1 } },
92+
],
93+
});
94+
expect(rewriteTranscriptEntriesInSessionFileMock).not.toHaveBeenCalled();
95+
});
96+
});
97+
98+
describe("runContextEngineMaintenance", () => {
99+
beforeEach(() => {
100+
rewriteTranscriptEntriesInSessionManagerMock.mockClear();
101+
rewriteTranscriptEntriesInSessionFileMock.mockClear();
102+
});
103+
104+
it("passes a rewrite-capable runtime context into maintain()", async () => {
105+
const maintain = vi.fn(async (_params?: unknown) => ({
106+
changed: false,
107+
bytesFreed: 0,
108+
rewrittenEntries: 0,
109+
}));
110+
111+
const result = await runContextEngineMaintenance({
112+
contextEngine: {
113+
info: { id: "test", name: "Test Engine" },
114+
ingest: async () => ({ ingested: true }),
115+
assemble: async ({ messages }) => ({ messages, estimatedTokens: 0 }),
116+
compact: async () => ({ ok: true, compacted: false }),
117+
maintain,
118+
},
119+
sessionId: "session-1",
120+
sessionKey: "agent:main:session-1",
121+
sessionFile: "/tmp/session.jsonl",
122+
reason: "turn",
123+
runtimeContext: { workspaceDir: "/tmp/workspace" },
124+
});
125+
126+
expect(result).toEqual({
127+
changed: false,
128+
bytesFreed: 0,
129+
rewrittenEntries: 0,
130+
});
131+
expect(maintain).toHaveBeenCalledWith(
132+
expect.objectContaining({
133+
sessionId: "session-1",
134+
sessionKey: "agent:main:session-1",
135+
sessionFile: "/tmp/session.jsonl",
136+
runtimeContext: expect.objectContaining({
137+
workspaceDir: "/tmp/workspace",
138+
}),
139+
}),
140+
);
141+
const runtimeContext = (
142+
maintain.mock.calls[0]?.[0] as
143+
| { runtimeContext?: { rewriteTranscriptEntries?: (request: unknown) => Promise<unknown> } }
144+
| undefined
145+
)?.runtimeContext as
146+
| { rewriteTranscriptEntries?: (request: unknown) => Promise<unknown> }
147+
| undefined;
148+
expect(typeof runtimeContext?.rewriteTranscriptEntries).toBe("function");
149+
});
150+
});
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import type {
2+
ContextEngine,
3+
ContextEngineMaintenanceResult,
4+
ContextEngineRuntimeContext,
5+
} from "../../context-engine/types.js";
6+
import { log } from "./logger.js";
7+
import {
8+
rewriteTranscriptEntriesInSessionFile,
9+
rewriteTranscriptEntriesInSessionManager,
10+
} from "./transcript-rewrite.js";
11+
12+
/**
13+
* Attach runtime-owned transcript rewrite helpers to an existing
14+
* context-engine runtime context payload.
15+
*/
16+
export function buildContextEngineMaintenanceRuntimeContext(params: {
17+
sessionId: string;
18+
sessionKey?: string;
19+
sessionFile: string;
20+
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
21+
runtimeContext?: ContextEngineRuntimeContext;
22+
}): ContextEngineRuntimeContext {
23+
return {
24+
...params.runtimeContext,
25+
rewriteTranscriptEntries: async (request) => {
26+
if (params.sessionManager) {
27+
return rewriteTranscriptEntriesInSessionManager({
28+
sessionManager: params.sessionManager,
29+
replacements: request.replacements,
30+
});
31+
}
32+
return await rewriteTranscriptEntriesInSessionFile({
33+
sessionFile: params.sessionFile,
34+
sessionId: params.sessionId,
35+
sessionKey: params.sessionKey,
36+
request,
37+
});
38+
},
39+
};
40+
}
41+
42+
/**
43+
* Run optional context-engine transcript maintenance and normalize the result.
44+
*/
45+
export async function runContextEngineMaintenance(params: {
46+
contextEngine?: ContextEngine;
47+
sessionId: string;
48+
sessionKey?: string;
49+
sessionFile: string;
50+
reason: "bootstrap" | "compaction" | "turn";
51+
sessionManager?: Parameters<typeof rewriteTranscriptEntriesInSessionManager>[0]["sessionManager"];
52+
runtimeContext?: ContextEngineRuntimeContext;
53+
}): Promise<ContextEngineMaintenanceResult | undefined> {
54+
if (typeof params.contextEngine?.maintain !== "function") {
55+
return undefined;
56+
}
57+
58+
try {
59+
const result = await params.contextEngine.maintain({
60+
sessionId: params.sessionId,
61+
sessionKey: params.sessionKey,
62+
sessionFile: params.sessionFile,
63+
runtimeContext: buildContextEngineMaintenanceRuntimeContext({
64+
sessionId: params.sessionId,
65+
sessionKey: params.sessionKey,
66+
sessionFile: params.sessionFile,
67+
sessionManager: params.sessionManager,
68+
runtimeContext: params.runtimeContext,
69+
}),
70+
});
71+
if (result.changed) {
72+
log.info(
73+
`[context-engine] maintenance(${params.reason}) changed transcript ` +
74+
`rewrittenEntries=${result.rewrittenEntries} bytesFreed=${result.bytesFreed} ` +
75+
`sessionKey=${params.sessionKey ?? params.sessionId ?? "unknown"}`,
76+
);
77+
}
78+
return result;
79+
} catch (err) {
80+
log.warn(`context engine maintain failed (${params.reason}): ${String(err)}`);
81+
return undefined;
82+
}
83+
}

src/agents/pi-embedded-runner/run.overflow-compaction.harness.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export const mockedEnsureRuntimePluginsLoaded = vi.fn<(params?: unknown) => void
6666
export const mockedPrepareProviderRuntimeAuth = vi.fn(async () => undefined);
6767
export const mockedRunEmbeddedAttempt =
6868
vi.fn<(params: unknown) => Promise<EmbeddedRunAttemptResult>>();
69+
export const mockedRunContextEngineMaintenance = vi.fn(async () => undefined);
6970
export const mockedSessionLikelyHasOversizedToolResults = vi.fn(() => false);
7071
export const mockedTruncateOversizedToolResultsInSession = vi.fn<
7172
() => Promise<MockTruncateOversizedToolResultsResult>
@@ -173,6 +174,8 @@ export function resetRunOverflowCompactionHarnessMocks(): void {
173174
mockedPrepareProviderRuntimeAuth.mockReset();
174175
mockedPrepareProviderRuntimeAuth.mockResolvedValue(undefined);
175176
mockedRunEmbeddedAttempt.mockReset();
177+
mockedRunContextEngineMaintenance.mockReset();
178+
mockedRunContextEngineMaintenance.mockResolvedValue(undefined);
176179
mockedSessionLikelyHasOversizedToolResults.mockReset();
177180
mockedSessionLikelyHasOversizedToolResults.mockReturnValue(false);
178181
mockedTruncateOversizedToolResultsInSession.mockReset();
@@ -303,6 +306,10 @@ export async function loadRunOverflowCompactionHarness(): Promise<{
303306
runEmbeddedAttempt: mockedRunEmbeddedAttempt,
304307
}));
305308

309+
vi.doMock("./context-engine-maintenance.js", () => ({
310+
runContextEngineMaintenance: mockedRunContextEngineMaintenance,
311+
}));
312+
306313
vi.doMock("./model.js", () => ({
307314
resolveModelAsync: vi.fn(async () => ({
308315
model: {

src/agents/pi-embedded-runner/run.overflow-compaction.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
mockedContextEngine,
1717
mockedCompactDirect,
1818
mockedRunEmbeddedAttempt,
19+
mockedRunContextEngineMaintenance,
1920
resetRunOverflowCompactionHarnessMocks,
2021
mockedSessionLikelyHasOversizedToolResults,
2122
mockedTruncateOversizedToolResultsInSession,
@@ -35,6 +36,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
3536

3637
beforeEach(() => {
3738
mockedRunEmbeddedAttempt.mockReset();
39+
mockedRunContextEngineMaintenance.mockReset();
3840
mockedCompactDirect.mockReset();
3941
mockedCoerceToFailoverError.mockReset();
4042
mockedDescribeFailoverError.mockReset();
@@ -50,6 +52,7 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
5052
compacted: false,
5153
reason: "nothing to compact",
5254
});
55+
mockedRunContextEngineMaintenance.mockResolvedValue(undefined);
5356
mockedCoerceToFailoverError.mockReturnValue(null);
5457
mockedDescribeFailoverError.mockImplementation((err: unknown) => ({
5558
message: err instanceof Error ? err.message : String(err),
@@ -241,6 +244,37 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
241244
);
242245
});
243246

247+
it("runs maintenance after successful overflow-recovery compaction", async () => {
248+
mockedContextEngine.info.ownsCompaction = true;
249+
mockedRunEmbeddedAttempt
250+
.mockResolvedValueOnce(makeAttemptResult({ promptError: makeOverflowError() }))
251+
.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
252+
mockedCompactDirect.mockResolvedValueOnce({
253+
ok: true,
254+
compacted: true,
255+
result: {
256+
summary: "engine-owned compaction",
257+
tokensAfter: 50,
258+
},
259+
});
260+
261+
await runEmbeddedPiAgent(overflowBaseRunParams);
262+
263+
expect(mockedRunContextEngineMaintenance).toHaveBeenCalledWith(
264+
expect.objectContaining({
265+
contextEngine: mockedContextEngine,
266+
sessionId: "test-session",
267+
sessionKey: "test-key",
268+
sessionFile: "/tmp/session.json",
269+
reason: "compaction",
270+
runtimeContext: expect.objectContaining({
271+
trigger: "overflow",
272+
authProfileId: "test-profile",
273+
}),
274+
}),
275+
);
276+
});
277+
244278
it("guards thrown engine-owned overflow compaction attempts", async () => {
245279
mockedContextEngine.info.ownsCompaction = true;
246280
mockedGlobalHookRunner.hasHooks.mockImplementation(

0 commit comments

Comments
 (0)