Skip to content

Commit 9297c20

Browse files
committed
fix(telegram): use SDK dispatch replay dedupe
1 parent b2dec9c commit 9297c20

6 files changed

Lines changed: 331 additions & 733 deletions

File tree

extensions/telegram/src/bot-handlers.runtime.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ export const registerTelegramHandlers = ({
213213
scope: resolveTelegramMessageCacheScope(telegramDeps.resolveStorePath(cfg.session?.store)),
214214
});
215215
const messageDispatchReplayGuard = createTelegramMessageDispatchReplayGuard({
216-
storePath: telegramDeps.resolveStorePath(cfg.session?.store),
217216
onDiskError: (error) => {
218217
runtime.error?.(danger(`[telegram] message dispatch dedupe store failed: ${String(error)}`));
219218
},

extensions/telegram/src/bot.create-telegram-bot.test.ts

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
// Telegram tests cover bot.create telegram bot plugin behavior.
2+
import { mkdtempSync, rmSync } from "node:fs";
3+
import { tmpdir } from "node:os";
4+
import path from "node:path";
25
import { escapeRegExp, formatEnvelopeTimestamp } from "openclaw/plugin-sdk/channel-test-helpers";
36
import type { TelegramGroupConfig } from "openclaw/plugin-sdk/config-contracts";
47
import type { GetReplyOptions, MsgContext } from "openclaw/plugin-sdk/reply-runtime";
@@ -14,6 +17,8 @@ const conversationRuntime = await import("openclaw/plugin-sdk/conversation-runti
1417
const configMutation = await import("openclaw/plugin-sdk/config-mutation");
1518
const sessionStoreRuntime = await import("openclaw/plugin-sdk/session-store-runtime");
1619
const EYES_EMOJI = "\u{1F440}";
20+
const tempStateDirs: string[] = [];
21+
let previousStateDir: string | undefined;
1722
const {
1823
answerCallbackQuerySpy,
1924
botCtorSpy,
@@ -60,7 +65,6 @@ const {
6065
} = await import("./bot-core.js");
6166
const { resolveTelegramConversationRoute } = await import("./conversation-route.js");
6267
const { clearAccountThrottlersForTest } = await import("./account-throttler.js");
63-
const messageDispatchDedupe = await import("./message-dispatch-dedupe.js");
6468
const {
6569
buildTelegramGroupFrom,
6670
buildTelegramThreadParams,
@@ -74,6 +78,12 @@ let createTelegramBot: (
7478
opts: TelegramBotOptions,
7579
) => ReturnType<typeof import("./bot-core.js").createTelegramBotCore>;
7680

81+
function createTelegramBotTestStateDir(): string {
82+
const dir = mkdtempSync(path.join(tmpdir(), "openclaw-telegram-bot-"));
83+
tempStateDirs.push(dir);
84+
return dir;
85+
}
86+
7787
const loadConfig = getLoadConfigMock();
7888
const loadSessionStore = getLoadSessionStoreMock();
7989
const loadWebMedia = getLoadWebMediaMock();
@@ -214,10 +224,19 @@ describe("createTelegramBot", () => {
214224
}
215225
});
216226
afterEach(() => {
217-
messageDispatchDedupe.setTelegramMessageDispatchDedupeStoreForTest(undefined);
218227
pluginStateTestRuntime.resetPluginStateStoreForTests();
228+
if (previousStateDir === undefined) {
229+
delete process.env.OPENCLAW_STATE_DIR;
230+
} else {
231+
process.env.OPENCLAW_STATE_DIR = previousStateDir;
232+
}
233+
for (const dir of tempStateDirs.splice(0)) {
234+
rmSync(dir, { recursive: true, force: true });
235+
}
219236
});
220237
beforeEach(async () => {
238+
previousStateDir = process.env.OPENCLAW_STATE_DIR;
239+
process.env.OPENCLAW_STATE_DIR = createTelegramBotTestStateDir();
221240
resetTelegramForumFlagCacheForTest();
222241
clearAccountThrottlersForTest();
223242
throttlerSpy.mockReset();
@@ -230,14 +249,6 @@ describe("createTelegramBot", () => {
230249
telegramDeps: telegramBotDepsForTest,
231250
});
232251
pluginStateTestRuntime.resetPluginStateStoreForTests({ closeDatabase: false });
233-
const store = pluginStateTestRuntime.createPluginStateKeyedStoreForTests("telegram", {
234-
namespace: messageDispatchDedupe.TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
235-
maxEntries: messageDispatchDedupe.TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
236-
}) as NonNullable<
237-
Parameters<typeof messageDispatchDedupe.setTelegramMessageDispatchDedupeStoreForTest>[0]
238-
>;
239-
await store.clear();
240-
messageDispatchDedupe.setTelegramMessageDispatchDedupeStoreForTest(store);
241252
});
242253

243254
// groupPolicy tests

extensions/telegram/src/message-dispatch-dedupe.test.ts

Lines changed: 19 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,23 @@ import { mkdtempSync, rmSync } from "node:fs";
33
import { tmpdir } from "node:os";
44
import path from "node:path";
55
import type { Message } from "grammy/types";
6-
import {
7-
createPluginStateKeyedStoreForTests,
8-
createPluginStateSyncKeyedStoreForTests,
9-
resetPluginStateStoreForTests,
10-
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
6+
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
117
import { afterEach, beforeEach, describe, expect, it } from "vitest";
128
import {
13-
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
14-
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
159
buildTelegramMessageDispatchReplayKey,
1610
claimTelegramMessageDispatchReplay,
1711
commitTelegramMessageDispatchReplay,
1812
createTelegramMessageDispatchReplayGuard,
1913
releaseTelegramMessageDispatchReplay,
20-
setTelegramMessageDispatchDedupeStoreForTest,
2114
} from "./message-dispatch-dedupe.js";
2215

23-
type MessageDispatchDedupeStore = NonNullable<
24-
Parameters<typeof setTelegramMessageDispatchDedupeStoreForTest>[0]
25-
>;
26-
type SyncMessageDispatchDedupeStore = Extract<MessageDispatchDedupeStore, { entries(): unknown[] }>;
27-
2816
const tempDirs: string[] = [];
17+
let previousStateDir: string | undefined;
2918

30-
function createStorePath(): string {
19+
function createStateDir(): string {
3120
const dir = mkdtempSync(path.join(tmpdir(), "openclaw-telegram-dispatch-dedupe-"));
3221
tempDirs.push(dir);
33-
return path.join(dir, "sessions.json");
22+
return dir;
3423
}
3524

3625
function message(params?: { chatId?: number; messageId?: number }): Message {
@@ -41,19 +30,19 @@ function message(params?: { chatId?: number; messageId?: number }): Message {
4130
} as Message;
4231
}
4332

44-
beforeEach(async () => {
33+
beforeEach(() => {
34+
previousStateDir = process.env.OPENCLAW_STATE_DIR;
35+
process.env.OPENCLAW_STATE_DIR = createStateDir();
4536
resetPluginStateStoreForTests({ closeDatabase: false });
46-
const store = createPluginStateKeyedStoreForTests("telegram", {
47-
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
48-
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
49-
}) as NonNullable<Parameters<typeof setTelegramMessageDispatchDedupeStoreForTest>[0]>;
50-
await store.clear();
51-
setTelegramMessageDispatchDedupeStoreForTest(store);
5237
});
5338

5439
afterEach(() => {
55-
setTelegramMessageDispatchDedupeStoreForTest(undefined);
5640
resetPluginStateStoreForTests();
41+
if (previousStateDir === undefined) {
42+
delete process.env.OPENCLAW_STATE_DIR;
43+
} else {
44+
process.env.OPENCLAW_STATE_DIR = previousStateDir;
45+
}
5746
for (const dir of tempDirs.splice(0)) {
5847
rmSync(dir, { recursive: true, force: true });
5948
}
@@ -68,8 +57,7 @@ describe("Telegram message dispatch replay guard", () => {
6857
});
6958

7059
it("persists committed dispatches across guard recreation", async () => {
71-
const storePath = createStorePath();
72-
const writer = createTelegramMessageDispatchReplayGuard({ storePath });
60+
const writer = createTelegramMessageDispatchReplayGuard();
7361
const first = await claimTelegramMessageDispatchReplay({
7462
guard: writer,
7563
accountId: "default",
@@ -89,7 +77,7 @@ describe("Telegram message dispatch replay guard", () => {
8977
keys: [first.key],
9078
});
9179

92-
const reader = createTelegramMessageDispatchReplayGuard({ storePath });
80+
const reader = createTelegramMessageDispatchReplayGuard();
9381
await expect(
9482
claimTelegramMessageDispatchReplay({
9583
guard: reader,
@@ -99,9 +87,8 @@ describe("Telegram message dispatch replay guard", () => {
9987
).resolves.toEqual({ kind: "duplicate" });
10088
});
10189

102-
it("preserves concurrent commits that share dedupe buckets", async () => {
103-
const storePath = createStorePath();
104-
const writer = createTelegramMessageDispatchReplayGuard({ storePath });
90+
it("preserves concurrent commits", async () => {
91+
const writer = createTelegramMessageDispatchReplayGuard();
10592
const keys = Array.from({ length: 400 }, (_, index) =>
10693
JSON.stringify(["message", "1234", index + 1]),
10794
);
@@ -112,151 +99,12 @@ describe("Telegram message dispatch replay guard", () => {
11299
keys,
113100
});
114101

115-
const reader = createTelegramMessageDispatchReplayGuard({ storePath });
102+
const reader = createTelegramMessageDispatchReplayGuard();
116103
await expect(reader.warmup("default")).resolves.toBe(keys.length);
117104
});
118105

119-
it("falls back to same-process replay protection when plugin-state is unavailable", async () => {
120-
setTelegramMessageDispatchDedupeStoreForTest(undefined);
121-
const errors: unknown[] = [];
122-
const storePath = createStorePath();
123-
const guard = createTelegramMessageDispatchReplayGuard({
124-
storePath,
125-
onDiskError: (error) => errors.push(error),
126-
});
127-
const first = await claimTelegramMessageDispatchReplay({
128-
guard,
129-
accountId: "default",
130-
msg: message(),
131-
});
132-
if (first.kind !== "claimed") {
133-
throw new Error("expected initial claim");
134-
}
135-
136-
await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(false);
137-
138-
await expect(
139-
claimTelegramMessageDispatchReplay({
140-
guard,
141-
accountId: "default",
142-
msg: message(),
143-
}),
144-
).resolves.toEqual({ kind: "duplicate" });
145-
await expect(guard.hasRecent(first.key, { namespace: "default" })).resolves.toBe(true);
146-
expect(errors).toEqual([]);
147-
});
148-
149-
it("keeps same-process replay protection when plugin-state commit fails", async () => {
150-
const failingStore = createPluginStateKeyedStoreForTests("telegram", {
151-
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
152-
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
153-
}) as NonNullable<Parameters<typeof setTelegramMessageDispatchDedupeStoreForTest>[0]>;
154-
setTelegramMessageDispatchDedupeStoreForTest({
155-
...failingStore,
156-
async register() {
157-
throw new Error("state write failed");
158-
},
159-
});
160-
const storePath = createStorePath();
161-
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
162-
const first = await claimTelegramMessageDispatchReplay({
163-
guard,
164-
accountId: "default",
165-
msg: message(),
166-
});
167-
if (first.kind !== "claimed") {
168-
throw new Error("expected initial claim");
169-
}
170-
171-
await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(false);
172-
173-
await expect(
174-
claimTelegramMessageDispatchReplay({
175-
guard,
176-
accountId: "default",
177-
msg: message(),
178-
}),
179-
).resolves.toEqual({ kind: "duplicate" });
180-
await expect(guard.hasRecent(first.key, { namespace: "default" })).resolves.toBe(true);
181-
await expect(guard.warmup("default")).resolves.toBe(1);
182-
});
183-
184-
it("keeps same-process replay protection when lookup fails after a successful commit", async () => {
185-
const backingStore = createPluginStateSyncKeyedStoreForTests("telegram", {
186-
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
187-
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
188-
}) as SyncMessageDispatchDedupeStore;
189-
let failLookup = false;
190-
setTelegramMessageDispatchDedupeStoreForTest({
191-
...backingStore,
192-
lookup(key) {
193-
if (failLookup) {
194-
throw new Error("state read failed");
195-
}
196-
return backingStore.lookup(key);
197-
},
198-
});
199-
const storePath = createStorePath();
200-
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
201-
const first = await claimTelegramMessageDispatchReplay({
202-
guard,
203-
accountId: "default",
204-
msg: message(),
205-
});
206-
if (first.kind !== "claimed") {
207-
throw new Error("expected initial claim");
208-
}
209-
await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(true);
210-
211-
failLookup = true;
212-
213-
await expect(
214-
claimTelegramMessageDispatchReplay({
215-
guard,
216-
accountId: "default",
217-
msg: message(),
218-
}),
219-
).resolves.toEqual({ kind: "duplicate" });
220-
});
221-
222-
it("keeps replay histories isolated by session store path", async () => {
223-
const firstStorePath = createStorePath();
224-
const secondStorePath = createStorePath();
225-
const firstGuard = createTelegramMessageDispatchReplayGuard({
226-
storePath: firstStorePath,
227-
});
228-
const first = await claimTelegramMessageDispatchReplay({
229-
guard: firstGuard,
230-
accountId: "default",
231-
msg: message(),
232-
});
233-
if (first.kind !== "claimed") {
234-
throw new Error("expected initial claim");
235-
}
236-
await commitTelegramMessageDispatchReplay({
237-
guard: firstGuard,
238-
accountId: "default",
239-
keys: [first.key],
240-
});
241-
242-
const secondGuard = createTelegramMessageDispatchReplayGuard({
243-
storePath: secondStorePath,
244-
});
245-
await expect(
246-
claimTelegramMessageDispatchReplay({
247-
guard: secondGuard,
248-
accountId: "default",
249-
msg: message(),
250-
}),
251-
).resolves.toEqual({
252-
kind: "claimed",
253-
key: first.key,
254-
});
255-
});
256-
257106
it("keeps accounts isolated and releases retryable pre-dispatch claims", async () => {
258-
const storePath = createStorePath();
259-
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
107+
const guard = createTelegramMessageDispatchReplayGuard();
260108
const first = await claimTelegramMessageDispatchReplay({
261109
guard,
262110
accountId: "default",
@@ -295,8 +143,7 @@ describe("Telegram message dispatch replay guard", () => {
295143
});
296144

297145
it("lets an in-flight duplicate retry after the first claim is released", async () => {
298-
const storePath = createStorePath();
299-
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
146+
const guard = createTelegramMessageDispatchReplayGuard();
300147
const first = await claimTelegramMessageDispatchReplay({
301148
guard,
302149
accountId: "default",

0 commit comments

Comments
 (0)