Skip to content

Commit 280d1cb

Browse files
committed
fix(auto-reply): deliver queued compaction notices
1 parent 14b1ebd commit 280d1cb

8 files changed

Lines changed: 719 additions & 144 deletions

src/auto-reply/reply/agent-runner-execution.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4775,6 +4775,69 @@ describe("runAgentTurnWithFallback", () => {
47754775
});
47764776
});
47774777

4778+
it("uses the compaction notice fallback when no block-reply dispatcher is wired", async () => {
4779+
const onCompactionNoticePayload = vi.fn();
4780+
state.runEmbeddedAgentMock.mockImplementationOnce(async (params: EmbeddedAgentParams) => {
4781+
await params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } });
4782+
await params.onAgentEvent?.({
4783+
stream: "compaction",
4784+
data: { phase: "end", completed: true },
4785+
});
4786+
return { payloads: [{ text: "final" }], meta: {} };
4787+
});
4788+
4789+
const followupRun = createFollowupRun();
4790+
followupRun.run.config = {
4791+
agents: {
4792+
defaults: {
4793+
compaction: {
4794+
notifyUser: true,
4795+
},
4796+
},
4797+
},
4798+
};
4799+
4800+
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
4801+
const result = await runAgentTurnWithFallback({
4802+
commandBody: "hello",
4803+
followupRun,
4804+
sessionCtx: {
4805+
Provider: "whatsapp",
4806+
MessageSid: "msg",
4807+
} as unknown as TemplateContext,
4808+
opts: {},
4809+
typingSignals: createMockTypingSignaler(),
4810+
blockReplyPipeline: null,
4811+
blockStreamingEnabled: false,
4812+
resolvedBlockStreamingBreak: "message_end",
4813+
applyReplyToMode: (payload) => payload,
4814+
shouldEmitToolResult: () => true,
4815+
shouldEmitToolOutput: () => false,
4816+
pendingToolTasks: new Set(),
4817+
resetSessionAfterRoleOrderingConflict: async () => false,
4818+
isHeartbeat: false,
4819+
sessionKey: "main",
4820+
getActiveSessionEntry: () => undefined,
4821+
resolvedVerboseLevel: "off",
4822+
onCompactionNoticePayload,
4823+
});
4824+
4825+
expect(result.kind).toBe("success");
4826+
expect(onCompactionNoticePayload).toHaveBeenCalledTimes(2);
4827+
expectBlockReplyCall(onCompactionNoticePayload, 0, {
4828+
text: "🧹 Compacting context...",
4829+
replyToId: "msg",
4830+
replyToCurrent: true,
4831+
isCompactionNotice: true,
4832+
});
4833+
expectBlockReplyCall(onCompactionNoticePayload, 1, {
4834+
text: "🧹 Compaction complete",
4835+
replyToId: "msg",
4836+
replyToCurrent: true,
4837+
isCompactionNotice: true,
4838+
});
4839+
});
4840+
47784841
it("surfaces billing guidance for mixed-cause fallback exhaustion", async () => {
47794842
state.runWithModelFallbackMock.mockRejectedValueOnce(
47804843
Object.assign(

src/auto-reply/reply/agent-runner-execution.ts

Lines changed: 34 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ import {
106106
resolveModelFallbackOptions,
107107
} from "./agent-runner-utils.js";
108108
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
109+
import {
110+
createCompactionHookNoticePayload,
111+
createCompactionNoticePayload,
112+
readCompactionHookMessages,
113+
shouldNotifyUserAboutCompaction,
114+
} from "./compaction-notice.js";
109115
import { resolveCurrentTurnImages } from "./current-turn-images.js";
110116
import { hasInboundAudio } from "./inbound-media.js";
111117
import { resolveOriginMessageProvider } from "./origin-routing.js";
@@ -1493,6 +1499,7 @@ export async function runAgentTurnWithFallback(params: {
14931499
resolvedVerboseLevel: VerboseLevel;
14941500
toolProgressDetail?: "explain" | "raw";
14951501
replyMediaContext?: ReplyMediaContext;
1502+
onCompactionNoticePayload?: (payload: ReplyPayload) => Promise<void> | void;
14961503
}): Promise<AgentRunLoopResult> {
14971504
const TRANSIENT_HTTP_RETRY_DELAY_MS = 2_500;
14981505
let didLogHeartbeatStrip = false;
@@ -1605,56 +1612,40 @@ export async function runAgentTurnWithFallback(params: {
16051612
params.opts?.onAgentRunStart?.(runId);
16061613
};
16071614
const currentMessageId = params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
1608-
const shouldNotifyUserAboutCompaction =
1609-
runtimeConfig?.agents?.defaults?.compaction?.notifyUser === true;
1610-
const sendCompactionNotice = async (phase: "start" | "end" | "incomplete") => {
1611-
if (!params.opts?.onBlockReply) {
1612-
return;
1613-
}
1614-
const text =
1615-
phase === "start"
1616-
? "🧹 Compacting context..."
1617-
: phase === "end"
1618-
? "🧹 Compaction complete"
1619-
: "🧹 Compaction incomplete";
1620-
const noticePayload = params.applyReplyToMode({
1621-
text,
1622-
replyToId: currentMessageId,
1623-
replyToCurrent: true,
1624-
isCompactionNotice: true,
1625-
});
1615+
const notifyUserAboutCompaction = shouldNotifyUserAboutCompaction(runtimeConfig);
1616+
const deliverCompactionNoticePayload = async (noticePayload: ReplyPayload, label: string) => {
16261617
try {
1627-
await params.opts.onBlockReply(noticePayload);
1618+
if (params.opts?.onBlockReply) {
1619+
await params.opts.onBlockReply(noticePayload);
1620+
return;
1621+
}
1622+
await params.onCompactionNoticePayload?.(noticePayload);
16281623
} catch (err) {
16291624
// Non-critical notice delivery failure should not bubble out of the
16301625
// fire-and-forget event handler.
1631-
logVerbose(`compaction ${phase} notice delivery failed (non-fatal): ${String(err)}`);
1626+
logVerbose(`compaction ${label} notice delivery failed (non-fatal): ${String(err)}`);
16321627
}
16331628
};
1634-
const readCompactionHookMessages = (value: unknown): string[] => {
1635-
if (!Array.isArray(value)) {
1636-
return [];
1637-
}
1638-
return value
1639-
.filter((entry): entry is string => typeof entry === "string")
1640-
.map((entry) => entry.trim())
1641-
.filter((entry) => entry.length > 0);
1629+
const sendCompactionNotice = async (phase: "start" | "end" | "incomplete") => {
1630+
await deliverCompactionNoticePayload(
1631+
createCompactionNoticePayload({
1632+
phase,
1633+
currentMessageId,
1634+
applyReplyToMode: params.applyReplyToMode,
1635+
}),
1636+
phase,
1637+
);
16421638
};
16431639
const sendCompactionHookMessages = async (messages: string[]) => {
1644-
if (!params.opts?.onBlockReply || messages.length === 0) {
1645-
return;
1646-
}
1647-
const noticePayload = params.applyReplyToMode({
1648-
text: messages.join("\n\n"),
1649-
replyToId: currentMessageId,
1650-
replyToCurrent: true,
1651-
isCompactionNotice: true,
1640+
const noticePayload = createCompactionHookNoticePayload({
1641+
messages,
1642+
currentMessageId,
1643+
applyReplyToMode: params.applyReplyToMode,
16521644
});
1653-
try {
1654-
await params.opts.onBlockReply(noticePayload);
1655-
} catch (err) {
1656-
logVerbose(`compaction hook notice delivery failed (non-fatal): ${String(err)}`);
1645+
if (!noticePayload) {
1646+
return;
16571647
}
1648+
await deliverCompactionNoticePayload(noticePayload, "hook");
16581649
};
16591650
const shouldSurfaceToControlUi = isInternalMessageChannel(
16601651
params.followupRun.run.messageProvider ??
@@ -2575,7 +2566,7 @@ export async function runAgentTurnWithFallback(params: {
25752566
}
25762567
if (hookMessages.length > 0) {
25772568
await sendCompactionHookMessages(hookMessages);
2578-
} else if (shouldNotifyUserAboutCompaction) {
2569+
} else if (notifyUserAboutCompaction) {
25792570
// Send directly via opts.onBlockReply (bypassing the
25802571
// pipeline) so the notice does not cause final payloads
25812572
// to be discarded on non-streaming model paths.
@@ -2591,12 +2582,12 @@ export async function runAgentTurnWithFallback(params: {
25912582
}
25922583
if (hookMessages.length > 0) {
25932584
await sendCompactionHookMessages(hookMessages);
2594-
} else if (shouldNotifyUserAboutCompaction) {
2585+
} else if (notifyUserAboutCompaction) {
25952586
await sendCompactionNotice("end");
25962587
}
25972588
} else if (hookMessages.length > 0) {
25982589
await sendCompactionHookMessages(hookMessages);
2599-
} else if (shouldNotifyUserAboutCompaction) {
2590+
} else if (notifyUserAboutCompaction) {
26002591
await sendCompactionNotice("incomplete");
26012592
}
26022593
}

src/auto-reply/reply/agent-runner-memory.test.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,7 @@ describe("runMemoryFlushIfNeeded", () => {
983983
totalTokens: 120,
984984
totalTokensFresh: true,
985985
};
986+
const onCompactionNotice = vi.fn();
986987

987988
const entry = await runPreflightCompactionIfNeeded({
988989
cfg: { agents: { defaults: { compaction: { memoryFlush: {} } } } },
@@ -999,6 +1000,7 @@ describe("runMemoryFlushIfNeeded", () => {
9991000
storePath: path.join(rootDir, "sessions.json"),
10001001
isHeartbeat: false,
10011002
replyOperation: createReplyOperation(),
1003+
onCompactionNotice,
10021004
});
10031005

10041006
expect(entry).toBe(sessionEntry);
@@ -1013,6 +1015,8 @@ describe("runMemoryFlushIfNeeded", () => {
10131015
contextTokenBudget: 100,
10141016
});
10151017
expect(incrementCompactionCountMock).not.toHaveBeenCalled();
1018+
expect(onCompactionNotice).toHaveBeenNthCalledWith(1, "start");
1019+
expect(onCompactionNotice).toHaveBeenNthCalledWith(2, "skipped");
10161020
});
10171021

10181022
it("fails when required preflight context-engine compaction is deferred to background maintenance", async () => {
@@ -2015,6 +2019,107 @@ describe("runMemoryFlushIfNeeded", () => {
20152019
expect(compactCall.sessionFile).toContain("large-session.jsonl");
20162020
});
20172021

2022+
it("emits preflight compaction notices around a successful budget compaction", async () => {
2023+
const sessionFile = path.join(rootDir, "notify-session.jsonl");
2024+
await fs.writeFile(
2025+
sessionFile,
2026+
`${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`,
2027+
"utf8",
2028+
);
2029+
const sessionEntry: SessionEntry = {
2030+
sessionId: "session",
2031+
sessionFile,
2032+
updatedAt: Date.now(),
2033+
totalTokens: 120,
2034+
totalTokensFresh: true,
2035+
compactionCount: 0,
2036+
};
2037+
const onCompactionNotice = vi.fn();
2038+
2039+
await runPreflightCompactionIfNeeded({
2040+
cfg: {
2041+
agents: {
2042+
defaults: {
2043+
compaction: {
2044+
notifyUser: true,
2045+
truncateAfterCompaction: true,
2046+
maxActiveTranscriptBytes: "10b",
2047+
},
2048+
},
2049+
},
2050+
},
2051+
followupRun: createTestFollowupRun({
2052+
sessionId: "session",
2053+
sessionFile,
2054+
sessionKey: "main",
2055+
}),
2056+
defaultModel: "anthropic/claude-opus-4-6",
2057+
agentCfgContextTokens: 100_000,
2058+
sessionEntry,
2059+
sessionStore: { main: sessionEntry },
2060+
sessionKey: "main",
2061+
storePath: path.join(rootDir, "sessions.json"),
2062+
isHeartbeat: false,
2063+
replyOperation: createReplyOperation(),
2064+
onCompactionNotice,
2065+
});
2066+
2067+
expect(onCompactionNotice).toHaveBeenNthCalledWith(1, "start");
2068+
expect(onCompactionNotice).toHaveBeenNthCalledWith(2, "end");
2069+
});
2070+
2071+
it("emits an incomplete preflight compaction notice when post-compaction state update throws", async () => {
2072+
const sessionFile = path.join(rootDir, "notify-failed-session.jsonl");
2073+
await fs.writeFile(
2074+
sessionFile,
2075+
`${JSON.stringify({ message: { role: "user", content: "x".repeat(5_000) } })}\n`,
2076+
"utf8",
2077+
);
2078+
incrementCompactionCountMock.mockRejectedValueOnce(new Error("count update failed"));
2079+
const sessionEntry: SessionEntry = {
2080+
sessionId: "session",
2081+
sessionFile,
2082+
updatedAt: Date.now(),
2083+
totalTokens: 120,
2084+
totalTokensFresh: true,
2085+
compactionCount: 0,
2086+
};
2087+
const onCompactionNotice = vi.fn();
2088+
2089+
await expect(
2090+
runPreflightCompactionIfNeeded({
2091+
cfg: {
2092+
agents: {
2093+
defaults: {
2094+
compaction: {
2095+
notifyUser: true,
2096+
truncateAfterCompaction: true,
2097+
maxActiveTranscriptBytes: "10b",
2098+
},
2099+
},
2100+
},
2101+
},
2102+
followupRun: createTestFollowupRun({
2103+
sessionId: "session",
2104+
sessionFile,
2105+
sessionKey: "main",
2106+
}),
2107+
defaultModel: "anthropic/claude-opus-4-6",
2108+
agentCfgContextTokens: 100_000,
2109+
sessionEntry,
2110+
sessionStore: { main: sessionEntry },
2111+
sessionKey: "main",
2112+
storePath: path.join(rootDir, "sessions.json"),
2113+
isHeartbeat: false,
2114+
replyOperation: createReplyOperation(),
2115+
onCompactionNotice,
2116+
}),
2117+
).rejects.toThrow("count update failed");
2118+
2119+
expect(onCompactionNotice).toHaveBeenNthCalledWith(1, "start");
2120+
expect(onCompactionNotice).toHaveBeenNthCalledWith(2, "incomplete");
2121+
});
2122+
20182123
it("keeps the active transcript byte threshold inactive unless transcript rotation is enabled", async () => {
20192124
const sessionFile = path.join(rootDir, "large-session-no-rotation.jsonl");
20202125
await fs.writeFile(

0 commit comments

Comments
 (0)