Skip to content

Commit c11ad10

Browse files
committed
fix(daemon): close SDK iterator after result, fix test session leak
Two bugs prevented reject-resume from working: 1. SDK iterator hang: query()'s iX.readSdkMessages() never terminates after yielding a result message (unlike the internal hz.stream() which breaks on result). In daemon environments the CLI subprocess stays alive (MCP cleanup, etc.), so the transport never closes and consumeEvents() blocks forever — finalize() never runs, worktree never preserved, gpg-agent processes leak. Fix: break from the SDK iterator after receiving a result message with no active subtasks, then call q.close(). Matches the SDK's own internal stream() termination logic. 2. Test session leak: runtimePool-finalize.test.ts lacked a paths mock, so clearAllSessions() in beforeEach/afterEach wiped the production sessions directory. Agent commits trigger lefthook → vitest → session files deleted mid-flight. Fix: add paths mock pointing to a temp directory.
1 parent d98e852 commit c11ad10

3 files changed

Lines changed: 182 additions & 0 deletions

File tree

packages/cli/src/providers/claude.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,9 +318,16 @@ export const claudeProvider: AgentProvider = {
318318
const events = (async function* () {
319319
const turnOpen = { value: false };
320320
const rateLimitSeen = { value: false };
321+
let activeSubtasks = 0;
322+
let resultSeen = false;
321323
for await (const msg of q) {
324+
if (msg.type === "system" && (msg as any).subtype === "task_started") activeSubtasks++;
325+
if (msg.type === "system" && (msg as any).subtype === "task_notification") activeSubtasks = Math.max(0, activeSubtasks - 1);
322326
yield* mapSDKMessageStream(msg, turnOpen, rateLimitSeen);
327+
if (msg.type === "result") resultSeen = true;
328+
if (resultSeen && activeSubtasks <= 0) break;
323329
}
330+
q.close();
324331
})();
325332

326333
let aborted = false;

tests/claude-iterator-hang.test.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
// @vitest-environment node
2+
/**
3+
* Reproduce the SDK iterator hang: query()'s async iterator yields a result
4+
* message but never terminates afterward (transport stays open due to MCP
5+
* servers or other long-lived resources). The events generator in claude.ts
6+
* must break + q.close() after the final result to avoid hanging forever.
7+
*/
8+
import { describe, expect, it } from "vitest";
9+
import { mapSDKMessageStream } from "../packages/cli/src/providers/claude.js";
10+
11+
/**
12+
* Simulate SDK query() that yields messages and a result, but never
13+
* terminates the iterator (transport stays open). Only resolves when
14+
* the consumer calls .return() (triggered by break) or .close() is called.
15+
*/
16+
function makeSdkIteratorThatHangsAfterResult(messages: Array<{ type: string; [k: string]: any }>) {
17+
let closed = false;
18+
const close = () => {
19+
closed = true;
20+
};
21+
22+
const iterator = (async function* () {
23+
for (const msg of messages) {
24+
yield msg;
25+
}
26+
// Simulate transport that never closes — hang forever
27+
// (like a real SDK query with MCP servers keeping transport alive)
28+
await new Promise<void>((resolve) => {
29+
const check = setInterval(() => {
30+
if (closed) {
31+
clearInterval(check);
32+
resolve();
33+
}
34+
}, 10);
35+
});
36+
})();
37+
38+
return { iterator, close };
39+
}
40+
41+
describe("claude.ts events generator — iterator termination", () => {
42+
it("hangs WITHOUT break+close when SDK iterator doesn't terminate after result", async () => {
43+
const sdkMessages = [
44+
{ type: "system", subtype: "init", session_id: "test-session" },
45+
{ type: "assistant", message: { content: [{ type: "text", text: "hello" }] }, parent_tool_use_id: null },
46+
{ type: "result", subtype: "success", result: "done", total_cost_usd: 0.01, usage: {} },
47+
];
48+
49+
const { iterator, close } = makeSdkIteratorThatHangsAfterResult(sdkMessages);
50+
51+
// Old code: no break, just for-await — this hangs
52+
const oldEvents = (async function* () {
53+
const turnOpen = { value: false };
54+
const rateLimitSeen = { value: false };
55+
for await (const msg of iterator) {
56+
yield* mapSDKMessageStream(msg as any, turnOpen, rateLimitSeen);
57+
}
58+
})();
59+
60+
const collected: string[] = [];
61+
// Race: consume events vs 1s timeout
62+
const finished = await Promise.race([
63+
(async () => {
64+
for await (const event of oldEvents) {
65+
collected.push(event.type);
66+
}
67+
return "completed";
68+
})(),
69+
new Promise<string>((r) => setTimeout(() => r("timeout"), 1000)),
70+
]);
71+
72+
close(); // cleanup so test doesn't leak
73+
74+
expect(collected).toContain("turn.end"); // result was received
75+
expect(finished).toBe("timeout"); // but the loop never exited
76+
});
77+
78+
it("terminates correctly WITH break+close when SDK iterator doesn't terminate after result", async () => {
79+
const sdkMessages = [
80+
{ type: "system", subtype: "init", session_id: "test-session" },
81+
{ type: "assistant", message: { content: [{ type: "text", text: "hello" }] }, parent_tool_use_id: null },
82+
{ type: "result", subtype: "success", result: "done", total_cost_usd: 0.01, usage: {} },
83+
];
84+
85+
const { iterator, close } = makeSdkIteratorThatHangsAfterResult(sdkMessages);
86+
87+
// New code: break after result + close
88+
const newEvents = (async function* () {
89+
const turnOpen = { value: false };
90+
const rateLimitSeen = { value: false };
91+
let activeSubtasks = 0;
92+
let resultSeen = false;
93+
for await (const msg of iterator) {
94+
if (msg.type === "system" && (msg as any).subtype === "task_started") activeSubtasks++;
95+
if (msg.type === "system" && (msg as any).subtype === "task_notification") activeSubtasks = Math.max(0, activeSubtasks - 1);
96+
yield* mapSDKMessageStream(msg as any, turnOpen, rateLimitSeen);
97+
if (msg.type === "result") resultSeen = true;
98+
if (resultSeen && activeSubtasks <= 0) break;
99+
}
100+
close();
101+
})();
102+
103+
const collected: string[] = [];
104+
for await (const event of newEvents) {
105+
collected.push(event.type);
106+
}
107+
108+
expect(collected).toContain("turn.end");
109+
// Loop exits cleanly — no timeout needed
110+
});
111+
112+
it("waits for active subtasks to finish before breaking", async () => {
113+
const sdkMessages = [
114+
{ type: "system", subtype: "init", session_id: "test-session" },
115+
{ type: "system", subtype: "task_started", tool_use_id: "bg1", description: "background" },
116+
{ type: "result", subtype: "success", result: "main done", total_cost_usd: 0.005, usage: {} },
117+
// subtask finishes after first result
118+
{ type: "system", subtype: "task_notification", tool_use_id: "bg1", status: "completed", summary: "bg done" },
119+
// second result after subtask completes
120+
{ type: "result", subtype: "success", result: "all done", total_cost_usd: 0.01, usage: {} },
121+
];
122+
123+
const { iterator, close } = makeSdkIteratorThatHangsAfterResult(sdkMessages);
124+
125+
const newEvents = (async function* () {
126+
const turnOpen = { value: false };
127+
const rateLimitSeen = { value: false };
128+
let activeSubtasks = 0;
129+
let resultSeen = false;
130+
for await (const msg of iterator) {
131+
if (msg.type === "system" && (msg as any).subtype === "task_started") activeSubtasks++;
132+
if (msg.type === "system" && (msg as any).subtype === "task_notification") activeSubtasks = Math.max(0, activeSubtasks - 1);
133+
yield* mapSDKMessageStream(msg as any, turnOpen, rateLimitSeen);
134+
if (msg.type === "result") resultSeen = true;
135+
if (resultSeen && activeSubtasks <= 0) break;
136+
}
137+
close();
138+
})();
139+
140+
const collected: string[] = [];
141+
for await (const event of newEvents) {
142+
collected.push(event.type);
143+
}
144+
145+
// First result's turn.end is consumed, then subtask ends, then break.
146+
// Second result is never reached — break fires when activeSubtasks hits 0.
147+
const turnEnds = collected.filter((t) => t === "turn.end");
148+
expect(turnEnds).toHaveLength(1);
149+
// Subtask events were consumed before break
150+
expect(collected).toContain("subtask.start");
151+
});
152+
});

tests/runtimePool-finalize.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,29 @@
1717

1818
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
1919

20+
// ---- Mock paths to avoid polluting production sessions dir ------------------
21+
vi.mock("../packages/cli/src/paths.js", () => {
22+
const { join } = require("node:path");
23+
const { tmpdir } = require("node:os");
24+
const base = join(tmpdir(), `ak-test-runtimepool-fin-${process.pid}`);
25+
return {
26+
STATE_DIR: base,
27+
CONFIG_DIR: base,
28+
DATA_DIR: base,
29+
LOGS_DIR: join(base, "logs"),
30+
CONFIG_FILE: join(base, "config.json"),
31+
PID_FILE: join(base, "daemon.pid"),
32+
DAEMON_STATE_FILE: join(base, "daemon-state.json"),
33+
REPOS_DIR: join(base, "repos"),
34+
WORKTREES_DIR: join(base, "worktrees"),
35+
SESSIONS_DIR: join(base, "sessions"),
36+
TRACKED_TASKS_FILE: join(base, "tracked-tasks.json"),
37+
IDENTITIES_DIR: join(base, "identities"),
38+
LEGACY_SAVED_SESSIONS_FILE: join(base, "saved-sessions.json"),
39+
LEGACY_SESSION_PIDS_FILE: join(base, "session-pids.json"),
40+
};
41+
});
42+
2043
// ---- Mock logger (avoid pino noise in test output) -------------------------
2144
vi.mock("../packages/cli/src/logger.js", () => ({
2245
createLogger: () => ({

0 commit comments

Comments
 (0)