Skip to content

Commit f318069

Browse files
committed
Persist Telegram replay dedupe at dispatch start
1 parent 8027a04 commit f318069

5 files changed

Lines changed: 113 additions & 4 deletions

File tree

extensions/telegram/src/bot-handlers.runtime.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1117,6 +1117,7 @@ export const registerTelegramHandlers = ({
11171117
options?: TelegramMessageContextOptions;
11181118
dispatchDedupeKeys?: string[];
11191119
}) => {
1120+
let dispatchDedupeCommitted = false;
11201121
try {
11211122
const replyChainNodes = buildReplyChainForMessage(params.msg);
11221123
const { replyMedia, replyChain } = await resolveReplyMediaForChain(
@@ -1136,14 +1137,20 @@ export const registerTelegramHandlers = ({
11361137
replyMedia,
11371138
replyChain,
11381139
promptContext,
1140+
{
1141+
onDispatchStart: async () => {
1142+
await commitDispatchDedupeKeys(params.dispatchDedupeKeys ?? []);
1143+
dispatchDedupeCommitted = true;
1144+
},
1145+
},
11391146
);
1140-
if (dispatched) {
1141-
await commitDispatchDedupeKeys(params.dispatchDedupeKeys ?? []);
1142-
} else {
1147+
if (!dispatched && !dispatchDedupeCommitted) {
11431148
releaseDispatchDedupeKeys(params.dispatchDedupeKeys ?? []);
11441149
}
11451150
} catch (err) {
1146-
releaseDispatchDedupeKeys(params.dispatchDedupeKeys ?? [], err);
1151+
if (!dispatchDedupeCommitted) {
1152+
releaseDispatchDedupeKeys(params.dispatchDedupeKeys ?? [], err);
1153+
}
11471154
throw err;
11481155
}
11491156
};

extensions/telegram/src/bot-message.test.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ describe("telegram bot message processor", () => {
7272

7373
async function processSampleMessage(
7474
processMessage: ReturnType<typeof createTelegramMessageProcessor>,
75+
lifecycle?: import("./bot-message.js").TelegramMessageProcessorLifecycle,
7576
) {
7677
return await processMessage(
7778
{
@@ -83,6 +84,10 @@ describe("telegram bot message processor", () => {
8384
[],
8485
[],
8586
{},
87+
undefined,
88+
undefined,
89+
undefined,
90+
lifecycle,
8691
);
8792
}
8893

@@ -138,6 +143,40 @@ describe("telegram bot message processor", () => {
138143
);
139144
});
140145

146+
it("runs the dispatch-start lifecycle after context creation and before dispatch", async () => {
147+
const sendTyping = vi.fn().mockResolvedValue(undefined);
148+
const onDispatchStart = vi.fn(async () => undefined);
149+
buildTelegramMessageContext.mockResolvedValue(
150+
createMessageContext({
151+
sendTyping,
152+
}),
153+
);
154+
155+
const processMessage = createTelegramMessageProcessor(baseDeps);
156+
await expect(processSampleMessage(processMessage, { onDispatchStart })).resolves.toBe(true);
157+
158+
expect(sendTyping).toHaveBeenCalledTimes(1);
159+
expect(onDispatchStart).toHaveBeenCalledTimes(1);
160+
expect(dispatchTelegramMessage).toHaveBeenCalledTimes(1);
161+
expect(sendTyping.mock.invocationCallOrder[0]).toBeLessThan(
162+
onDispatchStart.mock.invocationCallOrder[0],
163+
);
164+
expect(onDispatchStart.mock.invocationCallOrder[0]).toBeLessThan(
165+
dispatchTelegramMessage.mock.invocationCallOrder[0],
166+
);
167+
});
168+
169+
it("does not run the dispatch-start lifecycle when no context is produced", async () => {
170+
const onDispatchStart = vi.fn(async () => undefined);
171+
buildTelegramMessageContext.mockResolvedValue(null);
172+
173+
const processMessage = createTelegramMessageProcessor(baseDeps);
174+
await expect(processSampleMessage(processMessage, { onDispatchStart })).resolves.toBe(false);
175+
176+
expect(onDispatchStart).not.toHaveBeenCalled();
177+
expect(dispatchTelegramMessage).not.toHaveBeenCalled();
178+
});
179+
141180
it("does not send early typing cues for room events", async () => {
142181
const sendTyping = vi.fn().mockResolvedValue(undefined);
143182
buildTelegramMessageContext.mockResolvedValue(

extensions/telegram/src/bot-message.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ type TelegramMessageProcessorDeps = Omit<
4747
opts: Pick<TelegramBotOptions, "token">;
4848
};
4949

50+
export type TelegramMessageProcessorLifecycle = {
51+
onDispatchStart?: () => Promise<void> | void;
52+
};
53+
5054
export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDeps) => {
5155
const {
5256
bot,
@@ -104,6 +108,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
104108
replyMedia?: TelegramMediaRef[],
105109
replyChain?: TelegramReplyChainEntry[],
106110
promptContext?: TelegramPromptContextEntry[],
111+
lifecycle?: TelegramMessageProcessorLifecycle,
107112
) => {
108113
const ingressReceivedAtMs =
109114
typeof options?.receivedAtMs === "number" && Number.isFinite(options.receivedAtMs)
@@ -171,6 +176,7 @@ export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDep
171176
mediaType: allMedia[0]?.contentType,
172177
}),
173178
);
179+
await lifecycle?.onDispatchStart?.();
174180
try {
175181
await dispatchTelegramMessage({
176182
context,

extensions/telegram/src/bot-native-commands.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ export type RegisterTelegramHandlerParams = {
429429
replyMedia?: TelegramMediaRef[],
430430
replyChain?: import("./message-cache.js").TelegramReplyChainEntry[],
431431
promptContext?: import("./bot-message-context.types.js").TelegramPromptContextEntry[],
432+
lifecycle?: import("./bot-message.js").TelegramMessageProcessorLifecycle,
432433
) => Promise<boolean>;
433434
logger: ReturnType<typeof getChildLogger>;
434435
};

extensions/telegram/src/bot.create-telegram-bot.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,16 @@ function requireValue<T>(value: T | null | undefined, label: string): T {
178178
return value;
179179
}
180180

181+
function createDeferred<T = void>() {
182+
let resolve!: (value: T | PromiseLike<T>) => void;
183+
let reject!: (reason?: unknown) => void;
184+
const promise = new Promise<T>((resolvePromise, rejectPromise) => {
185+
resolve = resolvePromise;
186+
reject = rejectPromise;
187+
});
188+
return { promise, resolve, reject };
189+
}
190+
181191
function requireRecord(value: unknown, label: string): Record<string, unknown> {
182192
if (typeof value !== "object" || value === null || Array.isArray(value)) {
183193
throw new Error(`expected ${label} to be an object`);
@@ -1717,6 +1727,52 @@ describe("createTelegramBot", () => {
17171727
expect(replySpy).toHaveBeenCalledTimes(1);
17181728
});
17191729

1730+
it("dedupes a replayed Telegram message after handler recreation while dispatch is pending", async () => {
1731+
loadConfig.mockReturnValue({
1732+
channels: { telegram: { dmPolicy: "open", allowFrom: ["*"] } },
1733+
});
1734+
1735+
const firstDispatchStarted = createDeferred();
1736+
const finishFirstDispatch = createDeferred();
1737+
replySpy.mockImplementationOnce(async (_ctx: MsgContext, opts?: GetReplyOptions) => {
1738+
await opts?.onReplyStart?.();
1739+
firstDispatchStarted.resolve();
1740+
await finishFirstDispatch.promise;
1741+
return undefined;
1742+
});
1743+
1744+
const replayedCtx = () => ({
1745+
update: { update_id: 8488602 },
1746+
message: {
1747+
chat: { id: 123, type: "private" },
1748+
from: { id: 456, username: "testuser" },
1749+
text: "replay while pending",
1750+
date: 1736380800,
1751+
message_id: 43,
1752+
},
1753+
me: { username: "openclaw_bot" },
1754+
getFile: async () => ({ download: async () => new Uint8Array() }),
1755+
});
1756+
1757+
createTelegramBot({ token: "tok" });
1758+
const firstRun = (getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>)(
1759+
replayedCtx(),
1760+
);
1761+
await firstDispatchStarted.promise;
1762+
expect(replySpy).toHaveBeenCalledTimes(1);
1763+
1764+
onSpy.mockClear();
1765+
createTelegramBot({ token: "tok" });
1766+
await (getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>)(
1767+
replayedCtx(),
1768+
);
1769+
1770+
expect(replySpy).toHaveBeenCalledTimes(1);
1771+
finishFirstDispatch.resolve();
1772+
await firstRun;
1773+
expect(replySpy).toHaveBeenCalledTimes(1);
1774+
});
1775+
17201776
it("persists update offsets after successful dispatch completion", async () => {
17211777
// For this test we need sequentialize(...) to behave like a normal middleware and call next().
17221778
sequentializeSpy.mockImplementationOnce(

0 commit comments

Comments
 (0)