@@ -3,34 +3,23 @@ import { mkdtempSync, rmSync } from "node:fs";
33import { tmpdir } from "node:os" ;
44import path from "node:path" ;
55import type { Message } from "grammy/types" ;
6- import {
7- createPluginStateKeyedStoreForTests ,
8- createPluginStateSyncKeyedStoreForTests ,
9- resetPluginStateStoreForTests ,
10- } from "openclaw/plugin-sdk/plugin-state-test-runtime" ;
6+ import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime" ;
117import { afterEach , beforeEach , describe , expect , it } from "vitest" ;
128import {
13- TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES ,
14- TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE ,
159 buildTelegramMessageDispatchReplayKey ,
1610 claimTelegramMessageDispatchReplay ,
1711 commitTelegramMessageDispatchReplay ,
1812 createTelegramMessageDispatchReplayGuard ,
1913 releaseTelegramMessageDispatchReplay ,
20- setTelegramMessageDispatchDedupeStoreForTest ,
2114} from "./message-dispatch-dedupe.js" ;
2215
23- type MessageDispatchDedupeStore = NonNullable <
24- Parameters < typeof setTelegramMessageDispatchDedupeStoreForTest > [ 0 ]
25- > ;
26- type SyncMessageDispatchDedupeStore = Extract < MessageDispatchDedupeStore , { entries ( ) : unknown [ ] } > ;
27-
2816const tempDirs : string [ ] = [ ] ;
17+ let previousStateDir : string | undefined ;
2918
30- function createStorePath ( ) : string {
19+ function createStateDir ( ) : string {
3120 const dir = mkdtempSync ( path . join ( tmpdir ( ) , "openclaw-telegram-dispatch-dedupe-" ) ) ;
3221 tempDirs . push ( dir ) ;
33- return path . join ( dir , "sessions.json" ) ;
22+ return dir ;
3423}
3524
3625function message ( params ?: { chatId ?: number ; messageId ?: number } ) : Message {
@@ -41,19 +30,19 @@ function message(params?: { chatId?: number; messageId?: number }): Message {
4130 } as Message ;
4231}
4332
44- beforeEach ( async ( ) => {
33+ beforeEach ( ( ) => {
34+ previousStateDir = process . env . OPENCLAW_STATE_DIR ;
35+ process . env . OPENCLAW_STATE_DIR = createStateDir ( ) ;
4536 resetPluginStateStoreForTests ( { closeDatabase : false } ) ;
46- const store = createPluginStateKeyedStoreForTests ( "telegram" , {
47- namespace : TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE ,
48- maxEntries : TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES ,
49- } ) as NonNullable < Parameters < typeof setTelegramMessageDispatchDedupeStoreForTest > [ 0 ] > ;
50- await store . clear ( ) ;
51- setTelegramMessageDispatchDedupeStoreForTest ( store ) ;
5237} ) ;
5338
5439afterEach ( ( ) => {
55- setTelegramMessageDispatchDedupeStoreForTest ( undefined ) ;
5640 resetPluginStateStoreForTests ( ) ;
41+ if ( previousStateDir === undefined ) {
42+ delete process . env . OPENCLAW_STATE_DIR ;
43+ } else {
44+ process . env . OPENCLAW_STATE_DIR = previousStateDir ;
45+ }
5746 for ( const dir of tempDirs . splice ( 0 ) ) {
5847 rmSync ( dir , { recursive : true , force : true } ) ;
5948 }
@@ -68,8 +57,7 @@ describe("Telegram message dispatch replay guard", () => {
6857 } ) ;
6958
7059 it ( "persists committed dispatches across guard recreation" , async ( ) => {
71- const storePath = createStorePath ( ) ;
72- const writer = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
60+ const writer = createTelegramMessageDispatchReplayGuard ( ) ;
7361 const first = await claimTelegramMessageDispatchReplay ( {
7462 guard : writer ,
7563 accountId : "default" ,
@@ -89,7 +77,7 @@ describe("Telegram message dispatch replay guard", () => {
8977 keys : [ first . key ] ,
9078 } ) ;
9179
92- const reader = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
80+ const reader = createTelegramMessageDispatchReplayGuard ( ) ;
9381 await expect (
9482 claimTelegramMessageDispatchReplay ( {
9583 guard : reader ,
@@ -99,9 +87,8 @@ describe("Telegram message dispatch replay guard", () => {
9987 ) . resolves . toEqual ( { kind : "duplicate" } ) ;
10088 } ) ;
10189
102- it ( "preserves concurrent commits that share dedupe buckets" , async ( ) => {
103- const storePath = createStorePath ( ) ;
104- const writer = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
90+ it ( "preserves concurrent commits" , async ( ) => {
91+ const writer = createTelegramMessageDispatchReplayGuard ( ) ;
10592 const keys = Array . from ( { length : 400 } , ( _ , index ) =>
10693 JSON . stringify ( [ "message" , "1234" , index + 1 ] ) ,
10794 ) ;
@@ -112,151 +99,12 @@ describe("Telegram message dispatch replay guard", () => {
11299 keys,
113100 } ) ;
114101
115- const reader = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
102+ const reader = createTelegramMessageDispatchReplayGuard ( ) ;
116103 await expect ( reader . warmup ( "default" ) ) . resolves . toBe ( keys . length ) ;
117104 } ) ;
118105
119- it ( "falls back to same-process replay protection when plugin-state is unavailable" , async ( ) => {
120- setTelegramMessageDispatchDedupeStoreForTest ( undefined ) ;
121- const errors : unknown [ ] = [ ] ;
122- const storePath = createStorePath ( ) ;
123- const guard = createTelegramMessageDispatchReplayGuard ( {
124- storePath,
125- onDiskError : ( error ) => errors . push ( error ) ,
126- } ) ;
127- const first = await claimTelegramMessageDispatchReplay ( {
128- guard,
129- accountId : "default" ,
130- msg : message ( ) ,
131- } ) ;
132- if ( first . kind !== "claimed" ) {
133- throw new Error ( "expected initial claim" ) ;
134- }
135-
136- await expect ( guard . commit ( first . key , { namespace : "default" } ) ) . resolves . toBe ( false ) ;
137-
138- await expect (
139- claimTelegramMessageDispatchReplay ( {
140- guard,
141- accountId : "default" ,
142- msg : message ( ) ,
143- } ) ,
144- ) . resolves . toEqual ( { kind : "duplicate" } ) ;
145- await expect ( guard . hasRecent ( first . key , { namespace : "default" } ) ) . resolves . toBe ( true ) ;
146- expect ( errors ) . toEqual ( [ ] ) ;
147- } ) ;
148-
149- it ( "keeps same-process replay protection when plugin-state commit fails" , async ( ) => {
150- const failingStore = createPluginStateKeyedStoreForTests ( "telegram" , {
151- namespace : TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE ,
152- maxEntries : TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES ,
153- } ) as NonNullable < Parameters < typeof setTelegramMessageDispatchDedupeStoreForTest > [ 0 ] > ;
154- setTelegramMessageDispatchDedupeStoreForTest ( {
155- ...failingStore ,
156- async register ( ) {
157- throw new Error ( "state write failed" ) ;
158- } ,
159- } ) ;
160- const storePath = createStorePath ( ) ;
161- const guard = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
162- const first = await claimTelegramMessageDispatchReplay ( {
163- guard,
164- accountId : "default" ,
165- msg : message ( ) ,
166- } ) ;
167- if ( first . kind !== "claimed" ) {
168- throw new Error ( "expected initial claim" ) ;
169- }
170-
171- await expect ( guard . commit ( first . key , { namespace : "default" } ) ) . resolves . toBe ( false ) ;
172-
173- await expect (
174- claimTelegramMessageDispatchReplay ( {
175- guard,
176- accountId : "default" ,
177- msg : message ( ) ,
178- } ) ,
179- ) . resolves . toEqual ( { kind : "duplicate" } ) ;
180- await expect ( guard . hasRecent ( first . key , { namespace : "default" } ) ) . resolves . toBe ( true ) ;
181- await expect ( guard . warmup ( "default" ) ) . resolves . toBe ( 1 ) ;
182- } ) ;
183-
184- it ( "keeps same-process replay protection when lookup fails after a successful commit" , async ( ) => {
185- const backingStore = createPluginStateSyncKeyedStoreForTests ( "telegram" , {
186- namespace : TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE ,
187- maxEntries : TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES ,
188- } ) as SyncMessageDispatchDedupeStore ;
189- let failLookup = false ;
190- setTelegramMessageDispatchDedupeStoreForTest ( {
191- ...backingStore ,
192- lookup ( key ) {
193- if ( failLookup ) {
194- throw new Error ( "state read failed" ) ;
195- }
196- return backingStore . lookup ( key ) ;
197- } ,
198- } ) ;
199- const storePath = createStorePath ( ) ;
200- const guard = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
201- const first = await claimTelegramMessageDispatchReplay ( {
202- guard,
203- accountId : "default" ,
204- msg : message ( ) ,
205- } ) ;
206- if ( first . kind !== "claimed" ) {
207- throw new Error ( "expected initial claim" ) ;
208- }
209- await expect ( guard . commit ( first . key , { namespace : "default" } ) ) . resolves . toBe ( true ) ;
210-
211- failLookup = true ;
212-
213- await expect (
214- claimTelegramMessageDispatchReplay ( {
215- guard,
216- accountId : "default" ,
217- msg : message ( ) ,
218- } ) ,
219- ) . resolves . toEqual ( { kind : "duplicate" } ) ;
220- } ) ;
221-
222- it ( "keeps replay histories isolated by session store path" , async ( ) => {
223- const firstStorePath = createStorePath ( ) ;
224- const secondStorePath = createStorePath ( ) ;
225- const firstGuard = createTelegramMessageDispatchReplayGuard ( {
226- storePath : firstStorePath ,
227- } ) ;
228- const first = await claimTelegramMessageDispatchReplay ( {
229- guard : firstGuard ,
230- accountId : "default" ,
231- msg : message ( ) ,
232- } ) ;
233- if ( first . kind !== "claimed" ) {
234- throw new Error ( "expected initial claim" ) ;
235- }
236- await commitTelegramMessageDispatchReplay ( {
237- guard : firstGuard ,
238- accountId : "default" ,
239- keys : [ first . key ] ,
240- } ) ;
241-
242- const secondGuard = createTelegramMessageDispatchReplayGuard ( {
243- storePath : secondStorePath ,
244- } ) ;
245- await expect (
246- claimTelegramMessageDispatchReplay ( {
247- guard : secondGuard ,
248- accountId : "default" ,
249- msg : message ( ) ,
250- } ) ,
251- ) . resolves . toEqual ( {
252- kind : "claimed" ,
253- key : first . key ,
254- } ) ;
255- } ) ;
256-
257106 it ( "keeps accounts isolated and releases retryable pre-dispatch claims" , async ( ) => {
258- const storePath = createStorePath ( ) ;
259- const guard = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
107+ const guard = createTelegramMessageDispatchReplayGuard ( ) ;
260108 const first = await claimTelegramMessageDispatchReplay ( {
261109 guard,
262110 accountId : "default" ,
@@ -295,8 +143,7 @@ describe("Telegram message dispatch replay guard", () => {
295143 } ) ;
296144
297145 it ( "lets an in-flight duplicate retry after the first claim is released" , async ( ) => {
298- const storePath = createStorePath ( ) ;
299- const guard = createTelegramMessageDispatchReplayGuard ( { storePath } ) ;
146+ const guard = createTelegramMessageDispatchReplayGuard ( ) ;
300147 const first = await claimTelegramMessageDispatchReplay ( {
301148 guard,
302149 accountId : "default" ,
0 commit comments