Skip to content

Commit a1b6567

Browse files
committed
fix(agents): fallback subagent completion delivery
1 parent 8741a86 commit a1b6567

9 files changed

Lines changed: 309 additions & 20 deletions

src/agents/subagent-announce-delivery.test.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,48 @@ async function deliverDiscordDirectMessageCompletion(params: {
115115
});
116116
}
117117

118+
async function deliverTelegramDirectMessageCompletion(params: {
119+
callGateway: typeof runtimeCallGateway;
120+
sendMessage?: typeof runtimeSendMessage;
121+
internalEvents?: AgentInternalEvent[];
122+
isActive?: boolean;
123+
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
124+
}) {
125+
const origin = {
126+
channel: "telegram",
127+
to: "123456789",
128+
accountId: "bot-1",
129+
};
130+
__testing.setDepsForTest({
131+
callGateway: params.callGateway,
132+
getRequesterSessionActivity: () => ({
133+
sessionId: "requester-session-telegram",
134+
isActive: params.isActive === true,
135+
}),
136+
loadConfig: () => ({}) as never,
137+
...(params.queueEmbeddedPiMessage
138+
? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage }
139+
: {}),
140+
...(params.sendMessage ? { sendMessage: params.sendMessage } : {}),
141+
});
142+
143+
return deliverSubagentAnnouncement({
144+
requesterSessionKey: "agent:main:telegram:123456789",
145+
targetRequesterSessionKey: "agent:main:telegram:123456789",
146+
triggerMessage: "child done",
147+
steerMessage: "child done",
148+
requesterOrigin: origin,
149+
requesterSessionOrigin: origin,
150+
completionDirectOrigin: origin,
151+
directOrigin: origin,
152+
requesterIsSubagent: false,
153+
expectsCompletionMessage: true,
154+
bestEffortDeliver: true,
155+
directIdempotencyKey: "announce-telegram-dm-fallback",
156+
internalEvents: params.internalEvents,
157+
});
158+
}
159+
118160
async function deliverSlackChannelAnnouncement(params: {
119161
callGateway: typeof runtimeCallGateway;
120162
isActive: boolean;
@@ -510,6 +552,92 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
510552
);
511553
});
512554

