@@ -422,6 +422,178 @@ describe("dispatchTelegramMessage draft streaming", () => {
422422 } ,
423423 ) ;
424424
425+ it ( "materializes boundary preview and keeps it when no matching final arrives" , async ( ) => {
426+ const answerDraftStream = createDraftStream ( 999 ) ;
427+ answerDraftStream . materialize . mockResolvedValue ( 4321 ) ;
428+ const reasoningDraftStream = createDraftStream ( ) ;
429+ createTelegramDraftStream
430+ . mockImplementationOnce ( ( ) => answerDraftStream )
431+ . mockImplementationOnce ( ( ) => reasoningDraftStream ) ;
432+ dispatchReplyWithBufferedBlockDispatcher . mockImplementation ( async ( { replyOptions } ) => {
433+ await replyOptions ?. onPartialReply ?.( { text : "Before tool boundary" } ) ;
434+ await replyOptions ?. onAssistantMessageStart ?.( ) ;
435+ return { queuedFinal : false } ;
436+ } ) ;
437+
438+ const bot = createBot ( ) ;
439+ await dispatchWithContext ( { context : createContext ( ) , streamMode : "partial" , bot } ) ;
440+
441+ expect ( answerDraftStream . materialize ) . toHaveBeenCalledTimes ( 1 ) ;
442+ expect ( answerDraftStream . forceNewMessage ) . toHaveBeenCalledTimes ( 1 ) ;
443+ expect ( answerDraftStream . clear ) . toHaveBeenCalledTimes ( 1 ) ;
444+ const deleteMessageCalls = (
445+ bot . api as unknown as { deleteMessage : { mock : { calls : unknown [ ] [ ] } } }
446+ ) . deleteMessage . mock . calls ;
447+ expect ( deleteMessageCalls ) . not . toContainEqual ( [ 123 , 4321 ] ) ;
448+ } ) ;
449+
450+ it ( "waits for queued boundary rotation before final lane delivery" , async ( ) => {
451+ const answerDraftStream = createSequencedDraftStream ( 1001 ) ;
452+ let resolveMaterialize : ( ( value : number | undefined ) => void ) | undefined ;
453+ const materializePromise = new Promise < number | undefined > ( ( resolve ) => {
454+ resolveMaterialize = resolve ;
455+ } ) ;
456+ answerDraftStream . materialize . mockImplementation ( ( ) => materializePromise ) ;
457+ const reasoningDraftStream = createDraftStream ( ) ;
458+ createTelegramDraftStream
459+ . mockImplementationOnce ( ( ) => answerDraftStream )
460+ . mockImplementationOnce ( ( ) => reasoningDraftStream ) ;
461+ dispatchReplyWithBufferedBlockDispatcher . mockImplementation (
462+ async ( { dispatcherOptions, replyOptions } ) => {
463+ await replyOptions ?. onPartialReply ?.( { text : "Message A partial" } ) ;
464+ await dispatcherOptions . deliver ( { text : "Message A final" } , { kind : "final" } ) ;
465+ const startPromise = replyOptions ?. onAssistantMessageStart ?.( ) ;
466+ const finalPromise = dispatcherOptions . deliver (
467+ { text : "Message B final" } ,
468+ { kind : "final" } ,
469+ ) ;
470+ resolveMaterialize ?.( 1001 ) ;
471+ await startPromise ;
472+ await finalPromise ;
473+ return { queuedFinal : true } ;
474+ } ,
475+ ) ;
476+ deliverReplies . mockResolvedValue ( { delivered : true } ) ;
477+ editMessageTelegram . mockResolvedValue ( { ok : true , chatId : "123" , messageId : "1001" } ) ;
478+
479+ await dispatchWithContext ( { context : createContext ( ) , streamMode : "partial" } ) ;
480+
481+ expect ( answerDraftStream . forceNewMessage ) . toHaveBeenCalledTimes ( 1 ) ;
482+ expect ( editMessageTelegram ) . toHaveBeenCalledTimes ( 2 ) ;
483+ expect ( editMessageTelegram ) . toHaveBeenNthCalledWith (
484+ 2 ,
485+ 123 ,
486+ 1002 ,
487+ "Message B final" ,
488+ expect . any ( Object ) ,
489+ ) ;
490+ } ) ;
491+
492+ it ( "clears active preview even when an unrelated boundary archive exists" , async ( ) => {
493+ const answerDraftStream = createDraftStream ( 999 ) ;
494+ answerDraftStream . materialize . mockResolvedValue ( 4321 ) ;
495+ answerDraftStream . forceNewMessage . mockImplementation ( ( ) => {
496+ answerDraftStream . setMessageId ( 5555 ) ;
497+ } ) ;
498+ const reasoningDraftStream = createDraftStream ( ) ;
499+ createTelegramDraftStream
500+ . mockImplementationOnce ( ( ) => answerDraftStream )
501+ . mockImplementationOnce ( ( ) => reasoningDraftStream ) ;
502+ dispatchReplyWithBufferedBlockDispatcher . mockImplementation ( async ( { replyOptions } ) => {
503+ await replyOptions ?. onPartialReply ?.( { text : "Before tool boundary" } ) ;
504+ await replyOptions ?. onAssistantMessageStart ?.( ) ;
505+ await replyOptions ?. onPartialReply ?.( { text : "Unfinalized next preview" } ) ;
506+ return { queuedFinal : false } ;
507+ } ) ;
508+
509+ const bot = createBot ( ) ;
510+ await dispatchWithContext ( { context : createContext ( ) , streamMode : "partial" , bot } ) ;
511+
512+ expect ( answerDraftStream . clear ) . toHaveBeenCalledTimes ( 1 ) ;
513+ const deleteMessageCalls = (
514+ bot . api as unknown as { deleteMessage : { mock : { calls : unknown [ ] [ ] } } }
515+ ) . deleteMessage . mock . calls ;
516+ expect ( deleteMessageCalls ) . not . toContainEqual ( [ 123 , 4321 ] ) ;
517+ } ) ;
518+
519+ it ( "queues late partials behind async boundary materialization" , async ( ) => {
520+ const answerDraftStream = createDraftStream ( 999 ) ;
521+ let resolveMaterialize : ( ( value : number | undefined ) => void ) | undefined ;
522+ const materializePromise = new Promise < number | undefined > ( ( resolve ) => {
523+ resolveMaterialize = resolve ;
524+ } ) ;
525+ answerDraftStream . materialize . mockImplementation ( ( ) => materializePromise ) ;
526+ const reasoningDraftStream = createDraftStream ( ) ;
527+ createTelegramDraftStream
528+ . mockImplementationOnce ( ( ) => answerDraftStream )
529+ . mockImplementationOnce ( ( ) => reasoningDraftStream ) ;
530+ dispatchReplyWithBufferedBlockDispatcher . mockImplementation ( async ( { replyOptions } ) => {
531+ await replyOptions ?. onPartialReply ?.( { text : "Message A partial" } ) ;
532+
533+ // Simulate provider fire-and-forget ordering: boundary callback starts
534+ // and a new partial arrives before boundary materialization resolves.
535+ const startPromise = replyOptions ?. onAssistantMessageStart ?.( ) ;
536+ const nextPartialPromise = replyOptions ?. onPartialReply ?.( { text : "Message B early" } ) ;
537+
538+ expect ( answerDraftStream . update ) . toHaveBeenCalledTimes ( 1 ) ;
539+ resolveMaterialize ?.( 4321 ) ;
540+
541+ await startPromise ;
542+ await nextPartialPromise ;
543+ return { queuedFinal : false } ;
544+ } ) ;
545+
546+ await dispatchWithContext ( { context : createContext ( ) , streamMode : "partial" } ) ;
547+
548+ expect ( answerDraftStream . materialize ) . toHaveBeenCalledTimes ( 1 ) ;
549+ expect ( answerDraftStream . forceNewMessage ) . toHaveBeenCalledTimes ( 1 ) ;
550+ expect ( answerDraftStream . update ) . toHaveBeenCalledTimes ( 2 ) ;
551+ expect ( answerDraftStream . update ) . toHaveBeenNthCalledWith ( 2 , "Message B early" ) ;
552+ const boundaryRotationOrder = answerDraftStream . forceNewMessage . mock . invocationCallOrder [ 0 ] ;
553+ const secondUpdateOrder = answerDraftStream . update . mock . invocationCallOrder [ 1 ] ;
554+ expect ( boundaryRotationOrder ) . toBeLessThan ( secondUpdateOrder ) ;
555+ } ) ;
556+
557+ it ( "keeps final-only preview lane finalized until a real boundary rotation happens" , async ( ) => {
558+ const answerDraftStream = createSequencedDraftStream ( 1001 ) ;
559+ const reasoningDraftStream = createDraftStream ( ) ;
560+ createTelegramDraftStream
561+ . mockImplementationOnce ( ( ) => answerDraftStream )
562+ . mockImplementationOnce ( ( ) => reasoningDraftStream ) ;
563+ dispatchReplyWithBufferedBlockDispatcher . mockImplementation (
564+ async ( { dispatcherOptions, replyOptions } ) => {
565+ // Final-only first response (no streamed partials).
566+ await dispatcherOptions . deliver ( { text : "Message A final" } , { kind : "final" } ) ;
567+ // Simulate provider ordering bug: first chunk arrives before message-start callback.
568+ await replyOptions ?. onPartialReply ?.( { text : "Message B early" } ) ;
569+ await replyOptions ?. onAssistantMessageStart ?.( ) ;
570+ await replyOptions ?. onPartialReply ?.( { text : "Message B partial" } ) ;
571+ await dispatcherOptions . deliver ( { text : "Message B final" } , { kind : "final" } ) ;
572+ return { queuedFinal : true } ;
573+ } ,
574+ ) ;
575+ deliverReplies . mockResolvedValue ( { delivered : true } ) ;
576+ editMessageTelegram . mockResolvedValue ( { ok : true , chatId : "123" , messageId : "1001" } ) ;
577+
578+ await dispatchWithContext ( { context : createContext ( ) , streamMode : "partial" } ) ;
579+
580+ expect ( answerDraftStream . forceNewMessage ) . toHaveBeenCalledTimes ( 1 ) ;
581+ expect ( editMessageTelegram ) . toHaveBeenNthCalledWith (
582+ 1 ,
583+ 123 ,
584+ 1001 ,
585+ "Message A final" ,
586+ expect . any ( Object ) ,
587+ ) ;
588+ expect ( editMessageTelegram ) . toHaveBeenNthCalledWith (
589+ 2 ,
590+ 123 ,
591+ 1002 ,
592+ "Message B final" ,
593+ expect . any ( Object ) ,
594+ ) ;
595+ } ) ;
596+
425597 it ( "does not force new message on first assistant message start" , async ( ) => {
426598 const draftStream = createDraftStream ( 999 ) ;
427599 createTelegramDraftStream . mockReturnValue ( draftStream ) ;
@@ -829,6 +1001,32 @@ describe("dispatchTelegramMessage draft streaming", () => {
8291001 } ,
8301002 ) ;
8311003
1004+ it ( "queues reasoning-end split decisions behind queued reasoning deltas" , async ( ) => {
1005+ const { reasoningDraftStream } = setupDraftStreams ( {
1006+ answerMessageId : 999 ,
1007+ reasoningMessageId : 111 ,
1008+ } ) ;
1009+ dispatchReplyWithBufferedBlockDispatcher . mockImplementation (
1010+ async ( { dispatcherOptions, replyOptions } ) => {
1011+ // Simulate fire-and-forget upstream ordering: reasoning_end arrives
1012+ // before the queued reasoning delta callback has finished.
1013+ const firstReasoningPromise = replyOptions ?. onReasoningStream ?.( {
1014+ text : "Reasoning:\n_first block_" ,
1015+ } ) ;
1016+ await replyOptions ?. onReasoningEnd ?.( ) ;
1017+ await firstReasoningPromise ;
1018+ await replyOptions ?. onReasoningStream ?.( { text : "Reasoning:\n_second block_" } ) ;
1019+ await dispatcherOptions . deliver ( { text : "Done" } , { kind : "final" } ) ;
1020+ return { queuedFinal : true } ;
1021+ } ,
1022+ ) ;
1023+ deliverReplies . mockResolvedValue ( { delivered : true } ) ;
1024+
1025+ await dispatchWithContext ( { context : createReasoningStreamContext ( ) , streamMode : "partial" } ) ;
1026+
1027+ expect ( reasoningDraftStream . forceNewMessage ) . toHaveBeenCalledTimes ( 1 ) ;
1028+ } ) ;
1029+
8321030 it ( "cleans superseded reasoning previews after lane rotation" , async ( ) => {
8331031 let reasoningDraftParams :
8341032 | {
0 commit comments