Skip to content

Commit dd6b47b

Browse files
committed
Preserve session context and durable Telegram progress
1 parent a185ca2 commit dd6b47b

7 files changed

Lines changed: 230 additions & 116 deletions

File tree

extensions/telegram/src/bot-message-dispatch.test.ts

Lines changed: 33 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1539,10 +1539,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
15391539

15401540
await dispatchWithContext({ context: createContext() });
15411541

1542-
expect(answerDraftStream.update).toHaveBeenNthCalledWith(
1543-
1,
1544-
expect.stringMatching(/`🛠 Exec`$/),
1545-
);
1542+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Cracking\n\n`🛠️ Exec`");
15461543
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Branch is up to date");
15471544
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
15481545
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
@@ -1574,8 +1571,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
15741571
expect(rotationOrder).toBeLessThan(finalUpdateOrder);
15751572
});
15761573

1577-
it("keeps progress updates in a draft and sends the final answer normally", async () => {
1578-
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
1574+
it("sends progress updates as durable messages and sends the final answer normally", async () => {
1575+
setupDraftStreams({ answerMessageId: 2001 });
15791576
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
15801577
async ({ dispatcherOptions, replyOptions }) => {
15811578
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
@@ -1595,18 +1592,15 @@ describe("dispatchTelegramMessage draft streaming", () => {
15951592
telegramCfg: { streaming: { mode: "progress" } },
15961593
});
15971594

1598-
expect(answerDraftStream.update).toHaveBeenCalledWith(
1599-
"Cracking\n\n`🛠️ Exec`\n`🛠️ git rev-parse --abbrev-ref HEAD`",
1600-
);
1601-
expect(answerDraftStream.update).not.toHaveBeenCalledWith("Branch is up to date");
1602-
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
1603-
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
1604-
expectDeliveredReply(0, { text: "Branch is up to date" });
1595+
expect(createTelegramDraftStream).not.toHaveBeenCalled();
1596+
expectDeliveredReply(0, { text: expect.stringContaining("Exec") }, 0);
1597+
expectDeliveredReply(0, { text: expect.stringContaining("git rev-parse") }, 1);
1598+
expectDeliveredReply(0, { text: "Branch is up to date" }, 2);
16051599
expect(editMessageTelegram).not.toHaveBeenCalled();
16061600
});
16071601

1608-
it("does not restart progress drafts after final answer delivery", async () => {
1609-
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
1602+
it("does not send more standalone progress after final answer delivery", async () => {
1603+
setupDraftStreams({ answerMessageId: 2001 });
16101604
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
16111605
async ({ dispatcherOptions, replyOptions }) => {
16121606
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
@@ -1622,9 +1616,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
16221616
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
16231617
});
16241618

1625-
expect(answerDraftStream.update).toHaveBeenCalledTimes(1);
1626-
expect(answerDraftStream.update).toHaveBeenCalledWith("Shelling\n\n`🛠️ Exec`");
1627-
expectDeliveredReply(0, { text: "Branch is up to date" });
1619+
expect(createTelegramDraftStream).not.toHaveBeenCalled();
1620+
expectDeliveredReply(0, { text: expect.stringContaining("Exec") }, 0);
1621+
expectDeliveredReply(0, { text: "Branch is up to date" }, 1);
1622+
expect(deliverReplies).toHaveBeenCalledTimes(2);
16281623
});
16291624

16301625
it("uses the transcript final when progress-mode final text is truncated", async () => {
@@ -1656,7 +1651,8 @@ describe("dispatchTelegramMessage draft streaming", () => {
16561651
telegramCfg: { streaming: { mode: "progress" } },
16571652
});
16581653

1659-
expectDeliveredReply(0, { text: fullAnswer });
1654+
expectDeliveredReply(0, { text: expect.stringContaining("Exec") }, 0);
1655+
expectDeliveredReply(0, { text: fullAnswer }, 1);
16601656
});
16611657

16621658
it("streams the first long final chunk and sends follow-up chunks", async () => {
@@ -1729,9 +1725,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
17291725
expectDeliveredReply(0, { text: undefined, mediaUrl: "https://example.com/a.png" });
17301726
});
17311727

1732-
it("shows Telegram progress drafts immediately for explicit tool starts", async () => {
1733-
const draftStream = createSequencedDraftStream(2001);
1734-
createTelegramDraftStream.mockReturnValue(draftStream);
1728+
it("sends Telegram progress immediately for explicit tool starts", async () => {
17351729
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
17361730
await replyOptions?.onReplyStart?.();
17371731
await replyOptions?.onAssistantMessageStart?.();
@@ -1745,13 +1739,11 @@ describe("dispatchTelegramMessage draft streaming", () => {
17451739
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
17461740
});
17471741

1748-
expect(draftStream.update).toHaveBeenCalledWith("Shelling\n\n`🛠️ Exec`");
1749-
expect(draftStream.flush).toHaveBeenCalled();
1742+
expect(createTelegramDraftStream).not.toHaveBeenCalled();
1743+
expectDeliveredReply(0, { text: expect.stringContaining("Exec") });
17501744
});
17511745

1752-
it("keeps the progress draft label when tool progress lines are hidden", async () => {
1753-
const draftStream = createSequencedDraftStream(2001);
1754-
createTelegramDraftStream.mockReturnValue(draftStream);
1746+
it("does not send standalone tool progress when progress lines are hidden", async () => {
17551747
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
17561748
await replyOptions?.onReplyStart?.();
17571749
await replyOptions?.onAssistantMessageStart?.();
@@ -1770,46 +1762,11 @@ describe("dispatchTelegramMessage draft streaming", () => {
17701762
},
17711763
});
17721764

1773-
expect(draftStream.update).toHaveBeenCalledWith("Shelling");
1774-
expect(draftStream.flush).toHaveBeenCalled();
1775-
});
1776-
1777-
it("keeps progress draft labels static while the draft is active", async () => {
1778-
const draftStream = createSequencedDraftStream(2001);
1779-
createTelegramDraftStream.mockReturnValue(draftStream);
1780-
let finishRun: (() => void) | undefined;
1781-
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
1782-
await replyOptions?.onReplyStart?.();
1783-
await replyOptions?.onAssistantMessageStart?.();
1784-
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
1785-
await new Promise<void>((resolve) => {
1786-
finishRun = resolve;
1787-
});
1788-
return { queuedFinal: false };
1789-
});
1790-
1791-
const run = dispatchWithContext({
1792-
context: createContext(),
1793-
streamMode: "progress",
1794-
telegramCfg: {
1795-
streaming: {
1796-
mode: "progress",
1797-
progress: { label: "Working", toolProgress: false },
1798-
},
1799-
},
1800-
});
1801-
1802-
await vi.waitFor(() => expect(draftStream.update).toHaveBeenCalledWith("Working"));
1803-
expect(draftStream.update).not.toHaveBeenCalledWith("Working.");
1804-
expect(draftStream.update).not.toHaveBeenCalledWith("Working..");
1805-
expect(draftStream.update).not.toHaveBeenCalledWith("Working...");
1806-
finishRun?.();
1807-
await run;
1765+
expect(createTelegramDraftStream).not.toHaveBeenCalled();
1766+
expect(deliverReplies).not.toHaveBeenCalled();
18081767
});
18091768

1810-
it("renders Telegram progress drafts before slow status reactions resolve", async () => {
1811-
const draftStream = createSequencedDraftStream(2001);
1812-
createTelegramDraftStream.mockReturnValue(draftStream);
1769+
it("sends Telegram progress before slow status reactions resolve", async () => {
18131770
let releaseSetTool: (() => void) | undefined;
18141771
const statusReactionController = createStatusReactionController();
18151772
statusReactionController.setTool.mockImplementation(
@@ -1822,10 +1779,14 @@ describe("dispatchTelegramMessage draft streaming", () => {
18221779
const pendingToolStart = replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
18231780
await Promise.resolve();
18241781
await Promise.resolve();
1825-
const updateBeforeStatusReaction = draftStream.update.mock.calls.at(-1)?.[0];
1782+
const progressBeforeStatusReaction = deliverReplies.mock.calls.at(-1)?.[0];
18261783
releaseSetTool?.();
18271784
await pendingToolStart;
1828-
expect(updateBeforeStatusReaction).toMatch(/^Shelling\n`🛠 Exec`$/);
1785+
expect(progressBeforeStatusReaction).toEqual(
1786+
expect.objectContaining({
1787+
replies: [expect.objectContaining({ text: expect.stringContaining("Exec") })],
1788+
}),
1789+
);
18291790
return { queuedFinal: false };
18301791
});
18311792

@@ -1840,9 +1801,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
18401801
expect(statusReactionController.setTool).toHaveBeenCalledWith("exec");
18411802
});
18421803

1843-
it("keeps non-command Telegram progress draft lines across post-tool assistant boundaries", async () => {
1844-
const draftStream = createSequencedDraftStream(2001);
1845-
createTelegramDraftStream.mockReturnValue(draftStream);
1804+
it("keeps non-command Telegram progress as durable messages across assistant boundaries", async () => {
18461805
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
18471806
async ({ dispatcherOptions, replyOptions }) => {
18481807
await replyOptions?.onReplyStart?.();
@@ -1861,13 +1820,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
18611820
telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } },
18621821
});
18631822

1864-
expect(draftStream.update).toHaveBeenCalledWith(
1865-
"Shelling\n\n`🔎 Web Search: docs lookup`\n• `tests passed`",
1866-
);
1867-
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
1868-
expect(draftStream.materialize).not.toHaveBeenCalled();
1869-
expect(draftStream.clear).toHaveBeenCalledTimes(1);
1870-
expectDeliveredReply(0, { text: "Final after tool" });
1823+
expect(createTelegramDraftStream).not.toHaveBeenCalled();
1824+
expectDeliveredReply(0, { text: expect.stringContaining("docs lookup") }, 0);
1825+
expectDeliveredReply(0, { text: expect.stringContaining("tests passed") }, 1);
1826+
expectDeliveredReply(0, { text: "Final after tool" }, 2);
18711827
expect(editMessageTelegram).not.toHaveBeenCalled();
18721828
});
18731829

extensions/telegram/src/bot-message-dispatch.ts

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -550,12 +550,14 @@ export const dispatchTelegramMessage = async ({
550550
}
551551
}
552552
const hasTelegramQuoteReply = replyToMode !== "off" && replyQuoteText != null;
553+
const canUseDraftLanes = streamMode !== "progress";
553554
const canStreamAnswerDraft =
555+
canUseDraftLanes &&
554556
streamDeliveryEnabled &&
555557
!hasTelegramQuoteReply &&
556558
!accountBlockStreamingEnabled &&
557559
!forceBlockStreamingForReasoning;
558-
const canStreamReasoningDraft = !isRoomEvent && streamReasoningDraft;
560+
const canStreamReasoningDraft = canUseDraftLanes && !isRoomEvent && streamReasoningDraft;
559561
const draftReplyToMessageId =
560562
replyToMode !== "off" && typeof msg.message_id === "number"
561563
? (replyQuoteMessageId ?? msg.message_id)
@@ -600,8 +602,8 @@ export const dispatchTelegramMessage = async ({
600602
};
601603
const answerLane = lanes.answer;
602604
const reasoningLane = lanes.reasoning;
603-
const streamToolProgressEnabled =
604-
Boolean(answerLane.stream) && resolveChannelStreamingPreviewToolProgress(telegramCfg);
605+
const streamToolProgressVisible = resolveChannelStreamingPreviewToolProgress(telegramCfg);
606+
const streamToolProgressEnabled = Boolean(answerLane.stream) && streamToolProgressVisible;
605607
const nativeToolProgressDraft =
606608
streamToolProgressEnabled &&
607609
!isRoomEvent &&
@@ -662,19 +664,34 @@ export const dispatchTelegramMessage = async ({
662664
await renderProgressDraft({ flush: true });
663665
},
664666
});
667+
let sendStandaloneToolProgress: ((text: string) => Promise<boolean>) | undefined;
665668
const pushStreamToolProgress = async (
666669
line?: string | ChannelProgressDraftLine,
667670
options?: { toolName?: string; startImmediately?: boolean },
668-
) => {
669-
if (!answerLane.stream) {
670-
return false;
671-
}
671+
): Promise<boolean> => {
672672
if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) {
673673
return false;
674674
}
675675
const rawText = typeof line === "string" ? line : line?.text;
676676
const normalized = sanitizeProgressMarkdownText(rawText?.replace(/\s+/g, " ").trim() ?? "");
677-
if (streamToolProgressSuppressed) {
677+
if (!normalized || streamToolProgressSuppressed) {
678+
return false;
679+
}
680+
if (streamMode === "progress" && !answerLane.stream) {
681+
if (!streamToolProgressVisible || isRoomEvent) {
682+
return false;
683+
}
684+
const delivered = (await sendStandaloneToolProgress?.(normalized)) ?? false;
685+
if (delivered) {
686+
void Promise.resolve(sendTyping()).catch((err) => {
687+
logVerbose(
688+
`telegram typing cue after tool progress failed for chat ${chatId}: ${String(err)}`,
689+
);
690+
});
691+
}
692+
return delivered;
693+
}
694+
if (!answerLane.stream) {
678695
return false;
679696
}
680697
if (streamMode !== "progress" && !streamToolProgressEnabled) {
@@ -1176,6 +1193,16 @@ export const dispatchTelegramMessage = async ({
11761193
}
11771194
return result.delivered;
11781195
};
1196+
sendStandaloneToolProgress = async (text: string) => {
1197+
const result = await (telegramDeps.deliverReplies ?? deliverReplies)({
1198+
...deliveryBaseOptions,
1199+
replies: [{ text }],
1200+
onVoiceRecording: sendRecordVoice,
1201+
silent: false,
1202+
mediaLoader: telegramDeps.loadWebMedia,
1203+
});
1204+
return result.delivered;
1205+
};
11791206
const emitPreviewFinalizedHook = (result: LaneDeliveryResult) => {
11801207
if (isDispatchSuperseded() || result.kind !== "preview-finalized") {
11811208
return;
@@ -1243,6 +1270,7 @@ export const dispatchTelegramMessage = async ({
12431270
resetDraftLaneState(answerLane);
12441271
}
12451272
const delivered = await sendPayload(applyTextToPayload(payload, text), { durable: true });
1273+
streamToolProgressSuppressed = true;
12461274
answerLane.finalized = true;
12471275
return delivered ? { kind: "sent" } : { kind: "skipped" };
12481276
};

src/agents/command/session-store.ts

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,17 @@ import { normalizeOptionalString } from "../../shared/string-coerce.js";
1010
import { clearCliSession, setCliSessionBinding, setCliSessionId } from "../cli-session.js";
1111
import { DEFAULT_CONTEXT_TOKENS } from "../defaults.js";
1212
import { isCliProvider } from "../model-selection.js";
13+
import { resolvePreservedSessionContextTokens } from "../session-context-tokens.js";
1314
import { deriveSessionTotalTokens, hasNonzeroUsage } from "../usage.js";
1415

1516
type RunResult = Awaited<ReturnType<(typeof import("../pi-embedded.js"))["runEmbeddedPiAgent"]>>;
1617

1718
const usageFormatModuleLoader = createLazyImportLoader(() => import("../../utils/usage-format.js"));
18-
const contextModuleLoader = createLazyImportLoader(() => import("../context.js"));
1919

2020
async function getUsageFormatModule() {
2121
return await usageFormatModuleLoader.load();
2222
}
2323

24-
async function getContextModule() {
25-
return await contextModuleLoader.load();
26-
}
27-
2824
function resolveNonNegativeNumber(value: number | undefined): number | undefined {
2925
return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : undefined;
3026
}
@@ -95,25 +91,23 @@ export async function updateSessionStoreAfterAgentRun(params: {
9591
const providerUsed = result.meta.agentMeta?.provider ?? fallbackProvider ?? defaultProvider;
9692
const agentHarnessId = normalizeOptionalString(result.meta.agentMeta?.agentHarnessId);
9793
const runtimeContextTokens = resolvePositiveInteger(result.meta.agentMeta?.contextTokens);
98-
const contextTokens =
99-
runtimeContextTokens !== undefined
100-
? runtimeContextTokens
101-
: typeof params.contextTokensOverride === "number" && params.contextTokensOverride > 0
102-
? params.contextTokensOverride
103-
: ((await getContextModule()).resolveContextTokensForModel({
104-
cfg,
105-
provider: providerUsed,
106-
model: modelUsed,
107-
fallbackContextTokens: DEFAULT_CONTEXT_TOKENS,
108-
allowAsyncLoad: false,
109-
}) ?? DEFAULT_CONTEXT_TOKENS);
11094

11195
const preserveRuntimeModel = params.preserveRuntimeModel === true;
11296
const entry = sessionStore[sessionKey] ?? {
11397
sessionId,
11498
updatedAt: now,
11599
sessionStartedAt: now,
116100
};
101+
const contextTokens = resolvePreservedSessionContextTokens({
102+
cfg,
103+
provider: providerUsed,
104+
model: modelUsed,
105+
runtimeContextTokens,
106+
contextTokensOverride: params.contextTokensOverride,
107+
existingEntry: entry,
108+
fallbackContextTokens: DEFAULT_CONTEXT_TOKENS,
109+
allowAsyncLoad: false,
110+
});
117111
const next: SessionEntry = {
118112
...entry,
119113
sessionId,
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { describe, expect, it } from "vitest";
2+
import type { OpenClawConfig } from "../config/types.openclaw.js";
3+
import { resolvePreservedSessionContextTokens } from "./session-context-tokens.js";
4+
5+
describe("resolvePreservedSessionContextTokens", () => {
6+
it("preserves an existing larger session context over smaller runtime metadata", () => {
7+
expect(
8+
resolvePreservedSessionContextTokens({
9+
cfg: { agents: { defaults: { contextTokens: 1_050_000 } } } as OpenClawConfig,
10+
provider: "openai-codex",
11+
model: "gpt-5.5",
12+
runtimeContextTokens: 272_000,
13+
existingEntry: {
14+
sessionId: "s1",
15+
modelProvider: "openai-codex",
16+
model: "gpt-5.5",
17+
contextTokens: 1_050_000,
18+
updatedAt: 1,
19+
},
20+
fallbackContextTokens: 200_000,
21+
allowAsyncLoad: false,
22+
}),
23+
).toBe(1_050_000);
24+
});
25+
26+
it("uses the largest explicit candidate instead of blindly trusting runtime", () => {
27+
expect(
28+
resolvePreservedSessionContextTokens({
29+
cfg: {} as OpenClawConfig,
30+
runtimeContextTokens: 272_000,
31+
contextTokensOverride: 1_050_000,
32+
existingContextTokens: 200_000,
33+
fallbackContextTokens: 128_000,
34+
allowAsyncLoad: false,
35+
}),
36+
).toBe(1_050_000);
37+
});
38+
});

0 commit comments

Comments
 (0)