1+ import fs from "node:fs" ;
12import os from "node:os" ;
23import path from "node:path" ;
34import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts" ;
@@ -15,10 +16,127 @@ import {
1516
1617const originalTelegramActionRuntime = { ...telegramActionRuntime } ;
1718const reactMessageTelegram = vi . fn ( async ( ) => ( { ok : true } ) ) ;
18- const sendMessageTelegram = vi . fn ( async ( ) => ( {
19- messageId : "789" ,
20- chatId : "123" ,
21- } ) ) ;
19+ const sendMessageTelegram = vi . fn (
20+ async ( _to : string , _text : string , _opts ?: Record < string , unknown > ) => ( {
21+ messageId : "789" ,
22+ chatId : "123" ,
23+ } ) ,
24+ ) ;
25+ const sendDurableMessageBatch = vi . fn (
26+ async ( params : {
27+ cfg : OpenClawConfig ;
28+ to : string ;
29+ accountId ?: string ;
30+ payloads : Array < {
31+ text ?: string ;
32+ mediaUrl ?: string ;
33+ mediaUrls ?: string [ ] ;
34+ audioAsVoice ?: boolean ;
35+ delivery ?: {
36+ pin ?: true | { enabled ?: boolean ; notify ?: boolean ; required ?: boolean } ;
37+ } ;
38+ channelData ?: { telegram ?: { buttons ?: unknown ; quoteText ?: string } } ;
39+ } > ;
40+ replyToId ?: string ;
41+ threadId ?: string | number ;
42+ forceDocument ?: boolean ;
43+ silent ?: boolean ;
44+ gatewayClientScopes ?: readonly string [ ] ;
45+ session ?: {
46+ key ?: string ;
47+ agentId ?: string ;
48+ requesterAccountId ?: string ;
49+ } ;
50+ mediaAccess ?: {
51+ localRoots ?: readonly string [ ] ;
52+ readFile ?: ( filePath : string ) => Promise < Buffer > ;
53+ } ;
54+ } ) => {
55+ const payload = params . payloads [ 0 ] ?? { } ;
56+ const mediaUrls = payload . mediaUrls ?. length
57+ ? payload . mediaUrls
58+ : payload . mediaUrl
59+ ? [ payload . mediaUrl ]
60+ : [ ] ;
61+ const telegramData = payload . channelData ?. telegram ;
62+ const cfg = params . cfg as {
63+ channels ?: {
64+ telegram ?: {
65+ botToken ?: string ;
66+ accounts ?: Record < string , { botToken ?: string } > ;
67+ } ;
68+ } ;
69+ } ;
70+ const token =
71+ ( params . accountId
72+ ? cfg . channels ?. telegram ?. accounts ?. [ params . accountId ] ?. botToken
73+ : undefined ) ??
74+ cfg . channels ?. telegram ?. botToken ??
75+ process . env . TELEGRAM_BOT_TOKEN ;
76+ const baseOptions = {
77+ cfg : params . cfg ,
78+ token,
79+ accountId : params . accountId ,
80+ gatewayClientScopes : params . gatewayClientScopes ,
81+ replyToMessageId :
82+ params . replyToId == null ? undefined : Number . parseInt ( params . replyToId , 10 ) ,
83+ messageThreadId :
84+ params . threadId == null ? undefined : Number . parseInt ( String ( params . threadId ) , 10 ) ,
85+ quoteText : telegramData ?. quoteText ,
86+ asVoice : payload . audioAsVoice ,
87+ silent : params . silent ,
88+ forceDocument : params . forceDocument ,
89+ mediaLocalRoots : params . mediaAccess ?. localRoots ,
90+ mediaReadFile : params . mediaAccess ?. readFile ,
91+ } ;
92+ const calls = mediaUrls . length > 0 ? mediaUrls : [ undefined ] ;
93+ let last = { messageId : "789" , chatId : "123" } ;
94+ for ( const [ index , mediaUrl ] of calls . entries ( ) ) {
95+ last = await sendMessageTelegram ( params . to , index === 0 ? ( payload . text ?? "" ) : "" , {
96+ ...baseOptions ,
97+ ...( mediaUrl ? { mediaUrl } : { } ) ,
98+ ...( index === 0 && telegramData ?. buttons ? { buttons : telegramData . buttons } : { } ) ,
99+ } ) ;
100+ }
101+ const pin =
102+ payload . delivery ?. pin === true
103+ ? { enabled : true }
104+ : payload . delivery ?. pin && payload . delivery . pin . enabled
105+ ? payload . delivery . pin
106+ : undefined ;
107+ if ( pin && last . messageId ) {
108+ try {
109+ await pinMessageTelegram ( params . to , last . messageId , {
110+ cfg : params . cfg ,
111+ accountId : params . accountId ,
112+ notify : pin . notify ,
113+ verbose : false ,
114+ gatewayClientScopes : params . gatewayClientScopes ,
115+ } ) ;
116+ } catch ( err ) {
117+ if ( pin . required ) {
118+ throw err ;
119+ }
120+ }
121+ }
122+ return {
123+ status : "sent" ,
124+ results : [ { channel : "telegram" , messageId : last . messageId , chatId : last . chatId } ] ,
125+ receipt : {
126+ primaryPlatformMessageId : last . messageId ,
127+ platformMessageIds : [ last . messageId ] ,
128+ parts : [
129+ {
130+ platformMessageId : last . messageId ,
131+ kind : mediaUrls . length > 0 ? "media" : "text" ,
132+ index : 0 ,
133+ } ,
134+ ] ,
135+ sentAt : Date . now ( ) ,
136+ } ,
137+ } as const ;
138+ } ,
139+ ) ;
22140const sendPollTelegram = vi . fn ( async ( ) => ( {
23141 messageId : "790" ,
24142 chatId : "123" ,
@@ -40,11 +158,13 @@ const editForumTopicTelegram = vi.fn(async () => ({
40158 messageThreadId : 42 ,
41159 name : "Renamed" ,
42160} ) ) ;
43- const pinMessageTelegram = vi . fn ( async ( ) => ( {
44- ok : true ,
45- messageId : "789" ,
46- chatId : "123" ,
47- } ) ) ;
161+ const pinMessageTelegram = vi . fn (
162+ async ( _to : string , _messageId : string , _opts ?: Record < string , unknown > ) => ( {
163+ ok : true ,
164+ messageId : "789" ,
165+ chatId : "123" ,
166+ } ) ,
167+ ) ;
48168const createForumTopicTelegram = vi . fn ( async ( ) => ( {
49169 topicId : 99 ,
50170 name : "Topic" ,
@@ -109,6 +229,20 @@ function resultDetails(result: Awaited<ReturnType<typeof handleTelegramAction>>)
109229 return requireRecord ( result . details , "Telegram action details" ) ;
110230}
111231
232+ function readDurableQueueEntries ( stateDir : string ) : Record < string , unknown > [ ] {
233+ const queueDir = path . join ( stateDir , "delivery-queue" ) ;
234+ if ( ! fs . existsSync ( queueDir ) ) {
235+ return [ ] ;
236+ }
237+ return fs
238+ . readdirSync ( queueDir )
239+ . filter ( ( name ) => name . endsWith ( ".json" ) )
240+ . map ( ( name ) => JSON . parse ( fs . readFileSync ( path . join ( queueDir , name ) , "utf-8" ) ) ) as Record <
241+ string ,
242+ unknown
243+ > [ ] ;
244+ }
245+
112246describe ( "handleTelegramAction" , ( ) => {
113247 const defaultReactionAction = {
114248 action : "react" ,
@@ -175,11 +309,12 @@ describe("handleTelegramAction", () => {
175309 }
176310
177311 beforeEach ( ( ) => {
178- envSnapshot = captureEnv ( [ "TELEGRAM_BOT_TOKEN" ] ) ;
312+ envSnapshot = captureEnv ( [ "OPENCLAW_STATE_DIR" , " TELEGRAM_BOT_TOKEN"] ) ;
179313 resetTopicNameCacheForTest ( ) ;
180314 installTopicNameStoreForTest ( ) ;
181315 Object . assign ( telegramActionRuntime , originalTelegramActionRuntime , {
182316 reactMessageTelegram,
317+ sendDurableMessageBatch,
183318 sendMessageTelegram,
184319 sendPollTelegram,
185320 sendStickerTelegram,
@@ -190,6 +325,7 @@ describe("handleTelegramAction", () => {
190325 createForumTopicTelegram,
191326 } ) ;
192327 reactMessageTelegram . mockClear ( ) ;
328+ sendDurableMessageBatch . mockClear ( ) ;
193329 sendMessageTelegram . mockClear ( ) ;
194330 sendPollTelegram . mockClear ( ) ;
195331 sendStickerTelegram . mockClear ( ) ;
@@ -417,14 +553,26 @@ describe("handleTelegramAction", () => {
417553 content : "Hello, Telegram!" ,
418554 } ,
419555 telegramConfig ( ) ,
420- { gatewayClientScopes : [ "operator.write" ] } ,
556+ {
557+ gatewayClientScopes : [ "operator.write" ] ,
558+ sessionKey : "agent:main:telegram:direct:123" ,
559+ } ,
421560 ) ;
422561 const call = mockCall ( sendMessageTelegram , 0 , "text message" ) ;
423562 expect ( call [ 0 ] ) . toBe ( "@testchannel" ) ;
424563 expect ( call [ 1 ] ) . toBe ( "Hello, Telegram!" ) ;
425564 const options = requireRecord ( call [ 2 ] , "text message options" ) ;
426565 expect ( options . token ) . toBe ( "tok" ) ;
427566 expect ( options . mediaUrl ) . toBeUndefined ( ) ;
567+ const durableCall = mockCall ( sendDurableMessageBatch , 0 , "durable text message" ) ;
568+ expect ( requireRecord ( durableCall [ 0 ] , "durable text message params" ) ) . toMatchObject ( {
569+ channel : "telegram" ,
570+ to : "@testchannel" ,
571+ durability : "required" ,
572+ gatewayClientScopes : [ "operator.write" ] ,
573+ session : { key : "agent:main:telegram:direct:123" , agentId : "main" } ,
574+ payloads : [ { text : "Hello, Telegram!" } ] ,
575+ } ) ;
428576 expect ( result . content ) . toStrictEqual ( [
429577 {
430578 type : "text" ,
@@ -438,6 +586,135 @@ describe("handleTelegramAction", () => {
438586 } ) ;
439587 } ) ;
440588
589+ it ( "persists sendMessage action deliveries before Telegram platform send" , async ( ) => {
590+ const stateDir = fs . mkdtempSync ( path . join ( os . tmpdir ( ) , "openclaw-telegram-action-durable-" ) ) ;
591+ const { createOutboundTestPlugin, createTestRegistry, setActivePluginRegistry } =
592+ await import ( "openclaw/plugin-sdk/plugin-test-runtime" ) ;
593+ const sendText = vi
594+ . fn ( )
595+ . mockImplementationOnce ( async ( ) => {
596+ const entries = readDurableQueueEntries ( stateDir ) ;
597+ expect ( entries ) . toHaveLength ( 1 ) ;
598+ expect ( entries [ 0 ] ) . toMatchObject ( {
599+ channel : "telegram" ,
600+ to : "12345" ,
601+ payloads : [
602+ {
603+ text : "times out after queue write" ,
604+ delivery : { pin : { enabled : true , required : true } } ,
605+ } ,
606+ ] ,
607+ session : { key : "agent:main:telegram:direct:12345" , agentId : "main" } ,
608+ gatewayClientScopes : [ "operator.write" ] ,
609+ retryCount : 0 ,
610+ } ) ;
611+ throw new Error ( "telegram timeout" ) ;
612+ } )
613+ . mockImplementationOnce ( async ( ) => {
614+ const entries = readDurableQueueEntries ( stateDir ) ;
615+ const liveEntry = entries . find ( ( entry ) =>
616+ JSON . stringify ( entry . payloads ) . includes ( "delivers after queue write" ) ,
617+ ) ;
618+ expect ( liveEntry ) . toMatchObject ( {
619+ channel : "telegram" ,
620+ to : "12345" ,
621+ payloads : [ { text : "delivers after queue write" } ] ,
622+ retryCount : 0 ,
623+ } ) ;
624+ return { channel : "telegram" , messageId : "tg-ok" } ;
625+ } ) ;
626+
627+ process . env . OPENCLAW_STATE_DIR = stateDir ;
628+ telegramActionRuntime . sendDurableMessageBatch =
629+ originalTelegramActionRuntime . sendDurableMessageBatch ;
630+ setActivePluginRegistry (
631+ createTestRegistry ( [
632+ {
633+ pluginId : "telegram" ,
634+ source : "test" ,
635+ plugin : createOutboundTestPlugin ( {
636+ id : "telegram" ,
637+ outbound : {
638+ deliveryMode : "direct" ,
639+ deliveryCapabilities : {
640+ durableFinal : {
641+ text : true ,
642+ media : true ,
643+ payload : true ,
644+ silent : true ,
645+ replyTo : true ,
646+ thread : true ,
647+ messageSendingHooks : true ,
648+ batch : true ,
649+ } ,
650+ } ,
651+ sendText,
652+ } ,
653+ } ) ,
654+ } ,
655+ ] ) ,
656+ ) ;
657+
658+ try {
659+ await expect (
660+ handleTelegramAction (
661+ {
662+ action : "sendMessage" ,
663+ to : "12345" ,
664+ content : "times out after queue write" ,
665+ delivery : { pin : { enabled : true , required : true } } ,
666+ } ,
667+ telegramConfig ( ) ,
668+ {
669+ gatewayClientScopes : [ "operator.write" ] ,
670+ sessionKey : "agent:main:telegram:direct:12345" ,
671+ } ,
672+ ) ,
673+ ) . rejects . toThrow ( "telegram timeout" ) ;
674+
675+ const retryableEntries = readDurableQueueEntries ( stateDir ) ;
676+ expect ( retryableEntries ) . toHaveLength ( 1 ) ;
677+ expect ( retryableEntries [ 0 ] ) . toMatchObject ( {
678+ payloads : [
679+ {
680+ text : "times out after queue write" ,
681+ delivery : { pin : { enabled : true , required : true } } ,
682+ } ,
683+ ] ,
684+ retryCount : 1 ,
685+ } ) ;
686+ expect ( String ( retryableEntries [ 0 ] ?. lastError ) ) . toContain ( "telegram timeout" ) ;
687+
688+ const result = await handleTelegramAction (
689+ {
690+ action : "sendMessage" ,
691+ to : "12345" ,
692+ content : "delivers after queue write" ,
693+ } ,
694+ telegramConfig ( ) ,
695+ { sessionKey : "agent:main:telegram:direct:12345" } ,
696+ ) ;
697+
698+ expect ( result . details ) . toMatchObject ( {
699+ ok : true ,
700+ messageId : "tg-ok" ,
701+ } ) ;
702+ expect ( readDurableQueueEntries ( stateDir ) ) . toHaveLength ( 1 ) ;
703+ expect ( readDurableQueueEntries ( stateDir ) [ 0 ] ) . toMatchObject ( {
704+ payloads : [
705+ {
706+ text : "times out after queue write" ,
707+ delivery : { pin : { enabled : true , required : true } } ,
708+ } ,
709+ ] ,
710+ retryCount : 1 ,
711+ } ) ;
712+ } finally {
713+ setActivePluginRegistry ( createTestRegistry ( [ ] ) ) ;
714+ fs . rmSync ( stateDir , { recursive : true , force : true } ) ;
715+ }
716+ } ) ;
717+
441718 it ( "normalizes legacy group targets for sendMessage actions" , async ( ) => {
442719 await handleTelegramAction (
443720 {
@@ -1092,6 +1369,10 @@ describe("handleTelegramAction", () => {
10921369 expect ( options . accountId ) . toBeUndefined ( ) ;
10931370 expect ( options . verbose ) . toBe ( false ) ;
10941371 expect ( options . gatewayClientScopes ) . toEqual ( [ "operator.write" ] ) ;
1372+ const durableCall = mockCall ( sendDurableMessageBatch , 0 , "durable delivery pin" ) ;
1373+ expect ( requireRecord ( durableCall [ 0 ] , "durable delivery pin params" ) ) . toMatchObject ( {
1374+ payloads : [ { delivery : { pin : { enabled : true } } } ] ,
1375+ } ) ;
10951376 } ) ;
10961377
10971378 it ( "passes delivery pin notify requests for action sends" , async ( ) => {
@@ -1109,6 +1390,10 @@ describe("handleTelegramAction", () => {
11091390 expect ( call [ 0 ] ) . toBe ( "123456" ) ;
11101391 expect ( call [ 1 ] ) . toBe ( "789" ) ;
11111392 expect ( requireRecord ( call [ 2 ] , "delivery pin notify options" ) . notify ) . toBe ( true ) ;
1393+ const durableCall = mockCall ( sendDurableMessageBatch , 0 , "durable delivery pin notify" ) ;
1394+ expect ( requireRecord ( durableCall [ 0 ] , "durable delivery pin notify params" ) ) . toMatchObject ( {
1395+ payloads : [ { delivery : { pin : { enabled : true , notify : true } } } ] ,
1396+ } ) ;
11121397 } ) ;
11131398
11141399 it ( "fails required action-send pins when pinning fails" , async ( ) => {
0 commit comments