555+
it("uses direct fallback for Telegram DMs when announce-agent delivery fails", async () => {
556+
const callGateway = vi.fn(async () => {
557+
throw new Error("UNAVAILABLE: requester wake failed");
558+
}) as unknown as typeof runtimeCallGateway;
559+
const sendMessage = createSendMessageMock();
560+
const result = await deliverTelegramDirectMessageCompletion({
561+
callGateway,
562+
sendMessage,
563+
internalEvents: [
564+
{
565+
type: "task_completion",
566+
source: "subagent",
567+
childSessionKey: "agent:worker:subagent:child",
568+
childSessionId: "child-session-id",
569+
announceType: "subagent task",
570+
taskLabel: "telegram completion smoke",
571+
status: "ok",
572+
statusLabel: "completed successfully",
573+
result: "child completion output",
574+
replyInstruction: "Summarize the result.",
575+
},
576+
],
577+
});
578+
579+
expect(result).toEqual(
580+
expect.objectContaining({
581+
delivered: true,
582+
path: "direct-fallback",
583+
}),
584+
);
585+
expect(sendMessage).toHaveBeenCalledWith(
586+
expect.objectContaining({
587+
channel: "telegram",
588+
accountId: "bot-1",
589+
to: "123456789",
590+
threadId: undefined,
591+
content: "child completion output",
592+
requesterSessionKey: "agent:main:telegram:123456789",
593+
bestEffort: true,
594+
idempotencyKey: "announce-telegram-dm-fallback",
595+
}),
596+
);
597+
});
598+
599+
it("uses direct fallback when an active Telegram requester cannot be woken", async () => {
600+
const callGateway = createGatewayMock();
601+
const sendMessage = createSendMessageMock();
602+
const queueEmbeddedPiMessage = vi.fn(() => false);
603+
const result = await deliverTelegramDirectMessageCompletion({
604+
callGateway,
605+
sendMessage,
606+
isActive: true,
607+
queueEmbeddedPiMessage,
608+
internalEvents: [
609+
{
610+
type: "task_completion",
611+
source: "subagent",
612+
childSessionKey: "agent:worker:subagent:child",
613+
childSessionId: "child-session-id",
614+
announceType: "subagent task",
615+
taskLabel: "telegram wake smoke",
616+
status: "ok",
617+
statusLabel: "completed successfully",
618+
result: "child completion output",
619+
replyInstruction: "Summarize the result.",
620+
},
621+
],
622+
});
623+
624+
expect(result).toEqual(
625+
expect.objectContaining({
626+
delivered: true,
627+
path: "direct-fallback",
628+
}),
629+
);
630+
expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-telegram", "child done");
631+
expect(callGateway).not.toHaveBeenCalled();
632+
expect(sendMessage).toHaveBeenCalledWith(
633+
expect.objectContaining({
634+
channel: "telegram",
635+
to: "123456789",
636+
content: "child completion output",
637+
}),
638+
);
639+
});
640+
513641
it("uses a direct thread fallback when announce-agent returns no visible output", async () => {
514642
const callGateway = createGatewayMock({
515643
result: {

src/agents/subagent-announce-delivery.ts

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,10 @@ async function sendSubagentAnnounceDirectly(params: {
681681
isGatewayMessageChannel(normalizedSessionOnlyOriginChannel)
682682
? normalizedSessionOnlyOriginChannel
683683
: undefined;
684+
const completionFallbackText =
685+
params.expectsCompletionMessage && deliveryTarget.deliver
686+
? extractThreadCompletionFallbackText(params.internalEvents)
687+
: "";
684688
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
685689
if (params.expectsCompletionMessage && requesterActivity.sessionId) {
686690
const woke = requesterActivity.sessionId
@@ -696,6 +700,32 @@ async function sendSubagentAnnounceDirectly(params: {
696700
};
697701
}
698702
if (requesterActivity.isActive) {
703+
try {
704+
const didFallback = await sendCompletionFallback({
705+
cfg,
706+
channel: deliveryTarget.channel,
707+
to: deliveryTarget.to,
708+
accountId: deliveryTarget.accountId,
709+
threadId: deliveryTarget.threadId,
710+
content: completionFallbackText,
711+
requesterSessionKey: canonicalRequesterSessionKey,
712+
bestEffortDeliver: params.bestEffortDeliver,
713+
idempotencyKey: params.directIdempotencyKey,
714+
signal: params.signal,
715+
});
716+
if (didFallback) {
717+
return {
718+
delivered: true,
719+
path: resolveCompletionFallbackPath(deliveryTarget.threadId),
720+
};
721+
}
722+
} catch (err) {
723+
return {
724+
delivered: false,
725+
path: "direct",
726+
error: `active requester session could not be woken; fallback send failed: ${summarizeDeliveryError(err)}`,
727+
};
728+
}
699729
return {
700730
delivered: false,
701731
path: "direct",
@@ -709,10 +739,6 @@ async function sendSubagentAnnounceDirectly(params: {
709739
path: "none",
710740
};
711741
}
712-
const completionFallbackText =
713-
params.expectsCompletionMessage && deliveryTarget.deliver
714-
? extractThreadCompletionFallbackText(params.internalEvents)
715-
: "";
716742
let directAnnounceResponse: unknown;
717743
try {
718744
directAnnounceResponse = await runAnnounceDeliveryWithRetry({
@@ -758,22 +784,30 @@ async function sendSubagentAnnounceDirectly(params: {
758784
}),
759785
});
760786
} catch (err) {
761-
const didFallback = await sendCompletionFallback({
762-
cfg,
763-
channel: deliveryTarget.channel,
764-
to: deliveryTarget.to,
765-
accountId: deliveryTarget.accountId,
766-
threadId: deliveryTarget.threadId,
767-
content: deliveryTarget.threadId ? completionFallbackText : "",
768-
requesterSessionKey: canonicalRequesterSessionKey,
769-
bestEffortDeliver: params.bestEffortDeliver,
770-
idempotencyKey: params.directIdempotencyKey,
771-
signal: params.signal,
772-
});
787+
let didFallback = false;
788+
try {
789+
didFallback = await sendCompletionFallback({
790+
cfg,
791+
channel: deliveryTarget.channel,
792+
to: deliveryTarget.to,
793+
accountId: deliveryTarget.accountId,
794+
threadId: deliveryTarget.threadId,
795+
content: completionFallbackText,
796+
requesterSessionKey: canonicalRequesterSessionKey,
797+
bestEffortDeliver: params.bestEffortDeliver,
798+
idempotencyKey: params.directIdempotencyKey,
799+
signal: params.signal,
800+
});
801+
} catch (fallbackErr) {
802+
throw new Error(
803+
`${summarizeDeliveryError(err)}; fallback send failed: ${summarizeDeliveryError(fallbackErr)}`,
804+
{ cause: fallbackErr },
805+
);
806+
}
773807
if (didFallback) {
774808
return {
775809
delivered: true,
776-
path: "direct-thread-fallback",
810+
path: resolveCompletionFallbackPath(deliveryTarget.threadId),
777811
};
778812
}
779813
throw err;

src/agents/subagent-announce.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
resolveSubagentAnnounceTimeoutMs,
2424
resolveSubagentCompletionOrigin,
2525
} from "./subagent-announce-delivery.js";
26+
import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js";
2627
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
2728
import {
2829
applySubagentWaitOutcome,
@@ -244,6 +245,7 @@ export async function runSubagentAnnounceFlow(params: {
244245
wakeOnDescendantSettle?: boolean;
245246
signal?: AbortSignal;
246247
bestEffortDeliver?: boolean;
248+
onDeliveryResult?: (delivery: SubagentAnnounceDeliveryResult) => void;
247249
}): Promise<boolean> {
248250
let didAnnounce = false;
249251
const expectsCompletionMessage = params.expectsCompletionMessage === true;
@@ -562,6 +564,7 @@ export async function runSubagentAnnounceFlow(params: {
562564
directIdempotencyKey,
563565
signal: params.signal,
564566
});
567+
params.onDeliveryResult?.(delivery);
565568
didAnnounce = delivery.delivered;
566569
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
567570
defaultRuntime.error?.(

src/agents/subagent-registry-lifecycle.test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,82 @@ describe("subagent registry lifecycle hardening", () => {
569569
expect(persist).toHaveBeenCalled();
570570
});
571571

572+
it("persists the concrete announce delivery error when cleanup gives up", async () => {
573+
const persist = vi.fn();
574+
const entry = createRunEntry({
575+
endedAt: 4_000,
576+
expectsCompletionMessage: true,
577+
retainAttachmentsOnKeep: true,
578+
});
579+
const runSubagentAnnounceFlow = vi.fn(
580+
async (announceParams: {
581+
onDeliveryResult?: (delivery: {
582+
delivered: false;
583+
path: "direct";
584+
error: string;
585+
phases: Array<{
586+
phase: "direct-primary" | "queue-fallback";
587+
delivered: boolean;
588+
path: "direct" | "none";
589+
error?: string;
590+
}>;
591+
}) => void;
592+
}) => {
593+
announceParams.onDeliveryResult?.({
594+
delivered: false,
595+
path: "direct",
596+
error: "UNAVAILABLE: requester wake failed",
597+
phases: [
598+
{
599+
phase: "direct-primary",
600+
delivered: false,
601+
path: "direct",
602+
error: "UNAVAILABLE: requester wake failed",
603+
},
604+
{
605+
phase: "queue-fallback",
606+
delivered: false,
607+
path: "none",
608+
},
609+
],
610+
});
611+
return false;
612+
},
613+
);
614+
615+
const controller = createLifecycleController({
616+
entry,
617+
persist,
618+
runSubagentAnnounceFlow,
619+
});
620+
621+
await expect(
622+
controller.completeSubagentRun({
623+
runId: entry.runId,
624+
endedAt: 4_000,
625+
outcome: { status: "ok" },
626+
reason: SUBAGENT_ENDED_REASON_COMPLETE,
627+
triggerCleanup: true,
628+
}),
629+
).resolves.toBeUndefined();
630+
631+
expect(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith(
632+
expect.objectContaining({
633+
runId: entry.runId,
634+
runtime: "subagent",
635+
sessionKey: entry.childSessionKey,
636+
deliveryStatus: "failed",
637+
error:
638+
"UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed",
639+
}),
640+
);
641+
expect(entry.lastAnnounceDeliveryError).toBe(
642+
"UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed",
643+
);
644+
expect(entry.cleanupCompletedAt).toBeTypeOf("number");
645+
expect(persist).toHaveBeenCalled();
646+
});
647+
572648
it("skips browser cleanup when steer restart suppresses cleanup flow", async () => {
573649
const entry = createRunEntry({
574650
expectsCompletionMessage: false,

0 commit comments

Comments
 (0)