Skip to content

Commit 1841dd9

Browse files
authored
fix(subagent-announce): defer drain while parent session is busy (#71706)
When a subagent finishes while its parent main session is still running (executing tools or awaiting model output), the announce queue would follow the configured debounce and immediately attempt to deliver the completion event back into the parent session via callGateway. The gateway treats the parent as busy and the announce can either get buffered until the next external user message or surface only as a delayed echo, breaking the natural sessions_spawn -> sessions_yield workflow where the parent expects the result to arrive as the next turn. This change adds an optional shouldDefer hook on the announce queue state. The delivery layer wires it to the existing requester session activity probe (resolveRequesterSessionActivity), so while the parent session is still active the drain loop sleeps for max(250ms, debounceMs) and re-checks instead of pushing the announce. As soon as the parent goes idle, the queue drains normally. - Plumbs shouldDefer through getAnnounceQueue / enqueueAnnounce. - Skips drain step in scheduleAnnounceDrain when shouldDefer says the target is still busy, with a bounded re-check sleep. - Updates maybeQueueSubagentAnnounce to pass the activity probe. - Adds a unit test that holds drain while parent is busy and resumes when it goes idle. No behavior change for callers that do not pass shouldDefer.
1 parent ca1a6e2 commit 1841dd9

3 files changed

Lines changed: 161 additions & 2 deletions

File tree

src/agents/subagent-announce-delivery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ async function maybeQueueSubagentAnnounce(params: {
475475
},
476476
settings: queueSettings,
477477
send: sendAnnounce,
478+
shouldDefer: (item) => resolveRequesterSessionActivity(item.sessionKey).isActive,
478479
});
479480
return didQueue ? "queued" : "dropped";
480481
}

