Skip to content

Commit d153113

Browse files
committed
fix: move compaction planning off the event loop
1 parent e5845dd commit d153113

11 files changed

Lines changed: 1108 additions & 319 deletions

scripts/release-check.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ const requiredPathGroups = [
9191
"scripts/postinstall-bundled-plugins.mjs",
9292
"dist/plugin-sdk/compat.js",
9393
"dist/plugin-sdk/root-alias.cjs",
94+
"dist/agents/compaction-planning.worker.js",
9495
"dist/agents/model-provider-auth.worker.js",
9596
"dist/task-registry-control.runtime.js",
9697
"dist/telegram-ingress-worker.runtime.js",

src/agents/agent-hooks/compaction-safeguard.ts

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ import {
99
getCompactionProvider,
1010
type CompactionProvider,
1111
} from "../../plugins/compaction-provider.js";
12+
import {
13+
buildHistoryPrunePlanWithWorker,
14+
computeAdaptiveChunkRatioWithWorker,
15+
} from "../compaction-planning-worker.js";
1216
import {
1317
hasMeaningfulConversationContent,
1418
isRealConversationMessage,
@@ -19,9 +23,7 @@ import {
1923
SAFETY_MARGIN,
2024
SUMMARIZATION_OVERHEAD_TOKENS,
2125
computeAdaptiveChunkRatio,
22-
estimateMessagesTokens,
2326
isOversizedForSummary,
24-
pruneHistoryForContextShare,
2527
resolveContextWindowTokens,
2628
summarizeInStages,
2729
} from "../compaction.js";
@@ -1071,19 +1073,18 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
10711073
let droppedSummary: string | undefined;
10721074

10731075
if (tokensBefore !== undefined) {
1074-
const summarizableTokens =
1075-
estimateMessagesTokens(messagesToSummarize) + estimateMessagesTokens(turnPrefixMessages);
1076-
const newContentTokens = Math.max(0, Math.floor(tokensBefore - summarizableTokens));
1077-
// Apply SAFETY_MARGIN so token underestimates don't trigger unnecessary pruning
1078-
const maxHistoryTokens = Math.floor(contextWindowTokens * maxHistoryShare * SAFETY_MARGIN);
1079-
1080-
if (newContentTokens > maxHistoryTokens) {
1081-
const pruned = pruneHistoryForContextShare({
1082-
messages: messagesToSummarize,
1083-
maxContextTokens: contextWindowTokens,
1084-
maxHistoryShare,
1085-
parts: 2,
1086-
});
1076+
const prunePlan = await buildHistoryPrunePlanWithWorker({
1077+
messagesToSummarize,
1078+
turnPrefixMessages,
1079+
tokensBefore,
1080+
contextWindowTokens,
1081+
maxHistoryShare,
1082+
parts: 2,
1083+
signal,
1084+
});
1085+
const { newContentTokens, maxHistoryTokens, pruned } = prunePlan;
1086+
1087+
if (newContentTokens > maxHistoryTokens && pruned) {
10871088
if (pruned.droppedChunks > 0) {
10881089
const newContentRatio = (newContentTokens / contextWindowTokens) * 100;
10891090
log.warn(
@@ -1097,10 +1098,11 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
10971098
// Summarize dropped messages so context isn't lost
10981099
if (pruned.droppedMessagesList.length > 0) {
10991100
try {
1100-
const droppedChunkRatio = computeAdaptiveChunkRatio(
1101-
pruned.droppedMessagesList,
1102-
contextWindowTokens,
1103-
);
1101+
const droppedChunkRatio = await computeAdaptiveChunkRatioWithWorker({
1102+
messages: pruned.droppedMessagesList,
1103+
contextWindow: contextWindowTokens,
1104+
signal,
1105+
});
11041106
const droppedMaxChunkTokens = Math.max(
11051107
1,
11061108
Math.floor(contextWindowTokens * droppedChunkRatio) -
@@ -1155,7 +1157,11 @@ export default function compactionSafeguardExtension(api: ExtensionAPI): void {
11551157
// the summarization prompt, system prompt, previous summary, and reasoning budget
11561158
// that generateSummary adds on top of the serialized conversation chunk.
11571159
const allMessages = [...messagesToSummarize, ...turnPrefixMessages];
1158-
const adaptiveRatio = computeAdaptiveChunkRatio(allMessages, contextWindowTokens);
1160+
const adaptiveRatio = await computeAdaptiveChunkRatioWithWorker({
1161+
messages: allMessages,
1162+
contextWindow: contextWindowTokens,
1163+
signal,
1164+
});
11591165
const maxChunkTokens = Math.max(
11601166
1,
11611167
Math.floor(contextWindowTokens * adaptiveRatio) - SUMMARIZATION_OVERHEAD_TOKENS,
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { describe, expect, it } from "vitest";
2+
import { compactionPlanningWorkerTesting } from "./compaction-planning-worker.js";
3+
import { runCompactionPlanningWorkerInput } from "./compaction-planning.worker.js";
4+
import type { AgentMessage } from "./runtime/index.js";
5+
6+
/**
 * Test fixture: builds a message object with role "user".
 *
 * @param id - Stored as the message `timestamp`; the tests below read it back
 *   to check which messages ended up in which chunk and in what order.
 * @param text - Message content. Defaults to a string of 4000 "x" characters.
 * @returns An object with `role`, `content` and `timestamp` fields.
 */
function makeMessage(id: number, text = "x".repeat(4000)): AgentMessage {
7+
return {
8+
role: "user",
9+
content: text,
10+
timestamp: id,
11+
};
12+
}
13+
14+
// Suite for the compaction planning worker: covers worker URL resolution,
// input validation, chunk planning, the missing-worker error code, and
// timer responsiveness while a large planning request is in flight.
describe("compaction planning worker", () => {
15+
// Both a module under dist/agents/ and a hashed module directly under dist/
// are expected to resolve to the same dist/agents/ worker file.
it("resolves the packaged worker URL from stable and hashed dist modules", () => {
16+
expect(
17+
compactionPlanningWorkerTesting.resolveCompactionPlanningWorkerUrl(
18+
"file:///repo/dist/agents/compaction-planning-worker.js",
19+
).pathname,
20+
).toBe("/repo/dist/agents/compaction-planning.worker.js");
21+
expect(
22+
compactionPlanningWorkerTesting.resolveCompactionPlanningWorkerUrl(
23+
"file:///repo/dist/selection-abc123.js",
24+
).pathname,
25+
).toBe("/repo/dist/agents/compaction-planning.worker.js");
26+
});
27+
28+
// The input carries only a `kind` field; the expected outcome is a returned
// failure object (status "failed" plus an error string), not a thrown error.
it("rejects invalid worker input", () => {
29+
expect(runCompactionPlanningWorkerInput({ kind: "summaryChunks" })).toEqual({
30+
status: "failed",
31+
error: "invalid compaction planning worker input",
32+
});
33+
});
34+
35+
// Three default-sized messages with maxChunkTokens 1200 are expected to be
// split into more than one chunk while keeping their original order.
it("plans summary chunks in the worker", async () => {
36+
const value = await compactionPlanningWorkerTesting.runCompactionPlanningWorker({
37+
input: {
38+
kind: "summaryChunks",
39+
messages: [makeMessage(1), makeMessage(2), makeMessage(3)],
40+
maxChunkTokens: 1200,
41+
},
42+
timeoutMs: 10_000,
43+
});
44+
45+
expect(value.kind).toBe("summaryChunks");
46+
// Early return after the assertion above; presumably here so TypeScript
// narrows `value` before `value.chunks` is read — confirm against the result type.
if (value.kind !== "summaryChunks") {
47+
return;
48+
}
49+
// Flattening the chunks must yield the timestamps in input order.
expect(value.chunks.flat().map((message) => message.timestamp)).toEqual([1, 2, 3]);
50+
expect(value.chunks.length).toBeGreaterThan(1);
51+
});
52+
53+
// Points the worker at a file named "missing-compaction-planning.worker.js"
// next to this test and expects a rejection whose `code` is "unavailable".
it("classifies missing worker runtime as unavailable", async () => {
54+
await expect(
55+
compactionPlanningWorkerTesting.runCompactionPlanningWorker({
56+
input: {
57+
kind: "summaryChunks",
58+
messages: [makeMessage(1)],
59+
maxChunkTokens: 1200,
60+
},
61+
timeoutMs: 500,
62+
workerUrl: new URL("./missing-compaction-planning.worker.js", import.meta.url),
63+
}),
64+
).rejects.toMatchObject({
65+
code: "unavailable",
66+
});
67+
});
68+
69+
// Races a zero-delay timer against a "stageSplit" request over 180 messages
// of 12,000 characters each. The timer is expected to settle first, and the
// planning promise must still resolve afterwards.
it("keeps timers responsive while planning large histories", async () => {
70+
const timer = new Promise<"timer">((resolve) => {
71+
setTimeout(() => resolve("timer"), 0);
72+
});
73+
const planning = compactionPlanningWorkerTesting
74+
.runCompactionPlanningWorker({
75+
input: {
76+
kind: "stageSplit",
77+
messages: Array.from({ length: 180 }, (_, index) =>
78+
makeMessage(index + 1, "x".repeat(12_000)),
79+
),
80+
maxChunkTokens: 8000,
81+
parts: 4,
82+
},
83+
timeoutMs: 30_000,
84+
})
85+
.then(() => "planning" as const);
86+
87+
await expect(Promise.race([timer, planning])).resolves.toBe("timer");
88+
await expect(planning).resolves.toBe("planning");
89+
// 30_000 is passed as the third argument to `it` (the per-test timeout in vitest).
}, 30_000);
90+
});

0 commit comments

Comments
 (0)