@@ -18,8 +18,14 @@ import {
1818 shouldSkipMessageByAbortCutoff ,
1919 tryFastAbortFromMessage ,
2020} from "./abort.js" ;
21+ import { testing as acpResetTargetTesting } from "./acp-reset-target.js" ;
2122import { enqueueFollowupRun , getFollowupQueueDepth , type FollowupRun } from "./queue.js" ;
2223import { testing as queueCleanupTesting } from "./queue/cleanup.js" ;
24+ import {
25+ createReplyOperation ,
26+ replyRunRegistry ,
27+ testing as replyRunRegistryTesting ,
28+ } from "./reply-run-registry.js" ;
2329import { buildTestCtx } from "./test-ctx.js" ;
2430
2531vi . mock ( "../../agents/pi-embedded.js" , ( ) => ( {
@@ -112,7 +118,8 @@ describe("abort detection", () => {
112118
113119 async function runStopCommand ( params : {
114120 cfg : OpenClawConfig ;
115- sessionKey : string ;
121+ sessionKey ?: string ;
122+ parentSessionKey ?: string ;
116123 from : string ;
117124 to : string ;
118125 senderId ?: string ;
@@ -126,11 +133,12 @@ describe("abort detection", () => {
126133 CommandBody : "/stop" ,
127134 RawBody : "/stop" ,
128135 CommandAuthorized : true ,
129- SessionKey : params . sessionKey ,
130136 Provider : "telegram" ,
131137 Surface : "telegram" ,
132138 From : params . from ,
133139 To : params . to ,
140+ ...( params . sessionKey ? { SessionKey : params . sessionKey } : { } ) ,
141+ ...( params . parentSessionKey ? { ParentSessionKey : params . parentSessionKey } : { } ) ,
134142 ...( params . senderId ? { SenderId : params . senderId } : { } ) ,
135143 ...( params . commandSource ? { CommandSource : params . commandSource } : { } ) ,
136144 ...( params . targetSessionKey ? { CommandTargetSessionKey : params . targetSessionKey } : { } ) ,
@@ -202,7 +210,9 @@ describe("abort detection", () => {
202210 afterEach ( ( ) => {
203211 resetAbortMemoryForTest ( ) ;
204212 abortTesting . resetDepsForTests ( ) ;
213+ acpResetTargetTesting . setDepsForTest ( ) ;
205214 queueCleanupTesting . resetDepsForTests ( ) ;
215+ replyRunRegistryTesting . resetReplyRunRegistry ( ) ;
206216 commandQueueMocks . clearCommandLane . mockClear ( ) . mockReturnValue ( 1 ) ;
207217 acpManagerMocks . resolveSession . mockReset ( ) . mockReturnValue ( { kind : "none" } ) ;
208218 acpManagerMocks . cancelSession . mockReset ( ) . mockResolvedValue ( undefined ) ;
@@ -512,6 +522,296 @@ describe("abort detection", () => {
512522 expectSessionLaneCleared ( sessionKey ) ;
513523 } ) ;
514524
525+ it ( "fast-abort of an ACP target also aborts the bound source dispatch lane" , async ( ) => {
526+ const sourceSessionKey = "agent:main:discord:channel:C1" ;
527+ const acpSessionKey = "agent:codex:acp:bound-session" ;
528+ const { root, cfg } = await createAbortConfig ( {
529+ sessionIdsByKey : {
530+ [ sourceSessionKey ] : "source-store-session" ,
531+ [ acpSessionKey ] : "acp-store-session" ,
532+ } ,
533+ } ) ;
534+ const sourceOperation = createReplyOperation ( {
535+ sessionKey : sourceSessionKey ,
536+ sessionId : "source-active-session" ,
537+ resetTriggered : false ,
538+ } ) ;
539+ enqueueQueuedFollowupRun ( {
540+ root,
541+ cfg,
542+ sessionId : "source-active-session" ,
543+ sessionKey : sourceSessionKey ,
544+ } ) ;
545+ enqueueQueuedFollowupRun ( {
546+ root,
547+ cfg,
548+ sessionId : "acp-store-session" ,
549+ sessionKey : acpSessionKey ,
550+ } ) ;
551+ acpResetTargetTesting . setDepsForTest ( {
552+ getSessionBindingService : ( ) =>
553+ ( {
554+ resolveByConversation : ( ) => ( {
555+ targetKind : "session" ,
556+ targetSessionKey : acpSessionKey ,
557+ } ) ,
558+ } ) as never ,
559+ listAcpBindings : ( ) => [ ] ,
560+ resolveConfiguredBindingRecord : ( ) => null ,
561+ } ) ;
562+ acpManagerMocks . resolveSession . mockReturnValue ( {
563+ kind : "ready" ,
564+ sessionKey : acpSessionKey ,
565+ meta : { } as never ,
566+ } ) ;
567+
568+ const result = await runStopCommand ( {
569+ cfg,
570+ sessionKey : sourceSessionKey ,
571+ from : "discord:C1" ,
572+ to : "discord:C1" ,
573+ targetSessionKey : acpSessionKey ,
574+ commandSource : "native" ,
575+ } ) ;
576+
577+ expect ( result . handled ) . toBe ( true ) ;
578+ expect ( sourceOperation . result ) . toEqual ( { kind : "aborted" , code : "aborted_by_user" } ) ;
579+ expect ( replyRunRegistry . isActive ( sourceSessionKey ) ) . toBe ( false ) ;
580+ expect ( getFollowupQueueDepth ( sourceSessionKey ) ) . toBe ( 0 ) ;
581+ expect ( getFollowupQueueDepth ( acpSessionKey ) ) . toBe ( 0 ) ;
582+ expectSessionLaneCleared ( sourceSessionKey ) ;
583+ expectSessionLaneCleared ( acpSessionKey ) ;
584+ expect ( acpManagerMocks . cancelSession ) . toHaveBeenCalledWith ( {
585+ cfg,
586+ sessionKey : acpSessionKey ,
587+ reason : "fast-abort" ,
588+ } ) ;
589+ } ) ;
590+
591+ it ( "fast-abort of an ACP target aborts the source stored session when no source reply operation is registered" , async ( ) => {
592+ const sourceSessionKey = "agent:main:discord:channel:C2" ;
593+ const acpSessionKey = "agent:codex:acp:bound-session-stored-source" ;
594+ const { root, cfg } = await createAbortConfig ( {
595+ sessionIdsByKey : {
596+ [ sourceSessionKey ] : "source-store-session" ,
597+ [ acpSessionKey ] : "acp-store-session" ,
598+ } ,
599+ } ) ;
600+ enqueueQueuedFollowupRun ( {
601+ root,
602+ cfg,
603+ sessionId : "source-store-session" ,
604+ sessionKey : sourceSessionKey ,
605+ } ) ;
606+ enqueueQueuedFollowupRun ( {
607+ root,
608+ cfg,
609+ sessionId : "acp-store-session" ,
610+ sessionKey : acpSessionKey ,
611+ } ) ;
612+ acpResetTargetTesting . setDepsForTest ( {
613+ getSessionBindingService : ( ) =>
614+ ( {
615+ resolveByConversation : ( ) => ( {
616+ targetKind : "session" ,
617+ targetSessionKey : acpSessionKey ,
618+ } ) ,
619+ } ) as never ,
620+ listAcpBindings : ( ) => [ ] ,
621+ resolveConfiguredBindingRecord : ( ) => null ,
622+ } ) ;
623+ acpManagerMocks . resolveSession . mockReturnValue ( {
624+ kind : "ready" ,
625+ sessionKey : acpSessionKey ,
626+ meta : { } as never ,
627+ } ) ;
628+
629+ const result = await runStopCommand ( {
630+ cfg,
631+ sessionKey : sourceSessionKey ,
632+ from : "discord:C2" ,
633+ to : "discord:C2" ,
634+ targetSessionKey : acpSessionKey ,
635+ commandSource : "native" ,
636+ } ) ;
637+
638+ expect ( result . handled ) . toBe ( true ) ;
639+ expect ( runtimeAbortMocks . abortEmbeddedPiRun ) . toHaveBeenCalledWith ( "source-store-session" ) ;
640+ expect ( getFollowupQueueDepth ( sourceSessionKey ) ) . toBe ( 0 ) ;
641+ expect ( getFollowupQueueDepth ( acpSessionKey ) ) . toBe ( 0 ) ;
642+ expectSessionLaneCleared ( sourceSessionKey ) ;
643+ expectSessionLaneCleared ( acpSessionKey ) ;
644+ } ) ;
645+
646+ it ( "does not abort the caller source lane for an unbound explicit ACP target" , async ( ) => {
647+ const sourceSessionKey = "agent:main:discord:channel:C3" ;
648+ const acpSessionKey = "agent:codex:acp:unbound-explicit-target" ;
649+ const { cfg } = await createAbortConfig ( {
650+ sessionIdsByKey : {
651+ [ sourceSessionKey ] : "source-store-session" ,
652+ [ acpSessionKey ] : "acp-store-session" ,
653+ } ,
654+ } ) ;
655+ const sourceOperation = createReplyOperation ( {
656+ sessionKey : sourceSessionKey ,
657+ sessionId : "source-active-session" ,
658+ resetTriggered : false ,
659+ } ) ;
660+ acpManagerMocks . resolveSession . mockReturnValue ( {
661+ kind : "ready" ,
662+ sessionKey : acpSessionKey ,
663+ meta : { } as never ,
664+ } ) ;
665+
666+ const result = await runStopCommand ( {
667+ cfg,
668+ sessionKey : sourceSessionKey ,
669+ from : "discord:C3" ,
670+ to : "discord:C3" ,
671+ targetSessionKey : acpSessionKey ,
672+ commandSource : "native" ,
673+ } ) ;
674+
675+ expect ( result . handled ) . toBe ( true ) ;
676+ expect ( sourceOperation . result ) . toBeNull ( ) ;
677+ expect ( replyRunRegistry . isActive ( sourceSessionKey ) ) . toBe ( true ) ;
678+ expect ( acpManagerMocks . cancelSession ) . toHaveBeenCalledWith ( {
679+ cfg,
680+ sessionKey : acpSessionKey ,
681+ reason : "fast-abort" ,
682+ } ) ;
683+ sourceOperation . complete ( ) ;
684+ } ) ;
685+
686+ it ( "uses ParentSessionKey as the source lane for a bound explicit ACP target" , async ( ) => {
687+ const sourceSessionKey = "agent:main:discord:channel:C4" ;
688+ const acpSessionKey = "agent:codex:acp:bound-parent-source" ;
689+ const { cfg } = await createAbortConfig ( {
690+ sessionIdsByKey : {
691+ [ sourceSessionKey ] : "source-store-session" ,
692+ [ acpSessionKey ] : "acp-store-session" ,
693+ } ,
694+ } ) ;
695+ const sourceOperation = createReplyOperation ( {
696+ sessionKey : sourceSessionKey ,
697+ sessionId : "source-active-session" ,
698+ resetTriggered : false ,
699+ } ) ;
700+ acpResetTargetTesting . setDepsForTest ( {
701+ getSessionBindingService : ( ) =>
702+ ( {
703+ resolveByConversation : ( ) => ( {
704+ targetKind : "session" ,
705+ targetSessionKey : acpSessionKey ,
706+ } ) ,
707+ } ) as never ,
708+ listAcpBindings : ( ) => [ ] ,
709+ resolveConfiguredBindingRecord : ( ) => null ,
710+ } ) ;
711+ acpManagerMocks . resolveSession . mockReturnValue ( {
712+ kind : "ready" ,
713+ sessionKey : acpSessionKey ,
714+ meta : { } as never ,
715+ } ) ;
716+
717+ const result = await runStopCommand ( {
718+ cfg,
719+ parentSessionKey : sourceSessionKey ,
720+ from : "discord:C4" ,
721+ to : "discord:C4" ,
722+ targetSessionKey : acpSessionKey ,
723+ commandSource : "native" ,
724+ } ) ;
725+
726+ expect ( result . handled ) . toBe ( true ) ;
727+ expect ( sourceOperation . result ) . toEqual ( { kind : "aborted" , code : "aborted_by_user" } ) ;
728+ expect ( replyRunRegistry . isActive ( sourceSessionKey ) ) . toBe ( false ) ;
729+ } ) ;
730+
731+ it ( "fast-abort from an ACP-bound source conversation aborts source and bound ACP lanes" , async ( ) => {
732+ const sourceSessionKey = "agent:main:telegram:direct:source-1" ;
733+ const acpSessionKey = "agent:codex:acp:bound-source-stop" ;
734+ const { root, storePath, cfg } = await createAbortConfig ( {
735+ sessionIdsByKey : {
736+ [ sourceSessionKey ] : "source-store-session" ,
737+ [ acpSessionKey ] : "acp-store-session" ,
738+ } ,
739+ } ) ;
740+ const sourceOperation = createReplyOperation ( {
741+ sessionKey : sourceSessionKey ,
742+ sessionId : "source-active-session" ,
743+ resetTriggered : false ,
744+ } ) ;
745+ const acpOperation = createReplyOperation ( {
746+ sessionKey : acpSessionKey ,
747+ sessionId : "acp-active-session" ,
748+ resetTriggered : false ,
749+ } ) ;
750+ enqueueQueuedFollowupRun ( {
751+ root,
752+ cfg,
753+ sessionId : "source-active-session" ,
754+ sessionKey : sourceSessionKey ,
755+ } ) ;
756+ enqueueQueuedFollowupRun ( {
757+ root,
758+ cfg,
759+ sessionId : "acp-active-session" ,
760+ sessionKey : acpSessionKey ,
761+ } ) ;
762+ acpResetTargetTesting . setDepsForTest ( {
763+ getSessionBindingService : ( ) =>
764+ ( {
765+ resolveByConversation : ( ) => ( {
766+ targetKind : "session" ,
767+ targetSessionKey : acpSessionKey ,
768+ } ) ,
769+ } ) as never ,
770+ listAcpBindings : ( ) => [ ] ,
771+ resolveConfiguredBindingRecord : ( ) => null ,
772+ } ) ;
773+ acpManagerMocks . resolveSession . mockReturnValue ( {
774+ kind : "ready" ,
775+ sessionKey : acpSessionKey ,
776+ meta : { } as never ,
777+ } ) ;
778+
779+ const result = await runStopCommand ( {
780+ cfg,
781+ sessionKey : sourceSessionKey ,
782+ from : "telegram:source-1" ,
783+ to : "telegram:source-1" ,
784+ messageSid : "77" ,
785+ timestamp : 1234567890000 ,
786+ } ) ;
787+
788+ expect ( result . handled ) . toBe ( true ) ;
789+ expect ( sourceOperation . result ) . toEqual ( { kind : "aborted" , code : "aborted_by_user" } ) ;
790+ expect ( acpOperation . result ) . toEqual ( { kind : "aborted" , code : "aborted_by_user" } ) ;
791+ expect ( replyRunRegistry . isActive ( sourceSessionKey ) ) . toBe ( false ) ;
792+ expect ( replyRunRegistry . isActive ( acpSessionKey ) ) . toBe ( false ) ;
793+ expect ( getFollowupQueueDepth ( sourceSessionKey ) ) . toBe ( 0 ) ;
794+ expect ( getFollowupQueueDepth ( acpSessionKey ) ) . toBe ( 0 ) ;
795+ expectSessionLaneCleared ( sourceSessionKey ) ;
796+ expectSessionLaneCleared ( acpSessionKey ) ;
797+ expect ( acpManagerMocks . cancelSession ) . toHaveBeenCalledWith ( {
798+ cfg,
799+ sessionKey : acpSessionKey ,
800+ reason : "fast-abort" ,
801+ } ) ;
802+ const store = JSON . parse ( await fs . readFile ( storePath , "utf8" ) ) as Record <
803+ string ,
804+ {
805+ abortCutoffMessageSid ?: string ;
806+ abortCutoffTimestamp ?: number ;
807+ }
808+ > ;
809+ expect ( store [ sourceSessionKey ] ?. abortCutoffMessageSid ) . toBe ( "77" ) ;
810+ expect ( store [ sourceSessionKey ] ?. abortCutoffTimestamp ) . toBe ( 1234567890000 ) ;
811+ expect ( store [ acpSessionKey ] ?. abortCutoffMessageSid ) . toBeUndefined ( ) ;
812+ expect ( store [ acpSessionKey ] ?. abortCutoffTimestamp ) . toBeUndefined ( ) ;
813+ } ) ;
814+
515815 it ( "persists abort cutoff metadata on /stop when command and target session match" , async ( ) => {
516816 const sessionKey = "telegram:123" ;
517817 const sessionId = "session-123" ;
@@ -540,6 +840,34 @@ describe("abort detection", () => {
540840 expect ( entry . abortCutoffTimestamp ) . toBe ( 1234567890000 ) ;
541841 } ) ;
542842
843+ it ( "persists abort cutoff metadata when only ParentSessionKey identifies the command session" , async ( ) => {
844+ const sessionKey = "telegram:parent-only" ;
845+ const sessionId = "session-parent-only" ;
846+ const { storePath, cfg } = await createAbortConfig ( {
847+ sessionIdsByKey : { [ sessionKey ] : sessionId } ,
848+ } ) ;
849+
850+ const result = await runStopCommand ( {
851+ cfg,
852+ parentSessionKey : sessionKey ,
853+ from : "telegram:parent-only" ,
854+ to : "telegram:parent-only" ,
855+ messageSid : "56" ,
856+ timestamp : 1234567890001 ,
857+ } ) ;
858+
859+ expect ( result . handled ) . toBe ( true ) ;
860+ const store = JSON . parse ( await fs . readFile ( storePath , "utf8" ) ) as Record < string , unknown > ;
861+ const entry = store [ sessionKey ] as {
862+ abortedLastRun ?: boolean ;
863+ abortCutoffMessageSid ?: string ;
864+ abortCutoffTimestamp ?: number ;
865+ } ;
866+ expect ( entry . abortedLastRun ) . toBe ( true ) ;
867+ expect ( entry . abortCutoffMessageSid ) . toBe ( "56" ) ;
868+ expect ( entry . abortCutoffTimestamp ) . toBe ( 1234567890001 ) ;
869+ } ) ;
870+
543871 it ( "does not persist cutoff metadata when native /stop targets a different session" , async ( ) => {
544872 const slashSessionKey = "telegram:slash:123" ;
545873 const targetSessionKey = "agent:main:telegram:group:123" ;
0 commit comments