Skip to content

Commit 10bbed8

Browse files
committed
fix(telegram): chain over-limit stream previews
1 parent c7cf34a commit 10bbed8

5 files changed

Lines changed: 177 additions & 11 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
2020
- Agents/compaction: keep contributor diagnostics to a bounded top-three selection without sorting the full history. Thanks @shakkernerd.
2121
- Sessions/UI: avoid full-array sorting while selecting ACPX leases, Google Meet calendar events, and latest chat sessions. Thanks @shakkernerd.
2222
- Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus.
23+
- Telegram/streaming: continue over-limit draft previews in a new message instead of stopping when rendered preview text crosses Telegram's message limit. (#74508) Thanks @anagnorisis2peripeteia.
2324
- Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip.
2425
- Telegram: re-probe the primary fetch transport after repeated sticky fallback success so transient IPv4 or pinned-IP fallback promotion can recover without a gateway restart. Fixes #77088. (#77157) Thanks @MkDev11.
2526
- Runtime/install: raise the supported Node 22 floor to `22.16+` so native SQLite query handling can rely on the `node:sqlite` statement metadata API while continuing to recommend Node 24. (#78921)

extensions/telegram/src/bot-message-dispatch.test.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,38 @@ describe("dispatchTelegramMessage draft streaming", () => {
468468
expect(draftStream.clear).toHaveBeenCalledTimes(1);
469469
});
470470

471+
it("keeps retained overflow draft previews", async () => {
472+
const draftStream = createDraftStream();
473+
const bot = createBot();
474+
createTelegramDraftStream.mockReturnValue(draftStream);
475+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
476+
async ({ dispatcherOptions, replyOptions }) => {
477+
await replyOptions?.onPartialReply?.({ text: "Hello" });
478+
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
479+
return { queuedFinal: true };
480+
},
481+
);
482+
deliverReplies.mockResolvedValue({ delivered: true });
483+
484+
await dispatchWithContext({ context: createContext(), bot });
485+
486+
const streamParams = createTelegramDraftStream.mock.calls[0]?.[0] as Parameters<
487+
NonNullable<TelegramBotDeps["createTelegramDraftStream"]>
488+
>[0];
489+
streamParams.onSupersededPreview?.({
490+
messageId: 17,
491+
textSnapshot: "first page",
492+
retain: true,
493+
});
494+
expect(bot.api.deleteMessage).not.toHaveBeenCalled();
495+
496+
streamParams.onSupersededPreview?.({
497+
messageId: 18,
498+
textSnapshot: "stale page",
499+
});
500+
await vi.waitFor(() => expect(bot.api.deleteMessage).toHaveBeenCalledWith(123, 18));
501+
});
502+
471503
it("queues final Telegram replies through outbound delivery when available", async () => {
472504
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
473505
status: "handled_visible",

extensions/telegram/src/bot-message-dispatch.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,9 @@ export const dispatchTelegramMessage = async ({
442442
minInitialChars: draftMinInitialChars,
443443
renderText: renderStreamText,
444444
onSupersededPreview: (superseded) => {
445+
if (superseded.retain) {
446+
return;
447+
}
445448
void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => {
446449
logVerbose(
447450
`telegram: superseded ${laneName} stream cleanup failed (${superseded.messageId}): ${String(err)}`,

extensions/telegram/src/draft-stream.test.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,63 @@ describe("createTelegramDraftStream", () => {
389389
});
390390
});
391391

392+
it("continues in a new message when rendered preview crosses maxChars", async () => {
393+
const api = createMockDraftApi();
394+
api.sendMessage
395+
.mockResolvedValueOnce({ message_id: 17 })
396+
.mockResolvedValueOnce({ message_id: 42 });
397+
const stream = createDraftStream(api, { maxChars: 20 });
398+
399+
stream.update("Hello world");
400+
await stream.flush();
401+
stream.update("Hello world foo bar baz qux");
402+
await stream.flush();
403+
404+
expect(api.sendMessage).toHaveBeenCalledTimes(2);
405+
expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello world", undefined);
406+
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "foo bar baz qux", undefined);
407+
});
408+
409+
it("splits a first oversized rendered preview into chained messages", async () => {
410+
const api = createMockDraftApi();
411+
api.sendMessage
412+
.mockResolvedValueOnce({ message_id: 17 })
413+
.mockResolvedValueOnce({ message_id: 42 });
414+
const stream = createDraftStream(api, { maxChars: 10 });
415+
416+
stream.update("1234567890ABCDEFGHIJ");
417+
await stream.flush();
418+
419+
expect(api.sendMessage).toHaveBeenCalledTimes(2);
420+
expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "1234567890", undefined);
421+
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "ABCDEFGHIJ", undefined);
422+
});
423+
424+
it("retains overflow preview pages", async () => {
425+
const api = createMockDraftApi();
426+
api.sendMessage
427+
.mockResolvedValueOnce({ message_id: 17 })
428+
.mockResolvedValueOnce({ message_id: 42 });
429+
const onSupersededPreview = vi.fn();
430+
const stream = createDraftStream(api, {
431+
maxChars: 20,
432+
onSupersededPreview,
433+
});
434+
435+
stream.update("Hello world");
436+
await stream.flush();
437+
stream.update("Hello world foo bar baz qux");
438+
await stream.flush();
439+
440+
expect(onSupersededPreview).toHaveBeenCalledWith({
441+
messageId: 17,
442+
textSnapshot: "Hello world",
443+
parseMode: undefined,
444+
visibleSinceMs: expect.any(Number),
445+
retain: true,
446+
});
447+
});
448+
392449
it("enforces maxChars after renderText expansion", async () => {
393450
const api = createMockDraftApi();
394451
const warn = vi.fn();

extensions/telegram/src/draft-stream.ts

Lines changed: 84 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,38 @@ type SupersededTelegramPreview = {
5353
textSnapshot: string;
5454
parseMode?: "HTML";
5555
visibleSinceMs?: number;
56+
retain?: boolean;
5657
};
5758

59+
function renderTelegramDraftPreview(
60+
text: string,
61+
renderText: ((text: string) => TelegramDraftPreview) | undefined,
62+
): TelegramDraftPreview {
63+
const trimmed = text.trimEnd();
64+
return renderText?.(trimmed) ?? { text: trimmed };
65+
}
66+
67+
function findTelegramDraftChunkLength(
68+
text: string,
69+
maxChars: number,
70+
renderText: ((text: string) => TelegramDraftPreview) | undefined,
71+
): number {
72+
let best = 0;
73+
let low = 1;
74+
let high = text.length;
75+
while (low <= high) {
76+
const mid = Math.floor((low + high) / 2);
77+
const renderedText = renderTelegramDraftPreview(text.slice(0, mid), renderText).text.trimEnd();
78+
if (renderedText && renderedText.length <= maxChars) {
79+
best = mid;
80+
low = mid + 1;
81+
} else {
82+
high = mid - 1;
83+
}
84+
}
85+
return best;
86+
}
87+
5888
export function createTelegramDraftStream(params: {
5989
api: Bot["api"];
6090
chatId: Parameters<Bot["api"]["sendMessage"]>[0];
@@ -98,6 +128,8 @@ export function createTelegramDraftStream(params: {
98128
let lastSentParseMode: "HTML" | undefined;
99129
let previewRevision = 0;
100130
let generation = 0;
131+
let deliveredTextOffset = 0;
132+
let resetStreamToNewMessage: (options?: { keepPending?: boolean; resetOffset?: boolean }) => void;
101133
type PreviewSendParams = {
102134
renderedText: string;
103135
renderedParseMode: "HTML" | undefined;
@@ -198,13 +230,45 @@ export function createTelegramDraftStream(params: {
198230
if (!trimmed) {
199231
return false;
200232
}
201-
const rendered = params.renderText?.(trimmed) ?? { text: trimmed };
233+
const currentText = trimmed.slice(deliveredTextOffset).trimStart();
234+
if (!currentText) {
235+
return false;
236+
}
237+
const rendered = renderTelegramDraftPreview(currentText, params.renderText);
202238
const renderedText = rendered.text.trimEnd();
203239
const renderedParseMode = rendered.parseMode;
204240
if (!renderedText) {
205241
return false;
206242
}
207243
if (renderedText.length > maxChars) {
244+
if (lastDeliveredText.length > deliveredTextOffset) {
245+
const supersededMessageId = streamMessageId;
246+
const supersededTextSnapshot = lastSentText;
247+
const supersededParseMode = lastSentParseMode;
248+
const supersededVisibleSinceMs = streamVisibleSinceMs;
249+
deliveredTextOffset = lastDeliveredText.length;
250+
resetStreamToNewMessage({ keepPending: true, resetOffset: false });
251+
if (typeof supersededMessageId === "number") {
252+
params.onSupersededPreview?.({
253+
messageId: supersededMessageId,
254+
textSnapshot: supersededTextSnapshot,
255+
parseMode: supersededParseMode,
256+
visibleSinceMs: supersededVisibleSinceMs,
257+
retain: true,
258+
});
259+
}
260+
return await sendOrEditStreamMessage(trimmed);
261+
}
262+
const chunkLength = findTelegramDraftChunkLength(currentText, maxChars, params.renderText);
263+
if (chunkLength > 0) {
264+
const sent = await sendOrEditStreamMessage(
265+
trimmed.slice(0, deliveredTextOffset) + currentText.slice(0, chunkLength),
266+
);
267+
if (!sent) {
268+
return false;
269+
}
270+
return await sendOrEditStreamMessage(trimmed);
271+
}
208272
streamState.stopped = true;
209273
params.warn?.(
210274
`telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`,
@@ -248,6 +312,24 @@ export function createTelegramDraftStream(params: {
248312
sendOrEditStreamMessage,
249313
});
250314

315+
resetStreamToNewMessage = (options) => {
316+
streamState.stopped = false;
317+
streamState.final = false;
318+
generation += 1;
319+
messageSendAttempted = false;
320+
streamMessageId = undefined;
321+
streamVisibleSinceMs = undefined;
322+
lastSentText = "";
323+
lastSentParseMode = undefined;
324+
if (options?.resetOffset !== false) {
325+
deliveredTextOffset = 0;
326+
}
327+
if (!options?.keepPending) {
328+
loop.resetPending();
329+
}
330+
loop.resetThrottleWindow();
331+
};
332+
251333
const clear = async () => {
252334
const messageId = await takeMessageIdAfterStop({
253335
stopForClear,
@@ -272,16 +354,7 @@ export function createTelegramDraftStream(params: {
272354
};
273355

274356
const forceNewMessage = () => {
275-
streamState.stopped = false;
276-
streamState.final = false;
277-
generation += 1;
278-
messageSendAttempted = false;
279-
streamMessageId = undefined;
280-
streamVisibleSinceMs = undefined;
281-
lastSentText = "";
282-
lastSentParseMode = undefined;
283-
loop.resetPending();
284-
loop.resetThrottleWindow();
357+
resetStreamToNewMessage();
285358
};
286359

287360
const materialize = async (): Promise<number | undefined> => {

0 commit comments

Comments
 (0)