Skip to content

Commit 304703f

Browse files
joeykrug authored and steipete committed
fix: resume orphaned subagent sessions after SIGUSR1 reload
Closes #47711. After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls, the gateway now scans for orphaned sessions and sends a synthetic resume message to restart their work. This change also makes the deferral timeout configurable via gateway.reload.deferralTimeoutMs (default: 5 minutes, up from 90 seconds).
1 parent e627a50 commit 304703f

9 files changed

Lines changed: 642 additions & 2 deletions
Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
2+
import type { SubagentRunRecord } from "./subagent-registry.types.js";
3+
4+
// Mock dependencies before importing the module under test.
// Each test imports "./subagent-orphan-recovery.js" dynamically, so these
// factories are what the module's collaborators resolve to.

// Config loader: returns a config whose `session.store` is undefined.
vi.mock("../config/config.js", () => ({
  loadConfig: vi.fn(() => ({
    session: { store: undefined },
  })),
}));

// Session store helpers: empty store by default, fixed agent id and store
// path, and a no-op async `updateSessionStore`. Individual tests override
// `loadSessionStore` to seed the entries they need.
vi.mock("../config/sessions.js", () => ({
  loadSessionStore: vi.fn(() => ({})),
  resolveAgentIdFromSessionKey: vi.fn(() => "main"),
  resolveStorePath: vi.fn(() => "/tmp/test-sessions.json"),
  updateSessionStore: vi.fn(async () => {}),
}));

// Gateway client: resolves with a fixed run id unless a test overrides it.
vi.mock("../gateway/call.js", () => ({
  callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
}));
21+
22+
/**
 * Builds a `SubagentRunRecord` fixture for the recovery tests.
 *
 * The defaults describe a run with id "run-1" whose child session key is
 * "agent:main:subagent:test-session-1", created 60 s ago and started 55 s ago,
 * with no `endedAt` set.
 *
 * @param overrides Fields that replace the defaults; later keys win.
 * @returns A new record object; nothing is shared between calls.
 */
function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
  const defaults: SubagentRunRecord = {
    runId: "run-1",
    childSessionKey: "agent:main:subagent:test-session-1",
    requesterSessionKey: "agent:main:signal:direct:+1234567890",
    requesterDisplayKey: "main",
    task: "Test task: implement feature X",
    cleanup: "delete",
    createdAt: Date.now() - 60_000,
    startedAt: Date.now() - 55_000,
  };
  return { ...defaults, ...overrides };
}
35+
36+
/**
 * Tests for `recoverOrphanedSubagentSessions`.
 *
 * Each test seeds the mocked session store and an active-run map, runs the
 * recovery, and checks the returned `recovered` / `failed` / `skipped`
 * counters together with the calls made to the mocked gateway and store.
 */
