Skip to content

Commit 695478d

Browse files
committed
fix(reply): keep queued user turns after supersede
1 parent d761b98 commit 695478d

4 files changed

Lines changed: 86 additions & 4 deletions

File tree

src/auto-reply/reply/followup-runner.test.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,35 @@ describe("createFollowupRunner runtime config", () => {
942942
expect(call.abortSignal).toBe(abortController.signal);
943943
});
944944

945+
it("does not inherit source abort signals for queued user followups", async () => {
946+
const sourceAbortController = new AbortController();
947+
sourceAbortController.abort();
948+
runEmbeddedPiAgentMock.mockResolvedValueOnce({
949+
payloads: [],
950+
meta: {},
951+
});
952+
const runner = createFollowupRunner({
953+
opts: { abortSignal: sourceAbortController.signal },
954+
typing: createMockTypingController(),
955+
typingMode: "instant",
956+
defaultModel: "openai/gpt-5.4",
957+
});
958+
959+
await runner(
960+
createQueuedRun({
961+
currentInboundEventKind: "user_request",
962+
run: {
963+
provider: "openai",
964+
model: "gpt-5.4",
965+
sourceReplyDeliveryMode: "message_tool_only",
966+
},
967+
}),
968+
);
969+
970+
const call = requireLastMockCallArg(runEmbeddedPiAgentMock, "run embedded pi agent");
971+
expect(call.abortSignal).toBeUndefined();
972+
});
973+
945974
it("keeps queued delivery correlations active during followup agent runs", async () => {
946975
const events: string[] = [];
947976
runEmbeddedPiAgentMock.mockImplementationOnce(async () => {

src/auto-reply/reply/followup-runner.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ export function createFollowupRunner(params: {
419419
sessionId: run.sessionId,
420420
sessionKey: replySessionKey ?? "",
421421
resetTriggered: false,
422-
upstreamAbortSignal: queued.abortSignal ?? opts?.abortSignal,
422+
upstreamAbortSignal: queued.abortSignal,
423423
});
424424
const runId = crypto.randomUUID();
425425
const shouldSurfaceToControlUi = isInternalMessageChannel(
@@ -654,7 +654,7 @@ export function createFollowupRunner(params: {
654654
agentAccountId: run.agentAccountId,
655655
senderIsOwner: run.senderIsOwner,
656656
disableTools: opts?.disableTools,
657-
abortSignal: queued.abortSignal ?? opts?.abortSignal,
657+
abortSignal: queued.abortSignal,
658658
},
659659
transformResult: (rawResult) =>
660660
isRoomEventCliRun && rawResult.meta.agentMeta
@@ -742,7 +742,7 @@ export function createFollowupRunner(params: {
742742
bashElevated: run.bashElevated,
743743
timeoutMs: run.timeoutMs,
744744
runId,
745-
abortSignal: queued.abortSignal ?? opts?.abortSignal,
745+
abortSignal: queued.abortSignal,
746746
images: queuedImages,
747747
imageOrder: queuedImageOrder,
748748
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,

src/auto-reply/reply/get-reply-run.media-only.test.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1593,6 +1593,7 @@ describe("runPreparedReply media-only handling", () => {
15931593
it("queues active room events as followups instead of steering fake prompts", async () => {
15941594
const queueSettings = await import("./queue/settings-runtime.js");
15951595
const piRuntime = await import("../../agents/pi-embedded.runtime.js");
1596+
const abortController = new AbortController();
15961597
vi.mocked(queueSettings.resolveQueueSettings).mockReturnValueOnce({
15971598
mode: "steer",
15981599
debounceMs: 500,
@@ -1610,6 +1611,7 @@ describe("runPreparedReply media-only handling", () => {
16101611

16111612
await runPreparedReply(
16121613
baseParams({
1614+
opts: { abortSignal: abortController.signal },
16131615
ctx: {
16141616
Body: "ambient",
16151617
RawBody: "ambient",
@@ -1638,9 +1640,58 @@ describe("runPreparedReply media-only handling", () => {
16381640
expect(call.resolvedQueue.mode).toBe("steer");
16391641
expect(call.followupRun.prompt).toBe("[OpenClaw room event]");
16401642
expect(call.followupRun.currentInboundEventKind).toBe("room_event");
1643+
expect(call.followupRun.abortSignal).toBe(abortController.signal);
16411644
expect(call.followupRun.currentInboundContext?.text).toContain("Current event:");
16421645
});
16431646

1647+
it("detaches queued user requests from superseded source abort signals", async () => {
1648+
const queueSettings = await import("./queue/settings-runtime.js");
1649+
const piRuntime = await import("../../agents/pi-embedded.runtime.js");
1650+
const abortController = new AbortController();
1651+
vi.mocked(queueSettings.resolveQueueSettings).mockReturnValueOnce({
1652+
mode: "collect",
1653+
debounceMs: 500,
1654+
cap: 20,
1655+
dropPolicy: "summarize",
1656+
});
1657+
vi.mocked(piRuntime.resolveActiveEmbeddedRunSessionId)
1658+
.mockReturnValueOnce("active-session")
1659+
.mockReturnValueOnce("active-session");
1660+
vi.mocked(piRuntime.isEmbeddedPiRunActive).mockReturnValueOnce(true);
1661+
vi.mocked(piRuntime.isEmbeddedPiRunStreaming).mockReturnValueOnce(true);
1662+
vi.mocked(buildInboundUserContextPrefix).mockReturnValueOnce("user request context");
1663+
1664+
await runPreparedReply(
1665+
baseParams({
1666+
opts: { abortSignal: abortController.signal },
1667+
ctx: {
1668+
Body: "@bot keep this",
1669+
RawBody: "@bot keep this",
1670+
CommandBody: "@bot keep this",
1671+
Provider: "telegram",
1672+
Surface: "telegram",
1673+
ChatType: "group",
1674+
},
1675+
sessionCtx: {
1676+
Body: "@bot keep this",
1677+
BodyStripped: "@bot keep this",
1678+
Provider: "telegram",
1679+
Surface: "telegram",
1680+
ChatType: "group",
1681+
InboundEventKind: "user_request",
1682+
MessageSid: "994",
1683+
SenderName: "Alice",
1684+
},
1685+
}),
1686+
);
1687+
1688+
const call = requireLastRunReplyAgentCall();
1689+
expect(call.shouldFollowup).toBe(true);
1690+
expect(call.isActive).toBe(true);
1691+
expect(call.followupRun.currentInboundEventKind).toBe("user_request");
1692+
expect(call.followupRun.abortSignal).toBeUndefined();
1693+
});
1694+
16441695
it("queues active room events instead of interrupting active user requests", async () => {
16451696
const queueSettings = await import("./queue/settings-runtime.js");
16461697
const piRuntime = await import("../../agents/pi-embedded.runtime.js");

src/auto-reply/reply/get-reply-run.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1044,12 +1044,14 @@ export async function runPreparedReply(
10441044
imageOrder: opts?.imageOrder,
10451045
}),
10461046
);
1047+
const queuedFollowupAbortSignal =
1048+
inboundEventKind === "room_event" ? opts?.abortSignal : undefined;
10471049
const followupRun = {
10481050
prompt: queuedBody,
10491051
transcriptPrompt: transcriptCommandBody,
10501052
currentInboundEventKind: inboundEventKind,
10511053
currentInboundContext,
1052-
abortSignal: opts?.abortSignal,
1054+
...(queuedFollowupAbortSignal ? { abortSignal: queuedFollowupAbortSignal } : {}),
10531055
deliveryCorrelations: opts?.queuedDeliveryCorrelations,
10541056
queuedLifecycle: opts?.queuedFollowupLifecycle,
10551057
messageId: sessionCtx.MessageSidFull ?? sessionCtx.MessageSid,

0 commit comments

Comments
 (0)