Skip to content

Commit 575bd77

Browse files
committed
fix: stabilize telegram draft boundary previews (openclaw#33842) (thanks @ngutman)
1 parent 5ce5309 commit 575bd77

8 files changed

Lines changed: 459 additions & 69 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
3838
- Discord/voice messages: request upload slots with JSON fetch calls so voice message uploads no longer fail with content-type errors. Thanks @thewilloftheshadow.
3939
- Discord/voice decoder fallback: drop the native Opus dependency and use opusscript for voice decoding to avoid native-opus installs. Thanks @thewilloftheshadow.
4040
- Discord/auto presence health signal: add runtime availability-driven presence updates plus connected-state reporting to improve health monitoring and operator visibility. (#33277) Thanks @thewilloftheshadow.
41+
- Telegram/draft-stream boundary stability: materialize DM draft previews at assistant-message/tool boundaries, serialize lane-boundary callbacks before final delivery, and scope preview cleanup to the active preview so multi-step Telegram streams no longer lose, overwrite, or leave stale preview bubbles. (#33842) Thanks @ngutman.
4142
- Telegram/DM draft finalization reliability: require verified final-text draft emission before treating preview finalization as delivered, and fall back to normal payload send when final draft delivery is not confirmed (preventing missing final responses and preserving media/button delivery). (#32118) Thanks @OpenCils.
4243
- Telegram/draft preview boundary + silent-token reliability: stabilize answer-lane message boundaries across late-partial/message-start races, preserve/reset finalized preview state at the correct boundaries, and suppress `NO_REPLY` lead-fragment leaks without broad heartbeat-prefix false positives. (#33169) Thanks @obviyus.
4344
- Discord/audit wildcard warnings: ignore "\*" wildcard keys when counting unresolved guild channels so doctor/status no longer warns on allow-all configs. (#33125) Thanks @thewilloftheshadow.

src/telegram/bot-message-dispatch.test.ts

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,178 @@ describe("dispatchTelegramMessage draft streaming", () => {
422422
},
423423
);
424424

425+
it("materializes boundary preview and keeps it when no matching final arrives", async () => {
426+
const answerDraftStream = createDraftStream(999);
427+
answerDraftStream.materialize.mockResolvedValue(4321);
428+
const reasoningDraftStream = createDraftStream();
429+
createTelegramDraftStream
430+
.mockImplementationOnce(() => answerDraftStream)
431+
.mockImplementationOnce(() => reasoningDraftStream);
432+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
433+
await replyOptions?.onPartialReply?.({ text: "Before tool boundary" });
434+
await replyOptions?.onAssistantMessageStart?.();
435+
return { queuedFinal: false };
436+
});
437+
438+
const bot = createBot();
439+
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
440+
441+
expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1);
442+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
443+
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
444+
const deleteMessageCalls = (
445+
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
446+
).deleteMessage.mock.calls;
447+
expect(deleteMessageCalls).not.toContainEqual([123, 4321]);
448+
});
449+
450+
it("waits for queued boundary rotation before final lane delivery", async () => {
451+
const answerDraftStream = createSequencedDraftStream(1001);
452+
let resolveMaterialize: ((value: number | undefined) => void) | undefined;
453+
const materializePromise = new Promise<number | undefined>((resolve) => {
454+
resolveMaterialize = resolve;
455+
});
456+
answerDraftStream.materialize.mockImplementation(() => materializePromise);
457+
const reasoningDraftStream = createDraftStream();
458+
createTelegramDraftStream
459+
.mockImplementationOnce(() => answerDraftStream)
460+
.mockImplementationOnce(() => reasoningDraftStream);
461+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
462+
async ({ dispatcherOptions, replyOptions }) => {
463+
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
464+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
465+
const startPromise = replyOptions?.onAssistantMessageStart?.();
466+
const finalPromise = dispatcherOptions.deliver(
467+
{ text: "Message B final" },
468+
{ kind: "final" },
469+
);
470+
resolveMaterialize?.(1001);
471+
await startPromise;
472+
await finalPromise;
473+
return { queuedFinal: true };
474+
},
475+
);
476+
deliverReplies.mockResolvedValue({ delivered: true });
477+
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
478+
479+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
480+
481+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
482+
expect(editMessageTelegram).toHaveBeenCalledTimes(2);
483+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
484+
2,
485+
123,
486+
1002,
487+
"Message B final",
488+
expect.any(Object),
489+
);
490+
});
491+
492+
it("clears active preview even when an unrelated boundary archive exists", async () => {
493+
const answerDraftStream = createDraftStream(999);
494+
answerDraftStream.materialize.mockResolvedValue(4321);
495+
answerDraftStream.forceNewMessage.mockImplementation(() => {
496+
answerDraftStream.setMessageId(5555);
497+
});
498+
const reasoningDraftStream = createDraftStream();
499+
createTelegramDraftStream
500+
.mockImplementationOnce(() => answerDraftStream)
501+
.mockImplementationOnce(() => reasoningDraftStream);
502+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
503+
await replyOptions?.onPartialReply?.({ text: "Before tool boundary" });
504+
await replyOptions?.onAssistantMessageStart?.();
505+
await replyOptions?.onPartialReply?.({ text: "Unfinalized next preview" });
506+
return { queuedFinal: false };
507+
});
508+
509+
const bot = createBot();
510+
await dispatchWithContext({ context: createContext(), streamMode: "partial", bot });
511+
512+
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
513+
const deleteMessageCalls = (
514+
bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } }
515+
).deleteMessage.mock.calls;
516+
expect(deleteMessageCalls).not.toContainEqual([123, 4321]);
517+
});
518+
519+
it("queues late partials behind async boundary materialization", async () => {
520+
const answerDraftStream = createDraftStream(999);
521+
let resolveMaterialize: ((value: number | undefined) => void) | undefined;
522+
const materializePromise = new Promise<number | undefined>((resolve) => {
523+
resolveMaterialize = resolve;
524+
});
525+
answerDraftStream.materialize.mockImplementation(() => materializePromise);
526+
const reasoningDraftStream = createDraftStream();
527+
createTelegramDraftStream
528+
.mockImplementationOnce(() => answerDraftStream)
529+
.mockImplementationOnce(() => reasoningDraftStream);
530+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => {
531+
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
532+
533+
// Simulate provider fire-and-forget ordering: boundary callback starts
534+
// and a new partial arrives before boundary materialization resolves.
535+
const startPromise = replyOptions?.onAssistantMessageStart?.();
536+
const nextPartialPromise = replyOptions?.onPartialReply?.({ text: "Message B early" });
537+
538+
expect(answerDraftStream.update).toHaveBeenCalledTimes(1);
539+
resolveMaterialize?.(4321);
540+
541+
await startPromise;
542+
await nextPartialPromise;
543+
return { queuedFinal: false };
544+
});
545+
546+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
547+
548+
expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1);
549+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
550+
expect(answerDraftStream.update).toHaveBeenCalledTimes(2);
551+
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early");
552+
const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
553+
const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
554+
expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder);
555+
});
556+
557+
it("keeps final-only preview lane finalized until a real boundary rotation happens", async () => {
558+
const answerDraftStream = createSequencedDraftStream(1001);
559+
const reasoningDraftStream = createDraftStream();
560+
createTelegramDraftStream
561+
.mockImplementationOnce(() => answerDraftStream)
562+
.mockImplementationOnce(() => reasoningDraftStream);
563+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
564+
async ({ dispatcherOptions, replyOptions }) => {
565+
// Final-only first response (no streamed partials).
566+
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
567+
// Simulate provider ordering bug: first chunk arrives before message-start callback.
568+
await replyOptions?.onPartialReply?.({ text: "Message B early" });
569+
await replyOptions?.onAssistantMessageStart?.();
570+
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
571+
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
572+
return { queuedFinal: true };
573+
},
574+
);
575+
deliverReplies.mockResolvedValue({ delivered: true });
576+
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
577+
578+
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
579+
580+
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
581+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
582+
1,
583+
123,
584+
1001,
585+
"Message A final",
586+
expect.any(Object),
587+
);
588+
expect(editMessageTelegram).toHaveBeenNthCalledWith(
589+
2,
590+
123,
591+
1002,
592+
"Message B final",
593+
expect.any(Object),
594+
);
595+
});
596+
425597
it("does not force new message on first assistant message start", async () => {
426598
const draftStream = createDraftStream(999);
427599
createTelegramDraftStream.mockReturnValue(draftStream);
@@ -829,6 +1001,32 @@ describe("dispatchTelegramMessage draft streaming", () => {
8291001
},
8301002
);
8311003

1004+
it("queues reasoning-end split decisions behind queued reasoning deltas", async () => {
1005+
const { reasoningDraftStream } = setupDraftStreams({
1006+
answerMessageId: 999,
1007+
reasoningMessageId: 111,
1008+
});
1009+
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
1010+
async ({ dispatcherOptions, replyOptions }) => {
1011+
// Simulate fire-and-forget upstream ordering: reasoning_end arrives
1012+
// before the queued reasoning delta callback has finished.
1013+
const firstReasoningPromise = replyOptions?.onReasoningStream?.({
1014+
text: "Reasoning:\n_first block_",
1015+
});
1016+
await replyOptions?.onReasoningEnd?.();
1017+
await firstReasoningPromise;
1018+
await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" });
1019+
await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" });
1020+
return { queuedFinal: true };
1021+
},
1022+
);
1023+
deliverReplies.mockResolvedValue({ delivered: true });
1024+
1025+
await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" });
1026+
1027+
expect(reasoningDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
1028+
});
1029+
8321030
it("cleans superseded reasoning previews after lane rotation", async () => {
8331031
let reasoningDraftParams:
8341032
| {

0 commit comments

Comments
 (0)