src/agents/subagent-announce-queue.test.ts

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { afterEach, describe, expect, it, vi } from "vitest";
2-
import { enqueueAnnounce, resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
2+
import {
3+
type AnnounceQueueItem,
4+
enqueueAnnounce,
5+
resetAnnounceQueuesForTests,
6+
} from "./subagent-announce-queue.js";
37

48
function createRetryingSend() {
59
const prompts: string[] = [];
@@ -118,6 +122,131 @@ describe("subagent-announce-queue", () => {
118122
expect(sender.prompts[1]).toContain("queued item two");
119123
});
120124

125+
it("waits until a busy parent session becomes idle before draining", async () => {
126+
vi.useFakeTimers();
127+
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
128+
129+
let parentBusy = true;
130+
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
131+
132+
enqueueAnnounce({
133+
key: "announce:test:busy-parent",
134+
item: {
135+
prompt: "child completed",
136+
enqueuedAt: Date.now(),
137+
sessionKey: "agent:main:telegram:dm:u1",
138+
},
139+
settings: { mode: "followup", debounceMs: 0 },
140+
send,
141+
shouldDefer: () => parentBusy,
142+
});
143+
144+
await vi.advanceTimersByTimeAsync(249);
145+
expect(send).not.toHaveBeenCalled();
146+
147+
await vi.advanceTimersByTimeAsync(1);
148+
expect(send).not.toHaveBeenCalled();
149+
150+
parentBusy = false;
151+
await vi.advanceTimersByTimeAsync(250);
152+
expect(send).toHaveBeenCalledTimes(1);
153+
expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed");
154+
});
155+
156+
it("preserves an existing defer hook when the same queue is reused without one", async () => {
157+
vi.useFakeTimers();
158+
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
159+
160+
let parentBusy = true;
161+
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
162+
163+
enqueueAnnounce({
164+
key: "announce:test:reuse-keeps-defer",
165+
item: {
166+
prompt: "first child completed",
167+
enqueuedAt: Date.now(),
168+
sessionKey: "agent:main:telegram:dm:u1",
169+
},
170+
settings: { mode: "followup", debounceMs: 0 },
171+
send,
172+
shouldDefer: () => parentBusy,
173+
});
174+
175+
enqueueAnnounce({
176+
key: "announce:test:reuse-keeps-defer",
177+
item: {
178+
prompt: "second child completed",
179+
enqueuedAt: Date.now(),
180+
sessionKey: "agent:main:telegram:dm:u1",
181+
},
182+
settings: { mode: "followup", debounceMs: 0 },
183+
send,
184+
});
185+
186+
await vi.advanceTimersByTimeAsync(250);
187+
expect(send).not.toHaveBeenCalled();
188+
189+
parentBusy = false;
190+
await vi.advanceTimersByTimeAsync(250);
191+
expect(send).toHaveBeenCalledTimes(2);
192+
});
193+
194+
it("polls deferred items at the configured cadence after the first debounce", async () => {
195+
vi.useFakeTimers();
196+
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
197+
198+
let parentBusy = true;
199+
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
200+
201+
enqueueAnnounce({
202+
key: "announce:test:defer-cadence",
203+
item: {
204+
prompt: "child completed",
205+
enqueuedAt: Date.now(),
206+
sessionKey: "agent:main:telegram:dm:u1",
207+
},
208+
settings: { mode: "followup", debounceMs: 1_000 },
209+
send,
210+
shouldDefer: () => parentBusy,
211+
});
212+
213+
await vi.advanceTimersByTimeAsync(1_000);
214+
expect(send).not.toHaveBeenCalled();
215+
216+
parentBusy = false;
217+
await vi.advanceTimersByTimeAsync(999);
218+
expect(send).not.toHaveBeenCalled();
219+
220+
await vi.advanceTimersByTimeAsync(1);
221+
expect(send).toHaveBeenCalledTimes(1);
222+
});
223+
224+
it("falls back to delivery when busy-parent deferral exceeds the safety cap", async () => {
225+
vi.useFakeTimers();
226+
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
227+
228+
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
229+
230+
enqueueAnnounce({
231+
key: "announce:test:busy-parent-timeout",
232+
item: {
233+
prompt: "child completed after stale busy state",
234+
enqueuedAt: Date.now(),
235+
sessionKey: "agent:main:telegram:dm:u1",
236+
},
237+
settings: { mode: "followup", debounceMs: 0 },
238+
send,
239+
shouldDefer: () => true,
240+
});
241+
242+
await vi.advanceTimersByTimeAsync(14_999);
243+
expect(send).not.toHaveBeenCalled();
244+
245+
await vi.advanceTimersByTimeAsync(1);
246+
expect(send).toHaveBeenCalledTimes(1);
247+
expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed after stale busy state");
248+
});
249+
121250
it("uses debounce floor for retries when debounce exceeds backoff", async () => {
122251
vi.useFakeTimers();
123252
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));

src/agents/subagent-announce-queue.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,14 @@ type AnnounceQueueState = {
5050
droppedCount: number;
5151
summaryLines: string[];
5252
send: (item: AnnounceQueueItem) => Promise<void>;
53+
/** Return true while the target parent session is still busy and delivery should wait. */
54+
shouldDefer?: (item: AnnounceQueueItem) => boolean;
5355
/** Consecutive drain failures — drives exponential backoff on errors. */
5456
consecutiveFailures: number;
5557
};
5658

5759
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
60+
const MAX_DEFER_WHILE_BUSY_MS = 15_000;
5861

5962
export function resetAnnounceQueuesForTests() {
6063
// Test isolation: other suites may leave a draining queue behind in the worker.
@@ -72,6 +75,7 @@ function getAnnounceQueue(
7275
key: string,
7376
settings: AnnounceQueueSettings,
7477
send: (item: AnnounceQueueItem) => Promise<void>,
78+
shouldDefer?: (item: AnnounceQueueItem) => boolean,
7579
) {
7680
const existing = ANNOUNCE_QUEUES.get(key);
7781
if (existing) {
@@ -80,6 +84,9 @@ function getAnnounceQueue(
8084
settings,
8185
});
8286
existing.send = send;
87+
if (shouldDefer !== undefined) {
88+
existing.shouldDefer = shouldDefer;
89+
}
8390
return existing;
8491
}
8592
const created: AnnounceQueueState = {
@@ -93,6 +100,7 @@ function getAnnounceQueue(
93100
droppedCount: 0,
94101
summaryLines: [],
95102
send,
103+
shouldDefer,
96104
consecutiveFailures: 0,
97105
};
98106
applyQueueRuntimeSettings({
@@ -115,6 +123,20 @@ function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean {
115123
});
116124
}
117125

126+
function shouldDeferAnnounceQueueItem(queue: AnnounceQueueState, item: AnnounceQueueItem): boolean {
127+
if (!queue.shouldDefer?.(item)) {
128+
return false;
129+
}
130+
return Date.now() - item.enqueuedAt < MAX_DEFER_WHILE_BUSY_MS;
131+
}
132+
133+
function waitBeforeDeferredAnnounceRetry(queue: AnnounceQueueState): Promise<void> {
134+
return new Promise<void>((resolve) => {
135+
const timer = setTimeout(resolve, Math.max(250, queue.debounceMs));
136+
timer.unref?.();
137+
});
138+
}
139+
118140
function scheduleAnnounceDrain(key: string) {
119141
const queue = beginQueueDrain(ANNOUNCE_QUEUES, key);
120142
if (!queue) {
@@ -128,6 +150,12 @@ function scheduleAnnounceDrain(key: string) {
128150
break;
129151
}
130152
await waitForQueueDebounce(queue);
153+
const nextItem = queue.items[0];
154+
if (nextItem && shouldDeferAnnounceQueueItem(queue, nextItem)) {
155+
await waitBeforeDeferredAnnounceRetry(queue);
156+
queue.lastEnqueuedAt = Date.now() - queue.debounceMs;
157+
continue;
158+
}
131159
if (queue.mode === "collect") {
132160
const collectDrainResult = await drainCollectQueueStep({
133161
collectState,
@@ -211,8 +239,9 @@ export function enqueueAnnounce(params: {
211239
item: AnnounceQueueItem;
212240
settings: AnnounceQueueSettings;
213241
send: (item: AnnounceQueueItem) => Promise<void>;
242+
shouldDefer?: (item: AnnounceQueueItem) => boolean;
214243
}): boolean {
215-
const queue = getAnnounceQueue(params.key, params.settings, params.send);
244+
const queue = getAnnounceQueue(params.key, params.settings, params.send, params.shouldDefer);
216245
// Preserve any retry backoff marker already encoded in lastEnqueuedAt.
217246
queue.lastEnqueuedAt = Math.max(queue.lastEnqueuedAt, Date.now());
218247

0 commit comments

Comments
 (0)