describe("subagent-orphan-recovery", () => {
  // Child session key carried by the default record from createTestRunRecord().
  const defaultChildKey = "agent:main:subagent:test-session-1";

  // Mocked module namespaces, re-resolved before every test.
  let sessions: typeof import("../config/sessions.js");
  let gateway: typeof import("../gateway/call.js");

  /** Builds a minimal session-store entry with the given aborted flag. */
  const makeEntry = (sessionId: string, abortedLastRun: boolean) => ({
    sessionId,
    updatedAt: Date.now(),
    abortedLastRun,
  });

  /** Indexes run records by their `runId`, as the active-run map does in these tests. */
  const toActiveRuns = (...records: SubagentRunRecord[]): Map<string, SubagentRunRecord> =>
    new Map(records.map((record): [string, SubagentRunRecord] => [record.runId, record]));

  /** Imports the module under test and runs recovery over `activeRuns`. */
  const recover = async (activeRuns: Map<string, SubagentRunRecord>) => {
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    return recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
  };

  beforeEach(async () => {
    vi.clearAllMocks();
    sessions = await import("../config/sessions.js");
    gateway = await import("../gateway/call.js");
    // clearAllMocks() only wipes call history; overrides installed by an
    // earlier test (mockReturnValue / mockRejectedValue) would otherwise
    // survive. Re-prime the defaults explicitly so every test starts from the
    // same state regardless of how the installed Vitest version treats
    // vi.fn() mocks in restoreAllMocks().
    vi.mocked(sessions.loadSessionStore).mockReturnValue({});
    vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "test-run-id" });
  });

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it("recovers orphaned sessions with abortedLastRun=true", async () => {
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      [defaultChildKey]: makeEntry("session-abc", true),
    });

    const result = await recover(toActiveRuns(createTestRunRecord()));

    expect(result.recovered).toBe(1);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);

    // Should have called callGateway to resume the session
    expect(gateway.callGateway).toHaveBeenCalledOnce();
    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    expect(opts.method).toBe("agent");
    const params = opts.params as Record<string, unknown>;
    expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
    expect(params.message).toContain("gateway reload");
    expect(params.message).toContain("Test task: implement feature X");
  });

  it("skips sessions that are not aborted", async () => {
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      [defaultChildKey]: makeEntry("session-abc", false),
    });

    const result = await recover(toActiveRuns(createTestRunRecord()));

    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("skips runs that have already ended", async () => {
    // Seed an aborted entry for this run: without it the run would be skipped
    // for lack of a store entry, and the test would pass even if `endedAt`
    // were ignored.
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      [defaultChildKey]: makeEntry("session-abc", true),
    });

    const result = await recover(
      toActiveRuns(createTestRunRecord({ endedAt: Date.now() - 1000 })),
    );

    expect(result.recovered).toBe(0);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("handles multiple orphaned sessions", async () => {
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:session-a": makeEntry("id-a", true),
      "agent:main:subagent:session-b": makeEntry("id-b", true),
      "agent:main:subagent:session-c": makeEntry("id-c", false),
    });

    const activeRuns = toActiveRuns(
      createTestRunRecord({
        runId: "run-a",
        childSessionKey: "agent:main:subagent:session-a",
        task: "Task A",
      }),
      createTestRunRecord({
        runId: "run-b",
        childSessionKey: "agent:main:subagent:session-b",
        task: "Task B",
      }),
      createTestRunRecord({
        runId: "run-c",
        childSessionKey: "agent:main:subagent:session-c",
        task: "Task C",
      }),
    );

    const result = await recover(activeRuns);

    expect(result.recovered).toBe(2);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledTimes(2);
  });

  it("handles callGateway failure gracefully", async () => {
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      [defaultChildKey]: makeEntry("session-abc", true),
    });
    vi.mocked(gateway.callGateway).mockRejectedValue(new Error("gateway unavailable"));

    const result = await recover(toActiveRuns(createTestRunRecord()));

    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(1);
  });

  it("returns empty results when no active runs exist", async () => {
    const result = await recover(new Map());

    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);
  });

  it("skips sessions with missing session entry in store", async () => {
    // Store has no matching entry
    vi.mocked(sessions.loadSessionStore).mockReturnValue({});

    const result = await recover(toActiveRuns(createTestRunRecord()));

    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("clears abortedLastRun flag before resuming", async () => {
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      [defaultChildKey]: makeEntry("session-abc", true),
    });

    await recover(toActiveRuns(createTestRunRecord()));

    // updateSessionStore should have been called to clear the flag
    expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
    const calls = vi.mocked(sessions.updateSessionStore).mock.calls;
    const [storePath, updater] = calls[0];
    expect(storePath).toBe("/tmp/test-sessions.json");

    // Simulate the updater to verify it clears abortedLastRun
    const mockStore: Record<string, { abortedLastRun?: boolean; updatedAt?: number }> = {
      "agent:main:subagent:test-session-1": {
        abortedLastRun: true,
        updatedAt: 0,
      },
    };
    (updater as (store: Record<string, unknown>) => void)(mockStore);
    expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
  });

  it("truncates long task descriptions in resume message", async () => {
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      [defaultChildKey]: makeEntry("session-abc", true),
    });

    const longTask = "x".repeat(5000);
    await recover(toActiveRuns(createTestRunRecord({ task: longTask })));

    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    const params = opts.params as Record<string, unknown>;
    const message = params.message as string;
    // Message should contain truncated task (2000 chars + "...")
    expect(message.length).toBeLessThan(5000);
    expect(message).toContain("...");
  });
});

0 commit comments

Comments
 (0)