@@ -2712,6 +2712,88 @@ describe("dispatchTelegramMessage draft streaming", () => {
27122712 expect ( deliveredTexts ) . toContain ( "fresh request answer" ) ;
27132713 } ) ;
27142714
2715+ it ( "keeps newer DM requests from aborting active same-session dispatch" , async ( ) => {
2716+ let firstStarted : ( ( ) => void ) | undefined ;
2717+ const firstStartGate = new Promise < void > ( ( resolve ) => {
2718+ firstStarted = resolve ;
2719+ } ) ;
2720+ let releaseFirst : ( ( ) => void ) | undefined ;
2721+ const firstGate = new Promise < void > ( ( resolve ) => {
2722+ releaseFirst = resolve ;
2723+ } ) ;
2724+ let secondStarted : ( ( ) => void ) | undefined ;
2725+ const secondStartGate = new Promise < void > ( ( resolve ) => {
2726+ secondStarted = resolve ;
2727+ } ) ;
2728+ let firstAbortSignal : AbortSignal | undefined ;
2729+ dispatchReplyWithBufferedBlockDispatcher
2730+ . mockImplementationOnce ( async ( { dispatcherOptions, replyOptions } ) => {
2731+ firstAbortSignal = replyOptions ?. abortSignal ;
2732+ firstStarted ?.( ) ;
2733+ await firstGate ;
2734+ await dispatcherOptions . deliver ( { text : "earlier DM answer" } , { kind : "final" } ) ;
2735+ return {
2736+ queuedFinal : true ,
2737+ counts : { block : 0 , final : 1 , tool : 0 } ,
2738+ } ;
2739+ } )
2740+ . mockImplementationOnce ( async ( { dispatcherOptions } ) => {
2741+ secondStarted ?.( ) ;
2742+ await dispatcherOptions . deliver ( { text : "fresh DM answer" } , { kind : "final" } ) ;
2743+ return {
2744+ queuedFinal : true ,
2745+ counts : { block : 0 , final : 1 , tool : 0 } ,
2746+ } ;
2747+ } ) ;
2748+ deliverReplies . mockResolvedValue ( { delivered : true } ) ;
2749+
2750+ const createDirectContext = ( messageId : number , body : string ) =>
2751+ createContext ( {
2752+ ctxPayload : {
2753+ SessionKey : "agent:main:main" ,
2754+ ChatType : "direct" ,
2755+ MessageSid : String ( messageId ) ,
2756+ RawBody : body ,
2757+ BodyForAgent : body ,
2758+ CommandBody : body ,
2759+ CommandAuthorized : true ,
2760+ } as unknown as TelegramMessageContext [ "ctxPayload" ] ,
2761+ msg : {
2762+ chat : { id : 123 , type : "private" } ,
2763+ message_id : messageId ,
2764+ } as unknown as TelegramMessageContext [ "msg" ] ,
2765+ chatId : 123 ,
2766+ isGroup : false ,
2767+ historyKey : "telegram:123" ,
2768+ historyLimit : 10 ,
2769+ groupHistories : new Map ( ) ,
2770+ threadSpec : { id : undefined , scope : "none" } ,
2771+ } ) ;
2772+
2773+ const firstPromise = dispatchWithContext ( {
2774+ context : createDirectContext ( 99 , "first request" ) ,
2775+ streamMode : "off" ,
2776+ } ) ;
2777+ await firstStartGate ;
2778+ const secondPromise = dispatchWithContext ( {
2779+ context : createDirectContext ( 100 , "second request" ) ,
2780+ streamMode : "off" ,
2781+ } ) ;
2782+ await secondStartGate ;
2783+
2784+ expect ( firstAbortSignal ?. aborted ) . toBe ( false ) ;
2785+ releaseFirst ?.( ) ;
2786+ await Promise . all ( [ firstPromise , secondPromise ] ) ;
2787+
2788+ const deliveredTexts = deliverReplies . mock . calls . flatMap ( ( call ) =>
2789+ ( ( call [ 0 ] as { replies ?: Array < { text ?: string } > } ) . replies ?? [ ] ) . map (
2790+ ( reply ) => reply . text ,
2791+ ) ,
2792+ ) ;
2793+ expect ( deliveredTexts ) . toContain ( "fresh DM answer" ) ;
2794+ expect ( deliveredTexts ) . toContain ( "earlier DM answer" ) ;
2795+ } ) ;
2796+
27152797 it ( "keeps /btw side questions from aborting an active same-session dispatch" , async ( ) => {
27162798 const historyKey = "telegram:group:-100123" ;
27172799 const groupHistories = new Map ( [ [ historyKey , [ ] ] ] ) ;
0 commit comments