Skip to content

Commit b95e50d

Browse files
committed
fix(telegram): keep overlapping DM replies deliverable
1 parent a00c583 commit b95e50d

2 files changed

Lines changed: 86 additions & 0 deletions

File tree

extensions/telegram/src/bot-message-dispatch.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2712,6 +2712,88 @@ describe("dispatchTelegramMessage draft streaming", () => {
27122712
expect(deliveredTexts).toContain("fresh request answer");
27132713
});
27142714

2715+
it("keeps newer DM requests from aborting active same-session dispatch", async () => {
2716+
let firstStarted: (() => void) | undefined;
2717+
const firstStartGate = new Promise<void>((resolve) => {
2718+
firstStarted = resolve;
2719+
});
2720+
let releaseFirst: (() => void) | undefined;
2721+
const firstGate = new Promise<void>((resolve) => {
2722+
releaseFirst = resolve;
2723+
});
2724+
let secondStarted: (() => void) | undefined;
2725+
const secondStartGate = new Promise<void>((resolve) => {
2726+
secondStarted = resolve;
2727+
});
2728+
let firstAbortSignal: AbortSignal | undefined;
2729+
dispatchReplyWithBufferedBlockDispatcher
2730+
.mockImplementationOnce(async ({ dispatcherOptions, replyOptions }) => {
2731+
firstAbortSignal = replyOptions?.abortSignal;
2732+
firstStarted?.();
2733+
await firstGate;
2734+
await dispatcherOptions.deliver({ text: "earlier DM answer" }, { kind: "final" });
2735+
return {
2736+
queuedFinal: true,
2737+
counts: { block: 0, final: 1, tool: 0 },
2738+
};
2739+
})
2740+
.mockImplementationOnce(async ({ dispatcherOptions }) => {
2741+
secondStarted?.();
2742+
await dispatcherOptions.deliver({ text: "fresh DM answer" }, { kind: "final" });
2743+
return {
2744+
queuedFinal: true,
2745+
counts: { block: 0, final: 1, tool: 0 },
2746+
};
2747+
});
2748+
deliverReplies.mockResolvedValue({ delivered: true });
2749+
2750+
const createDirectContext = (messageId: number, body: string) =>
2751+
createContext({
2752+
ctxPayload: {
2753+
SessionKey: "agent:main:main",
2754+
ChatType: "direct",
2755+
MessageSid: String(messageId),
2756+
RawBody: body,
2757+
BodyForAgent: body,
2758+
CommandBody: body,
2759+
CommandAuthorized: true,
2760+
} as unknown as TelegramMessageContext["ctxPayload"],
2761+
msg: {
2762+
chat: { id: 123, type: "private" },
2763+
message_id: messageId,
2764+
} as unknown as TelegramMessageContext["msg"],
2765+
chatId: 123,
2766+
isGroup: false,
2767+
historyKey: "telegram:123",
2768+
historyLimit: 10,
2769+
groupHistories: new Map(),
2770+
threadSpec: { id: undefined, scope: "none" },
2771+
});
2772+
2773+
const firstPromise = dispatchWithContext({
2774+
context: createDirectContext(99, "first request"),
2775+
streamMode: "off",
2776+
});
2777+
await firstStartGate;
2778+
const secondPromise = dispatchWithContext({
2779+
context: createDirectContext(100, "second request"),
2780+
streamMode: "off",
2781+
});
2782+
await secondStartGate;
2783+
2784+
expect(firstAbortSignal?.aborted).toBe(false);
2785+
releaseFirst?.();
2786+
await Promise.all([firstPromise, secondPromise]);
2787+
2788+
const deliveredTexts = deliverReplies.mock.calls.flatMap((call) =>
2789+
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
2790+
(reply) => reply.text,
2791+
),
2792+
);
2793+
expect(deliveredTexts).toContain("fresh DM answer");
2794+
expect(deliveredTexts).toContain("earlier DM answer");
2795+
});
2796+
27152797
it("keeps /btw side questions from aborting an active same-session dispatch", async () => {
27162798
const historyKey = "telegram:group:-100123";
27172799
const groupHistories = new Map([[historyKey, []]]);

extensions/telegram/src/telegram-reply-fence.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ export function releaseTelegramReplyFenceAbortController(
192192

193193
export function shouldSupersedeTelegramReplyFence(ctxPayload: {
194194
Body?: string;
195+
ChatType?: string;
195196
RawBody?: string;
196197
CommandBody?: string;
197198
CommandAuthorized: boolean;
@@ -206,6 +207,9 @@ export function shouldSupersedeTelegramReplyFence(ctxPayload: {
206207
) {
207208
return false;
208209
}
210+
if (ctxPayload.ChatType === "direct") {
211+
return false;
212+
}
209213
return true;
210214
}
211215

0 commit comments

Comments
 (0)