@@ -28,7 +28,6 @@ import {
2828 resolveChannelStreamingPreviewToolProgress ,
2929 resolveTranscriptBackedChannelFinalText ,
3030} from "openclaw/plugin-sdk/channel-streaming" ;
31- import { isAbortRequestText } from "openclaw/plugin-sdk/command-primitives-runtime" ;
3231import type {
3332 OpenClawConfig ,
3433 ReplyToMode ,
@@ -107,9 +106,23 @@ import {
107106 splitTelegramReasoningText ,
108107} from "./reasoning-lane-coordinator.js" ;
109108import { editMessageTelegram } from "./send.js" ;
109+ import { getTelegramSequentialKey } from "./sequential-key.js" ;
110110import { cacheSticker , describeStickerImage } from "./sticker-cache.js" ;
111+ import {
112+ beginTelegramReplyFence ,
113+ buildTelegramReplyFenceLaneKey ,
114+ endTelegramReplyFence ,
115+ getTelegramReplyFenceSizeForTests ,
116+ isTelegramReplyFenceSuperseded ,
117+ releaseTelegramReplyFenceAbortController ,
118+ resetTelegramReplyFenceForTests ,
119+ resolveTelegramReplyFenceKey ,
120+ shouldSupersedeTelegramReplyFence ,
121+ supersedeTelegramReplyFence ,
122+ } from "./telegram-reply-fence.js" ;
111123
112124export { pruneStickerMediaFromContext } from "./bot-message-dispatch.media.js" ;
125+ export { getTelegramReplyFenceSizeForTests , resetTelegramReplyFenceForTests } ;
113126
114127const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again." ;
115128const silentReplyDispatchLogger = createSubsystemLogger ( "telegram/silent-reply-dispatch" ) ;
@@ -180,140 +193,6 @@ type TelegramReasoningLevel = "off" | "on" | "stream";
180193
181194type TelegramTranscriptMirrorPayload = { text ?: string ; mediaUrls ?: string [ ] } ;
182195
183- type TelegramReplyFenceState = {
184- generation : number ;
185- activeDispatches : number ;
186- abortControllers ?: Set < AbortController > ;
187- } ;
188-
189- type TelegramReplyFenceKey = {
190- activeKey : string ;
191- roomEventKey : string ;
192- } ;
193-
194- // Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work.
195- const telegramReplyFenceByKey = new Map < string , TelegramReplyFenceState > ( ) ;
196-
197- function normalizeTelegramFenceKey ( value : unknown ) : string | undefined {
198- if ( typeof value !== "string" ) {
199- return undefined ;
200- }
201- const trimmed = value . trim ( ) ;
202- return trimmed . length > 0 ? trimmed : undefined ;
203- }
204-
205- function resolveTelegramReplyFenceKey ( params : {
206- ctxPayload : { SessionKey ?: string ; CommandTargetSessionKey ?: string ; InboundEventKind ?: string } ;
207- chatId : number | string ;
208- threadSpec : { id ?: number | string | null ; scope ?: string } ;
209- } ) : TelegramReplyFenceKey {
210- const baseKey =
211- normalizeTelegramFenceKey ( params . ctxPayload . CommandTargetSessionKey ) ??
212- normalizeTelegramFenceKey ( params . ctxPayload . SessionKey ) ??
213- `telegram:${ String ( params . chatId ) } :${ params . threadSpec . scope ?? "default" } :${ params . threadSpec . id ?? "root" } ` ;
214- const roomEventKey = `${ baseKey } :room_event` ;
215- return {
216- activeKey : params . ctxPayload . InboundEventKind === "room_event" ? roomEventKey : baseKey ,
217- roomEventKey,
218- } ;
219- }
220-
221- function abortTelegramReplyFenceControllers ( state : TelegramReplyFenceState ) : void {
222- for ( const controller of state . abortControllers ?? [ ] ) {
223- controller . abort ( ) ;
224- }
225- state . abortControllers ?. clear ( ) ;
226- }
227-
228- function beginTelegramReplyFence ( params : {
229- key : string ;
230- supersede : boolean ;
231- abortController ?: AbortController ;
232- } ) : number {
233- const existing = telegramReplyFenceByKey . get ( params . key ) ;
234- const state : TelegramReplyFenceState = existing ?? {
235- generation : 0 ,
236- activeDispatches : 0 ,
237- } ;
238- if ( params . supersede ) {
239- state . generation += 1 ;
240- abortTelegramReplyFenceControllers ( state ) ;
241- }
242- if ( params . abortController ) {
243- ( state . abortControllers ??= new Set ( ) ) . add ( params . abortController ) ;
244- }
245- state . activeDispatches += 1 ;
246- telegramReplyFenceByKey . set ( params . key , state ) ;
247- return state . generation ;
248- }
249-
250- function supersedeTelegramReplyFence ( key : string ) : void {
251- const state = telegramReplyFenceByKey . get ( key ) ;
252- if ( ! state ) {
253- return ;
254- }
255- state . generation += 1 ;
256- abortTelegramReplyFenceControllers ( state ) ;
257- if ( state . activeDispatches <= 0 && ( state . abortControllers ?. size ?? 0 ) === 0 ) {
258- telegramReplyFenceByKey . delete ( key ) ;
259- } else {
260- telegramReplyFenceByKey . set ( key , state ) ;
261- }
262- }
263-
264- function isTelegramReplyFenceSuperseded ( params : { key : string ; generation : number } ) : boolean {
265- return ( telegramReplyFenceByKey . get ( params . key ) ?. generation ?? 0 ) !== params . generation ;
266- }
267-
268- function endTelegramReplyFence ( key : string , abortController ?: AbortController ) : void {
269- const state = telegramReplyFenceByKey . get ( key ) ;
270- if ( ! state ) {
271- return ;
272- }
273- if ( abortController ) {
274- state . abortControllers ?. delete ( abortController ) ;
275- }
276- state . activeDispatches = Math . max ( 0 , state . activeDispatches - 1 ) ;
277- if ( state . activeDispatches <= 0 && ( state . abortControllers ?. size ?? 0 ) === 0 ) {
278- telegramReplyFenceByKey . delete ( key ) ;
279- }
280- }
281-
282- function releaseTelegramReplyFenceAbortController (
283- key : string ,
284- abortController ?: AbortController ,
285- ) : void {
286- if ( ! abortController ) {
287- return ;
288- }
289- const state = telegramReplyFenceByKey . get ( key ) ;
290- if ( ! state ) {
291- return ;
292- }
293- state . abortControllers ?. delete ( abortController ) ;
294- if ( state . activeDispatches <= 0 && ( state . abortControllers ?. size ?? 0 ) === 0 ) {
295- telegramReplyFenceByKey . delete ( key ) ;
296- }
297- }
298-
299- function shouldSupersedeTelegramReplyFence ( ctxPayload : {
300- Body ?: string ;
301- RawBody ?: string ;
302- CommandBody ?: string ;
303- CommandAuthorized : boolean ;
304- } ) : boolean {
305- const dispatchText = ctxPayload . CommandBody ?? ctxPayload . RawBody ?? ctxPayload . Body ?? "" ;
306- return ! isAbortRequestText ( dispatchText ) || ctxPayload . CommandAuthorized ;
307- }
308-
309- export function getTelegramReplyFenceSizeForTests ( ) : number {
310- return telegramReplyFenceByKey . size ;
311- }
312-
313- export function resetTelegramReplyFenceForTests ( ) : void {
314- telegramReplyFenceByKey . clear ( ) ;
315- }
316-
317196function resolveTelegramReasoningLevel ( params : {
318197 cfg : OpenClawConfig ;
319198 sessionKey ?: string ;
@@ -531,9 +410,17 @@ export const dispatchTelegramMessage = async ({
531410 chatId,
532411 threadSpec,
533412 } ) ;
413+ const replyFenceLaneKey = getTelegramSequentialKey ( {
414+ message : msg ,
415+ ...( context . primaryCtx . me ? { me : context . primaryCtx . me } : { } ) ,
416+ } ) ;
417+ const scopedReplyFenceLaneKey = buildTelegramReplyFenceLaneKey ( {
418+ accountId : route . accountId ,
419+ sequentialKey : replyFenceLaneKey ,
420+ } ) ;
534421 let replyFenceGeneration : number | undefined ;
535- const roomEventAbortController = isRoomEvent ? new AbortController ( ) : undefined ;
536- let roomEventAbortControllerQueued = false ;
422+ const replyAbortController = new AbortController ( ) ;
423+ let replyAbortControllerQueued = false ;
537424 let dispatchWasSuperseded = false ;
538425 const isDispatchSuperseded = ( ) =>
539426 replyFenceGeneration !== undefined &&
@@ -547,7 +434,7 @@ export const dispatchTelegramMessage = async ({
547434 }
548435 endTelegramReplyFence (
549436 replyFenceKey . activeKey ,
550- roomEventAbortControllerQueued ? undefined : roomEventAbortController ,
437+ replyAbortControllerQueued ? undefined : replyAbortController ,
551438 ) ;
552439 replyFenceGeneration = undefined ;
553440 } ;
@@ -940,7 +827,8 @@ export const dispatchTelegramMessage = async ({
940827 replyFenceGeneration = beginTelegramReplyFence ( {
941828 key : replyFenceKey . activeKey ,
942829 supersede : supersedeReplyFence ,
943- abortController : roomEventAbortController ,
830+ abortController : replyAbortController ,
831+ laneKey : scopedReplyFenceLaneKey ,
944832 } ) ;
945833
946834 const implicitQuoteReplyTargetId =
@@ -1567,26 +1455,25 @@ export const dispatchTelegramMessage = async ({
15671455 replyOptions : {
15681456 skillFilter,
15691457 disableBlockStreaming,
1570- abortSignal : roomEventAbortController ? .signal ,
1458+ abortSignal : replyAbortController . signal ,
15711459 sourceReplyDeliveryMode : isRoomEvent ? "message_tool_only" : undefined ,
15721460 queuedDeliveryCorrelations : isRoomEvent
15731461 ? [ { begin : beginDeliveryCorrelation } ]
15741462 : undefined ,
1575- queuedFollowupLifecycle :
1576- isRoomEvent && roomEventAbortController
1577- ? {
1578- onEnqueued : ( ) => {
1579- roomEventAbortControllerQueued = true ;
1580- } ,
1581- onComplete : ( ) => {
1582- roomEventAbortControllerQueued = false ;
1583- releaseTelegramReplyFenceAbortController (
1584- replyFenceKey . activeKey ,
1585- roomEventAbortController ,
1586- ) ;
1587- } ,
1588- }
1589- : undefined ,
1463+ queuedFollowupLifecycle : isRoomEvent
1464+ ? {
1465+ onEnqueued : ( ) => {
1466+ replyAbortControllerQueued = true ;
1467+ } ,
1468+ onComplete : ( ) => {
1469+ replyAbortControllerQueued = false ;
1470+ releaseTelegramReplyFenceAbortController (
1471+ replyFenceKey . activeKey ,
1472+ replyAbortController ,
1473+ ) ;
1474+ } ,
1475+ }
1476+ : undefined ,
15901477 suppressTyping : isRoomEvent ,
15911478 onPartialReply :
15921479 answerLane . stream || reasoningLane . stream
0 commit comments