Skip to content

Commit 0ab1449

Browse files
authored
Fix Discord session recovery abort ownership (#85100)
* fix auto-reply abort ownership * add changelog for #85100
1 parent c5e8bd0 commit 0ab1449

9 files changed

Lines changed: 1827 additions & 277 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Docs: https://docs.openclaw.ai
4141
- doctor: constrain legacy plugin cleanup paths [AI]. (#84801) Thanks @pgondhi987.
4242
- Update/doctor: prune stale local bundled plugin install records that point at old compiled bundled output so current bundled plugin schemas win after upgrade. (#84863) Thanks @fuller-stack-dev.
4343
- Providers/Ollama: preserve native Ollama tool-call IDs across assistant replay so Gemini over Ollama Cloud can keep its hidden function-call thought-signature handle.
44+
- Discord: keep session recovery and `/stop` abort ownership on the source dispatch lane while bound ACP turns continue routing to their target session, so stalled pre-run work and late replies are cleared instead of leaking after stop. Fixes #84477. (#85100) Thanks @joshavant.
4445
- PDF tool: time out idle remote PDF body reads after 120 seconds so stalled remote documents return an error instead of wedging the session. Fixes #68649. (#84768) Thanks @luoyanglang.
4546
- Diagnostics/OpenTelemetry plugin: suppress handled OTLP exporter promise rejections so collector shutdowns no longer crash the Gateway. (#81085) Thanks @luoyanglang.
4647
- Media/audio: skip empty structured sherpa-onnx transcripts instead of treating the raw JSON payload as spoken text. (#84667) Thanks @TurboTheTurtle.

src/auto-reply/reply/abort.test.ts

Lines changed: 330 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@ import {
1818
shouldSkipMessageByAbortCutoff,
1919
tryFastAbortFromMessage,
2020
} from "./abort.js";
21+
import { testing as acpResetTargetTesting } from "./acp-reset-target.js";
2122
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js";
2223
import { testing as queueCleanupTesting } from "./queue/cleanup.js";
24+
import {
25+
createReplyOperation,
26+
replyRunRegistry,
27+
testing as replyRunRegistryTesting,
28+
} from "./reply-run-registry.js";
2329
import { buildTestCtx } from "./test-ctx.js";
2430

2531
vi.mock("../../agents/pi-embedded.js", () => ({
@@ -112,7 +118,8 @@ describe("abort detection", () => {
112118

113119
async function runStopCommand(params: {
114120
cfg: OpenClawConfig;
115-
sessionKey: string;
121+
sessionKey?: string;
122+
parentSessionKey?: string;
116123
from: string;
117124
to: string;
118125
senderId?: string;
@@ -126,11 +133,12 @@ describe("abort detection", () => {
126133
CommandBody: "/stop",
127134
RawBody: "/stop",
128135
CommandAuthorized: true,
129-
SessionKey: params.sessionKey,
130136
Provider: "telegram",
131137
Surface: "telegram",
132138
From: params.from,
133139
To: params.to,
140+
...(params.sessionKey ? { SessionKey: params.sessionKey } : {}),
141+
...(params.parentSessionKey ? { ParentSessionKey: params.parentSessionKey } : {}),
134142
...(params.senderId ? { SenderId: params.senderId } : {}),
135143
...(params.commandSource ? { CommandSource: params.commandSource } : {}),
136144
...(params.targetSessionKey ? { CommandTargetSessionKey: params.targetSessionKey } : {}),
@@ -202,7 +210,9 @@ describe("abort detection", () => {
202210
afterEach(() => {
203211
resetAbortMemoryForTest();
204212
abortTesting.resetDepsForTests();
213+
acpResetTargetTesting.setDepsForTest();
205214
queueCleanupTesting.resetDepsForTests();
215+
replyRunRegistryTesting.resetReplyRunRegistry();
206216
commandQueueMocks.clearCommandLane.mockClear().mockReturnValue(1);
207217
acpManagerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" });
208218
acpManagerMocks.cancelSession.mockReset().mockResolvedValue(undefined);
@@ -512,6 +522,296 @@ describe("abort detection", () => {
512522
expectSessionLaneCleared(sessionKey);
513523
});
514524

525+
it("fast-abort of an ACP target also aborts the bound source dispatch lane", async () => {
526+
const sourceSessionKey = "agent:main:discord:channel:C1";
527+
const acpSessionKey = "agent:codex:acp:bound-session";
528+
const { root, cfg } = await createAbortConfig({
529+
sessionIdsByKey: {
530+
[sourceSessionKey]: "source-store-session",
531+
[acpSessionKey]: "acp-store-session",
532+
},
533+
});
534+
const sourceOperation = createReplyOperation({
535+
sessionKey: sourceSessionKey,
536+
sessionId: "source-active-session",
537+
resetTriggered: false,
538+
});
539+
enqueueQueuedFollowupRun({
540+
root,
541+
cfg,
542+
sessionId: "source-active-session",
543+
sessionKey: sourceSessionKey,
544+
});
545+
enqueueQueuedFollowupRun({
546+
root,
547+
cfg,
548+
sessionId: "acp-store-session",
549+
sessionKey: acpSessionKey,
550+
});
551+
acpResetTargetTesting.setDepsForTest({
552+
getSessionBindingService: () =>
553+
({
554+
resolveByConversation: () => ({
555+
targetKind: "session",
556+
targetSessionKey: acpSessionKey,
557+
}),
558+
}) as never,
559+
listAcpBindings: () => [],
560+
resolveConfiguredBindingRecord: () => null,
561+
});
562+
acpManagerMocks.resolveSession.mockReturnValue({
563+
kind: "ready",
564+
sessionKey: acpSessionKey,
565+
meta: {} as never,
566+
});
567+
568+
const result = await runStopCommand({
569+
cfg,
570+
sessionKey: sourceSessionKey,
571+
from: "discord:C1",
572+
to: "discord:C1",
573+
targetSessionKey: acpSessionKey,
574+
commandSource: "native",
575+
});
576+
577+
expect(result.handled).toBe(true);
578+
expect(sourceOperation.result).toEqual({ kind: "aborted", code: "aborted_by_user" });
579+
expect(replyRunRegistry.isActive(sourceSessionKey)).toBe(false);
580+
expect(getFollowupQueueDepth(sourceSessionKey)).toBe(0);
581+
expect(getFollowupQueueDepth(acpSessionKey)).toBe(0);
582+
expectSessionLaneCleared(sourceSessionKey);
583+
expectSessionLaneCleared(acpSessionKey);
584+
expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({
585+
cfg,
586+
sessionKey: acpSessionKey,
587+
reason: "fast-abort",
588+
});
589+
});
590+
591+
it("fast-abort of an ACP target aborts the source stored session when no source reply operation is registered", async () => {
592+
const sourceSessionKey = "agent:main:discord:channel:C2";
593+
const acpSessionKey = "agent:codex:acp:bound-session-stored-source";
594+
const { root, cfg } = await createAbortConfig({
595+
sessionIdsByKey: {
596+
[sourceSessionKey]: "source-store-session",
597+
[acpSessionKey]: "acp-store-session",
598+
},
599+
});
600+
enqueueQueuedFollowupRun({
601+
root,
602+
cfg,
603+
sessionId: "source-store-session",
604+
sessionKey: sourceSessionKey,
605+
});
606+
enqueueQueuedFollowupRun({
607+
root,
608+
cfg,
609+
sessionId: "acp-store-session",
610+
sessionKey: acpSessionKey,
611+
});
612+
acpResetTargetTesting.setDepsForTest({
613+
getSessionBindingService: () =>
614+
({
615+
resolveByConversation: () => ({
616+
targetKind: "session",
617+
targetSessionKey: acpSessionKey,
618+
}),
619+
}) as never,
620+
listAcpBindings: () => [],
621+
resolveConfiguredBindingRecord: () => null,
622+
});
623+
acpManagerMocks.resolveSession.mockReturnValue({
624+
kind: "ready",
625+
sessionKey: acpSessionKey,
626+
meta: {} as never,
627+
});
628+
629+
const result = await runStopCommand({
630+
cfg,
631+
sessionKey: sourceSessionKey,
632+
from: "discord:C2",
633+
to: "discord:C2",
634+
targetSessionKey: acpSessionKey,
635+
commandSource: "native",
636+
});
637+
638+
expect(result.handled).toBe(true);
639+
expect(runtimeAbortMocks.abortEmbeddedPiRun).toHaveBeenCalledWith("source-store-session");
640+
expect(getFollowupQueueDepth(sourceSessionKey)).toBe(0);
641+
expect(getFollowupQueueDepth(acpSessionKey)).toBe(0);
642+
expectSessionLaneCleared(sourceSessionKey);
643+
expectSessionLaneCleared(acpSessionKey);
644+
});
645+
646+
it("does not abort the caller source lane for an unbound explicit ACP target", async () => {
647+
const sourceSessionKey = "agent:main:discord:channel:C3";
648+
const acpSessionKey = "agent:codex:acp:unbound-explicit-target";
649+
const { cfg } = await createAbortConfig({
650+
sessionIdsByKey: {
651+
[sourceSessionKey]: "source-store-session",
652+
[acpSessionKey]: "acp-store-session",
653+
},
654+
});
655+
const sourceOperation = createReplyOperation({
656+
sessionKey: sourceSessionKey,
657+
sessionId: "source-active-session",
658+
resetTriggered: false,
659+
});
660+
acpManagerMocks.resolveSession.mockReturnValue({
661+
kind: "ready",
662+
sessionKey: acpSessionKey,
663+
meta: {} as never,
664+
});
665+
666+
const result = await runStopCommand({
667+
cfg,
668+
sessionKey: sourceSessionKey,
669+
from: "discord:C3",
670+
to: "discord:C3",
671+
targetSessionKey: acpSessionKey,
672+
commandSource: "native",
673+
});
674+
675+
expect(result.handled).toBe(true);
676+
expect(sourceOperation.result).toBeNull();
677+
expect(replyRunRegistry.isActive(sourceSessionKey)).toBe(true);
678+
expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({
679+
cfg,
680+
sessionKey: acpSessionKey,
681+
reason: "fast-abort",
682+
});
683+
sourceOperation.complete();
684+
});
685+
686+
it("uses ParentSessionKey as the source lane for a bound explicit ACP target", async () => {
687+
const sourceSessionKey = "agent:main:discord:channel:C4";
688+
const acpSessionKey = "agent:codex:acp:bound-parent-source";
689+
const { cfg } = await createAbortConfig({
690+
sessionIdsByKey: {
691+
[sourceSessionKey]: "source-store-session",
692+
[acpSessionKey]: "acp-store-session",
693+
},
694+
});
695+
const sourceOperation = createReplyOperation({
696+
sessionKey: sourceSessionKey,
697+
sessionId: "source-active-session",
698+
resetTriggered: false,
699+
});
700+
acpResetTargetTesting.setDepsForTest({
701+
getSessionBindingService: () =>
702+
({
703+
resolveByConversation: () => ({
704+
targetKind: "session",
705+
targetSessionKey: acpSessionKey,
706+
}),
707+
}) as never,
708+
listAcpBindings: () => [],
709+
resolveConfiguredBindingRecord: () => null,
710+
});
711+
acpManagerMocks.resolveSession.mockReturnValue({
712+
kind: "ready",
713+
sessionKey: acpSessionKey,
714+
meta: {} as never,
715+
});
716+
717+
const result = await runStopCommand({
718+
cfg,
719+
parentSessionKey: sourceSessionKey,
720+
from: "discord:C4",
721+
to: "discord:C4",
722+
targetSessionKey: acpSessionKey,
723+
commandSource: "native",
724+
});
725+
726+
expect(result.handled).toBe(true);
727+
expect(sourceOperation.result).toEqual({ kind: "aborted", code: "aborted_by_user" });
728+
expect(replyRunRegistry.isActive(sourceSessionKey)).toBe(false);
729+
});
730+
731+
it("fast-abort from an ACP-bound source conversation aborts source and bound ACP lanes", async () => {
732+
const sourceSessionKey = "agent:main:telegram:direct:source-1";
733+
const acpSessionKey = "agent:codex:acp:bound-source-stop";
734+
const { root, storePath, cfg } = await createAbortConfig({
735+
sessionIdsByKey: {
736+
[sourceSessionKey]: "source-store-session",
737+
[acpSessionKey]: "acp-store-session",
738+
},
739+
});
740+
const sourceOperation = createReplyOperation({
741+
sessionKey: sourceSessionKey,
742+
sessionId: "source-active-session",
743+
resetTriggered: false,
744+
});
745+
const acpOperation = createReplyOperation({
746+
sessionKey: acpSessionKey,
747+
sessionId: "acp-active-session",
748+
resetTriggered: false,
749+
});
750+
enqueueQueuedFollowupRun({
751+
root,
752+
cfg,
753+
sessionId: "source-active-session",
754+
sessionKey: sourceSessionKey,
755+
});
756+
enqueueQueuedFollowupRun({
757+
root,
758+
cfg,
759+
sessionId: "acp-active-session",
760+
sessionKey: acpSessionKey,
761+
});
762+
acpResetTargetTesting.setDepsForTest({
763+
getSessionBindingService: () =>
764+
({
765+
resolveByConversation: () => ({
766+
targetKind: "session",
767+
targetSessionKey: acpSessionKey,
768+
}),
769+
}) as never,
770+
listAcpBindings: () => [],
771+
resolveConfiguredBindingRecord: () => null,
772+
});
773+
acpManagerMocks.resolveSession.mockReturnValue({
774+
kind: "ready",
775+
sessionKey: acpSessionKey,
776+
meta: {} as never,
777+
});
778+
779+
const result = await runStopCommand({
780+
cfg,
781+
sessionKey: sourceSessionKey,
782+
from: "telegram:source-1",
783+
to: "telegram:source-1",
784+
messageSid: "77",
785+
timestamp: 1234567890000,
786+
});
787+
788+
expect(result.handled).toBe(true);
789+
expect(sourceOperation.result).toEqual({ kind: "aborted", code: "aborted_by_user" });
790+
expect(acpOperation.result).toEqual({ kind: "aborted", code: "aborted_by_user" });
791+
expect(replyRunRegistry.isActive(sourceSessionKey)).toBe(false);
792+
expect(replyRunRegistry.isActive(acpSessionKey)).toBe(false);
793+
expect(getFollowupQueueDepth(sourceSessionKey)).toBe(0);
794+
expect(getFollowupQueueDepth(acpSessionKey)).toBe(0);
795+
expectSessionLaneCleared(sourceSessionKey);
796+
expectSessionLaneCleared(acpSessionKey);
797+
expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({
798+
cfg,
799+
sessionKey: acpSessionKey,
800+
reason: "fast-abort",
801+
});
802+
const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<
803+
string,
804+
{
805+
abortCutoffMessageSid?: string;
806+
abortCutoffTimestamp?: number;
807+
}
808+
>;
809+
expect(store[sourceSessionKey]?.abortCutoffMessageSid).toBe("77");
810+
expect(store[sourceSessionKey]?.abortCutoffTimestamp).toBe(1234567890000);
811+
expect(store[acpSessionKey]?.abortCutoffMessageSid).toBeUndefined();
812+
expect(store[acpSessionKey]?.abortCutoffTimestamp).toBeUndefined();
813+
});
814+
515815
it("persists abort cutoff metadata on /stop when command and target session match", async () => {
516816
const sessionKey = "telegram:123";
517817
const sessionId = "session-123";
@@ -540,6 +840,34 @@ describe("abort detection", () => {
540840
expect(entry.abortCutoffTimestamp).toBe(1234567890000);
541841
});
542842

843+
it("persists abort cutoff metadata when only ParentSessionKey identifies the command session", async () => {
844+
const sessionKey = "telegram:parent-only";
845+
const sessionId = "session-parent-only";
846+
const { storePath, cfg } = await createAbortConfig({
847+
sessionIdsByKey: { [sessionKey]: sessionId },
848+
});
849+
850+
const result = await runStopCommand({
851+
cfg,
852+
parentSessionKey: sessionKey,
853+
from: "telegram:parent-only",
854+
to: "telegram:parent-only",
855+
messageSid: "56",
856+
timestamp: 1234567890001,
857+
});
858+
859+
expect(result.handled).toBe(true);
860+
const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
861+
const entry = store[sessionKey] as {
862+
abortedLastRun?: boolean;
863+
abortCutoffMessageSid?: string;
864+
abortCutoffTimestamp?: number;
865+
};
866+
expect(entry.abortedLastRun).toBe(true);
867+
expect(entry.abortCutoffMessageSid).toBe("56");
868+
expect(entry.abortCutoffTimestamp).toBe(1234567890001);
869+
});
870+
543871
it("does not persist cutoff metadata when native /stop targets a different session", async () => {
544872
const slashSessionKey = "telegram:slash:123";
545873
const targetSessionKey = "agent:main:telegram:group:123";

0 commit comments

Comments
 (0)