Skip to content

Commit ab8c834

Browse files
committed
fix: report dropped subagent announce queue deliveries
1 parent 0fc2740 commit ab8c834

4 files changed

Lines changed: 84 additions & 6 deletions

File tree

src/agents/subagent-announce-dispatch.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,25 @@ describe("runSubagentAnnounceDispatch", () => {
135135
]);
136136
});
137137

138+
it("does not fall through to direct delivery when non-completion queue drops the new item", async () => {
139+
const queue = vi.fn(async () => "dropped" as const);
140+
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
141+
142+
const result = await runSubagentAnnounceDispatch({
143+
expectsCompletionMessage: false,
144+
queue,
145+
direct,
146+
});
147+
148+
expect(queue).toHaveBeenCalledTimes(1);
149+
expect(direct).not.toHaveBeenCalled();
150+
expect(result).toEqual({
151+
delivered: false,
152+
path: "none",
153+
phases: [{ phase: "queue-primary", delivered: false, path: "none", error: undefined }],
154+
});
155+
});
156+
138157
it("preserves direct failure when completion dispatch aborts before fallback queue", async () => {
139158
const controller = new AbortController();
140159
const queue = vi.fn(async () => "queued" as const);

src/agents/subagent-announce-dispatch.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
22

3-
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none";
3+
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none" | "dropped";
44

55
export type SubagentAnnounceDeliveryResult = {
66
delivered: boolean;
@@ -70,11 +70,15 @@ export async function runSubagentAnnounceDispatch(params: {
7070
}
7171

7272
if (!params.expectsCompletionMessage) {
73-
const primaryQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
73+
const primaryQueueOutcome = await params.queue();
74+
const primaryQueue = mapQueueOutcomeToDeliveryResult(primaryQueueOutcome);
7475
appendPhase("queue-primary", primaryQueue);
7576
if (primaryQueue.delivered) {
7677
return withPhases(primaryQueue);
7778
}
79+
if (primaryQueueOutcome === "dropped") {
80+
return withPhases(primaryQueue);
81+
}
7882

7983
const primaryDirect = await params.direct();
8084
appendPhase("direct-primary", primaryDirect);
@@ -91,7 +95,8 @@ export async function runSubagentAnnounceDispatch(params: {
9195
return withPhases(primaryDirect);
9296
}
9397

94-
const fallbackQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
98+
const fallbackQueueOutcome = await params.queue();
99+
const fallbackQueue = mapQueueOutcomeToDeliveryResult(fallbackQueueOutcome);
95100
appendPhase("queue-fallback", fallbackQueue);
96101
if (fallbackQueue.delivered) {
97102
return withPhases(fallbackQueue);

src/agents/subagent-announce.format.e2e.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1469,6 +1469,60 @@ describe("subagent announce formatting", () => {
14691469
expect(agentSpy).toHaveBeenCalledTimes(1);
14701470
});
14711471

1472+
it("does not report queued delivery when active announce queue drops a new item", async () => {
1473+
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
1474+
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);
1475+
sessionStore = {
1476+
"agent:main:main": {
1477+
sessionId: "session-drop-new",
1478+
lastChannel: "telegram",
1479+
lastTo: "123",
1480+
queueMode: "followup",
1481+
queueDebounceMs: 0,
1482+
queueCap: 1,
1483+
queueDrop: "new",
1484+
},
1485+
};
1486+
1487+
let resolveFirstSend = () => {};
1488+
const firstSendPending = new Promise<void>((resolve) => {
1489+
resolveFirstSend = resolve;
1490+
});
1491+
agentSpy.mockImplementation(async (_req: AgentCallRequest) => {
1492+
await firstSendPending;
1493+
return { runId: "run-main", status: "ok" };
1494+
});
1495+
1496+
const firstDidAnnounce = await runSubagentAnnounceFlow({
1497+
childSessionKey: "agent:main:subagent:test",
1498+
childRunId: "run-queued-first",
1499+
requesterSessionKey: "main",
1500+
requesterDisplayKey: "main",
1501+
announceType: "subagent task",
1502+
...defaultOutcomeAnnounce,
1503+
});
1504+
1505+
await vi.waitFor(() => {
1506+
expect(agentSpy).toHaveBeenCalledTimes(1);
1507+
});
1508+
1509+
const secondDidAnnounce = await runSubagentAnnounceFlow({
1510+
childSessionKey: "agent:main:subagent:test",
1511+
childRunId: "run-queued-dropped",
1512+
requesterSessionKey: "main",
1513+
requesterDisplayKey: "main",
1514+
announceType: "subagent task",
1515+
...defaultOutcomeAnnounce,
1516+
});
1517+
1518+
expect(firstDidAnnounce).toBe(true);
1519+
expect(secondDidAnnounce).toBe(false);
1520+
expect(agentSpy).toHaveBeenCalledTimes(1);
1521+
1522+
resolveFirstSend();
1523+
await Promise.resolve();
1524+
});
1525+
14721526
it("keeps queued idempotency unique for same-ms distinct child runs", async () => {
14731527
embeddedRunMock.isEmbeddedPiRunActive.mockReturnValue(true);
14741528
embeddedRunMock.isEmbeddedPiRunStreaming.mockReturnValue(false);

src/agents/subagent-announce.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -830,7 +830,7 @@ async function maybeQueueSubagentAnnounce(params: {
830830
sourceTool?: string;
831831
internalEvents?: AgentInternalEvent[];
832832
signal?: AbortSignal;
833-
}): Promise<"steered" | "queued" | "none"> {
833+
}): Promise<"steered" | "queued" | "none" | "dropped"> {
834834
if (params.signal?.aborted) {
835835
return "none";
836836
}
@@ -863,7 +863,7 @@ async function maybeQueueSubagentAnnounce(params: {
863863
queueSettings.mode === "interrupt";
864864
if (isActive && (shouldFollowup || queueSettings.mode === "steer")) {
865865
const origin = resolveAnnounceOrigin(entry, params.requesterOrigin);
866-
enqueueAnnounce({
866+
const didQueue = enqueueAnnounce({
867867
key: buildAnnounceQueueKey(canonicalKey, origin),
868868
item: {
869869
announceId: params.announceId,
@@ -880,7 +880,7 @@ async function maybeQueueSubagentAnnounce(params: {
880880
settings: queueSettings,
881881
send: sendAnnounce,
882882
});
883-
return "queued";
883+
return didQueue ? "queued" : "dropped";
884884
}
885885

886886
return "none";

0 commit comments

Comments
 (0)