Skip to content

Commit fa61b16

Browse files
committed
fix(telegram): bound dispatch dedupe state
1 parent 596f03c commit fa61b16

5 files changed

Lines changed: 156 additions & 52 deletions

File tree

extensions/telegram/src/bot-handlers.runtime.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,15 +286,13 @@ export const registerTelegramHandlers = ({
286286
const releaseDispatchDedupeKeys = (keys: readonly string[], error?: unknown) => {
287287
releaseTelegramMessageDispatchReplay({
288288
guard: messageDispatchReplayGuard,
289-
accountId,
290289
keys,
291290
error,
292291
});
293292
};
294293
const commitDispatchDedupeKeys = async (keys: readonly string[]) => {
295294
await commitTelegramMessageDispatchReplay({
296295
guard: messageDispatchReplayGuard,
297-
accountId,
298296
keys,
299297
});
300298
};

extensions/telegram/src/message-dispatch-dedupe.test.ts

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ import type { Message } from "grammy/types";
66
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
77
import { afterEach, beforeEach, describe, expect, it } from "vitest";
88
import {
9+
buildTelegramMessageDispatchAccountReplayKey,
910
buildTelegramMessageDispatchReplayKey,
1011
claimTelegramMessageDispatchReplay,
1112
commitTelegramMessageDispatchReplay,
1213
createTelegramMessageDispatchReplayGuard,
1314
releaseTelegramMessageDispatchReplay,
15+
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
1416
} from "./message-dispatch-dedupe.js";
1517

1618
const tempDirs: string[] = [];
@@ -30,6 +32,14 @@ function message(params?: { chatId?: number; messageId?: number }): Message {
3032
} as Message;
3133
}
3234

35+
function storedReplayKey(accountId: string, msg: Message): string {
36+
const key = buildTelegramMessageDispatchReplayKey(msg);
37+
if (!key) {
38+
throw new Error("expected replay key");
39+
}
40+
return buildTelegramMessageDispatchAccountReplayKey({ accountId, key });
41+
}
42+
3343
beforeEach(() => {
3444
previousStateDir = process.env.OPENCLAW_STATE_DIR;
3545
process.env.OPENCLAW_STATE_DIR = createStateDir();
@@ -66,14 +76,13 @@ describe("Telegram message dispatch replay guard", () => {
6676

6777
expect(first).toEqual({
6878
kind: "claimed",
69-
key: JSON.stringify(["message", "1234", 42]),
79+
key: storedReplayKey("default", message()),
7080
});
7181
if (first.kind !== "claimed") {
7282
throw new Error("expected initial claim");
7383
}
7484
await commitTelegramMessageDispatchReplay({
7585
guard: writer,
76-
accountId: "default",
7786
keys: [first.key],
7887
});
7988

@@ -90,17 +99,44 @@ describe("Telegram message dispatch replay guard", () => {
9099
it("preserves concurrent commits", async () => {
91100
const writer = createTelegramMessageDispatchReplayGuard();
92101
const keys = Array.from({ length: 400 }, (_, index) =>
93-
JSON.stringify(["message", "1234", index + 1]),
102+
storedReplayKey("default", message({ messageId: index + 1 })),
94103
);
95104

96105
await commitTelegramMessageDispatchReplay({
97106
guard: writer,
98-
accountId: "default",
99107
keys,
100108
});
101109

102110
const reader = createTelegramMessageDispatchReplayGuard();
103-
await expect(reader.warmup("default")).resolves.toBe(keys.length);
111+
await expect(reader.warmup(TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE)).resolves.toBe(
112+
keys.length,
113+
);
114+
});
115+
116+
it("uses one persisted namespace across Telegram accounts", async () => {
117+
const writer = createTelegramMessageDispatchReplayGuard();
118+
const first = await claimTelegramMessageDispatchReplay({
119+
guard: writer,
120+
accountId: "default",
121+
msg: message(),
122+
});
123+
const second = await claimTelegramMessageDispatchReplay({
124+
guard: writer,
125+
accountId: "work",
126+
msg: message(),
127+
});
128+
if (first.kind !== "claimed" || second.kind !== "claimed") {
129+
throw new Error("expected account claims");
130+
}
131+
132+
await commitTelegramMessageDispatchReplay({
133+
guard: writer,
134+
keys: [first.key, second.key],
135+
});
136+
137+
const reader = createTelegramMessageDispatchReplayGuard();
138+
await expect(reader.warmup(TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE)).resolves.toBe(2);
139+
await expect(reader.warmup("default")).resolves.toBe(0);
104140
});
105141

106142
it("keeps accounts isolated and releases retryable pre-dispatch claims", async () => {
@@ -122,12 +158,11 @@ describe("Telegram message dispatch replay guard", () => {
122158
}),
123159
).resolves.toEqual({
124160
kind: "claimed",
125-
key: first.key,
161+
key: storedReplayKey("work", message()),
126162
});
127163

128164
releaseTelegramMessageDispatchReplay({
129165
guard,
130-
accountId: "default",
131166
keys: [first.key],
132167
});
133168
await expect(
@@ -160,7 +195,6 @@ describe("Telegram message dispatch replay guard", () => {
160195
});
161196
releaseTelegramMessageDispatchReplay({
162197
guard,
163-
accountId: "default",
164198
keys: [first.key],
165199
error: new Error("retry"),
166200
});

extensions/telegram/src/message-dispatch-dedupe.ts

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk
55
import { normalizeStringEntries, uniqueStrings } from "openclaw/plugin-sdk/string-coerce-runtime";
66

77
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_TTL_MS = 7 * 24 * 60 * 60 * 1000;
8+
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE = "global";
89
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE_PREFIX = "telegram.message-dispatch-dedupe";
9-
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES = 50_000;
10+
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_PLUGIN_ID = "telegram-message-dispatch-dedupe";
11+
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MEMORY_MAX_ENTRIES = 50_000;
12+
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_MAX_ENTRIES = 50_000;
1013

1114
export type TelegramMessageDispatchReplayGuard = ClaimableDedupe;
1215

@@ -44,17 +47,34 @@ export function buildTelegramMessageDispatchReplayKey(msg: Message): string | nu
4447
return JSON.stringify(["message", String(chatId), messageId]);
4548
}
4649

50+
export function buildTelegramMessageDispatchAccountReplayKey(params: {
51+
accountId: string;
52+
key: string;
53+
}): string {
54+
return JSON.stringify(["account", params.accountId, params.key]);
55+
}
56+
57+
function buildTelegramMessageDispatchStoredReplayKey(params: {
58+
accountId: string;
59+
msg: Message;
60+
}): string | null {
61+
const key = buildTelegramMessageDispatchReplayKey(params.msg);
62+
return key
63+
? buildTelegramMessageDispatchAccountReplayKey({ accountId: params.accountId, key })
64+
: null;
65+
}
66+
4767
export function createTelegramMessageDispatchReplayGuard(
4868
params: {
4969
onDiskError?: (error: unknown) => void;
5070
} = {},
5171
): TelegramMessageDispatchReplayGuard {
5272
return createClaimableDedupe({
5373
ttlMs: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_TTL_MS,
54-
memoryMaxSize: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
55-
pluginId: "telegram",
74+
memoryMaxSize: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MEMORY_MAX_ENTRIES,
75+
pluginId: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_PLUGIN_ID,
5676
namespacePrefix: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE_PREFIX,
57-
stateMaxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
77+
stateMaxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_MAX_ENTRIES,
5878
...(params.onDiskError ? { onDiskError: params.onDiskError } : {}),
5979
});
6080
}
@@ -64,14 +84,19 @@ export async function claimTelegramMessageDispatchReplay(params: {
6484
accountId: string;
6585
msg: Message;
6686
}): Promise<TelegramMessageDispatchClaim> {
67-
const key = buildTelegramMessageDispatchReplayKey(params.msg);
87+
const key = buildTelegramMessageDispatchStoredReplayKey({
88+
accountId: params.accountId,
89+
msg: params.msg,
90+
});
6891
if (!key) {
6992
return { kind: "invalid" };
7093
}
7194

7295
let releaseRetries = 0;
7396
while (true) {
74-
const claim = await params.guard.claim(key, { namespace: params.accountId });
97+
const claim = await params.guard.claim(key, {
98+
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
99+
});
75100
if (claim.kind === "claimed") {
76101
return { kind: "claimed", key };
77102
}
@@ -96,21 +121,26 @@ function normalizeReplayKeys(keys?: readonly string[]): string[] {
96121

97122
export async function commitTelegramMessageDispatchReplay(params: {
98123
guard: TelegramMessageDispatchReplayGuard;
99-
accountId: string;
100124
keys?: readonly string[];
101125
}): Promise<void> {
102126
const keys = normalizeReplayKeys(params.keys);
103-
await Promise.all(keys.map((key) => params.guard.commit(key, { namespace: params.accountId })));
127+
await Promise.all(
128+
keys.map((key) =>
129+
params.guard.commit(key, { namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE }),
130+
),
131+
);
104132
}
105133

106134
export function releaseTelegramMessageDispatchReplay(params: {
107135
guard: TelegramMessageDispatchReplayGuard;
108-
accountId: string;
109136
keys?: readonly string[];
110137
error?: unknown;
111138
}): void {
112139
const keys = normalizeReplayKeys(params.keys);
113140
for (const key of keys) {
114-
params.guard.release(key, { namespace: params.accountId, error: params.error });
141+
params.guard.release(key, {
142+
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
143+
error: params.error,
144+
});
115145
}
116146
}

extensions/telegram/src/state-migrations.test.ts

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@ import { afterEach, describe, expect, it } from "vitest";
1414
import { resolveTelegramBotInfoCachePath } from "./bot-info-cache.js";
1515
import { resolveTelegramMessageCachePath } from "./message-cache.js";
1616
import {
17+
buildTelegramMessageDispatchAccountReplayKey,
1718
resolveTelegramMessageDispatchLegacyPath,
18-
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
19+
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
1920
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE_PREFIX,
21+
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_PLUGIN_ID,
22+
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_MAX_ENTRIES,
2023
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_TTL_MS,
2124
} from "./message-dispatch-dedupe.js";
2225
import { detectTelegramLegacyStateMigrations } from "./state-migrations.js";
@@ -369,7 +372,7 @@ describe("telegram state migrations", () => {
369372
} as OpenClawConfig;
370373
const plans = await detectTelegramLegacyStateMigrations({ cfg, env });
371374
const dispatchNamespace = resolvePersistentDedupePluginStateNamespace({
372-
namespace: "ops",
375+
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
373376
namespacePrefix: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE_PREFIX,
374377
});
375378

@@ -396,6 +399,7 @@ describe("telegram state migrations", () => {
396399
});
397400
expect(byLabel.get("Telegram message dispatch dedupe")).toMatchObject({
398401
kind: "plugin-state-import",
402+
pluginId: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_PLUGIN_ID,
399403
sourcePath: dispatchPath,
400404
namespace: dispatchNamespace,
401405
});
@@ -407,7 +411,10 @@ describe("telegram state migrations", () => {
407411
{
408412
key: expect.stringMatching(/^k\.[a-f0-9]{32}$/),
409413
value: {
410-
key: JSON.stringify(["message", "7", 42]),
414+
key: buildTelegramMessageDispatchAccountReplayKey({
415+
accountId: "ops",
416+
key: JSON.stringify(["message", "7", 42]),
417+
}),
411418
seenAt: now,
412419
},
413420
},
@@ -437,7 +444,7 @@ describe("telegram state migrations", () => {
437444
const now = Date.now();
438445
const replayKey = JSON.stringify(["message", "7", 42]);
439446
const dispatchNamespace = resolvePersistentDedupePluginStateNamespace({
440-
namespace: "ops",
447+
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
441448
namespacePrefix: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE_PREFIX,
442449
});
443450
try {
@@ -476,6 +483,7 @@ describe("telegram state migrations", () => {
476483

477484
expect(plan).toMatchObject({
478485
kind: "plugin-state-import",
486+
pluginId: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_PLUGIN_ID,
479487
namespace: dispatchNamespace,
480488
});
481489
if (!plan || plan.kind !== "plugin-state-import") {
@@ -486,18 +494,24 @@ describe("telegram state migrations", () => {
486494
{
487495
key: expect.stringMatching(/^k\.[a-f0-9]{32}$/),
488496
value: {
489-
key: replayKey,
497+
key: buildTelegramMessageDispatchAccountReplayKey({
498+
accountId: "ops",
499+
key: replayKey,
500+
}),
490501
seenAt: now,
491502
},
492503
},
493504
]);
494505

495-
const targetStore = createPluginStateSyncKeyedStoreForTests("telegram", {
496-
namespace: dispatchNamespace,
497-
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
498-
defaultTtlMs: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_TTL_MS,
499-
env,
500-
});
506+
const targetStore = createPluginStateSyncKeyedStoreForTests(
507+
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_PLUGIN_ID,
508+
{
509+
namespace: dispatchNamespace,
510+
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_STATE_MAX_ENTRIES,
511+
defaultTtlMs: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_TTL_MS,
512+
env,
513+
},
514+
);
501515
for (const entry of entries) {
502516
targetStore.register(
503517
entry.key,

0 commit comments

Comments
 (0)