Commit 2288bcd

fix(daemon): persist rate-limit resumeAfter so restart doesn't strand sessions
Before: RateLimiter held the reset window only in memory (setTimeout). On daemon restart the timer was lost and session files had no resumeAfter, so resumeBackoffSessions skipped them — recovery only happened if a new task re-hit the same runtime's limit.

Now: routeRateLimit writes resumeAfter to the session file before notifying the in-memory sink, so a fresh DaemonLoop picks up stranded sessions on its first tick.
1 parent e7afd1f · commit 2288bcd
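
The recovery half of this chain, resumeBackoffSessions, is not part of the diff below. Here is a minimal sketch of the per-tick scan it performs, assuming a hypothetical listSessions() helper that reads every session file from disk; resumeOneSession, SessionFile, status, and resumeAfter are real names taken from the tests in this commit:

// Sketch only, not the actual loop.ts source. listSessions() is an assumed
// helper; resumeOneSession is the real function the tests mock.
import type { SessionFile } from "../packages/cli/src/session/types.js";

declare function listSessions(): SessionFile[];                   // assumed
declare function resumeOneSession(s: SessionFile): Promise<void>; // real name

async function resumeBackoffSessions(): Promise<void> {
  const now = Date.now();
  for (const session of listSessions()) {
    if (session.status !== "rate_limited") continue;
    // Before this commit resumeAfter was never persisted, so after a restart
    // every session failed this check and stayed stranded.
    if (session.resumeAfter === undefined || session.resumeAfter > now) continue;
    await resumeOneSession(session);
  }
}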

3 files changed: 188 additions & 5 deletions

packages/cli/src/daemon/runtimePool.ts

Lines changed: 13 additions & 5 deletions
@@ -244,7 +244,7 @@ async function routeEvent(
 
   switch (event.type) {
     case "turn.rate_limit":
-      routeRateLimit(agent, event, rateLimitSink);
+      await routeRateLimit(agent, event, rateLimitSink);
       return;
     case "turn.error":
       logger.warn(`Agent error on task ${agent.taskId} (${agent.providerName}): ${event.detail}`);
@@ -263,20 +263,28 @@ async function routeEvent(
   }
 }
 
-function routeRateLimit(agent: AgentFlags, event: Extract<AgentEvent, { type: "turn.rate_limit" }>, sink: RateLimitSink): void {
+async function routeRateLimit(agent: AgentFlags, event: Extract<AgentEvent, { type: "turn.rate_limit" }>, sink: RateLimitSink): Promise<void> {
   const runtime = agent.providerName;
   if (event.status === "rejected") {
     const mainReset = event.resetAt;
     const overageReset = event.overage?.status === "rejected" ? event.overage.resetAt : undefined;
-    const candidates = [mainReset, overageReset].filter((x): x is string => !!x);
+    const resetTimestamps = [mainReset, overageReset].filter((x): x is string => !!x);
     const pauseUntil =
-      candidates.length > 0
-        ? candidates.reduce((a, b) => (new Date(a).getTime() >= new Date(b).getTime() ? a : b))
+      resetTimestamps.length > 0
+        ? resetTimestamps.reduce((a, b) => (new Date(a).getTime() >= new Date(b).getTime() ? a : b))
         : new Date(Date.now() + 60 * 60 * 1000).toISOString();
     logger.warn(
       `Rate limited on task ${agent.taskId} (${runtime}), pausing until ${pauseUntil}${event.isUsingOverage ? " — agent continues via overage" : ""}`,
     );
     agent.rateLimited = true;
+    // Persist first, then notify the in-memory sink. Reverse order would leave
+    // a crash window: if the daemon died between sink update and disk write,
+    // the session would come back up without resumeAfter and be stranded —
+    // exactly the bug this fix exists to prevent.
+    const resumeAfter = new Date(pauseUntil).getTime();
+    await getSessionManager()
+      .patch(agent.sessionId, { resumeAfter })
+      .catch((e) => logger.warn(`Failed to persist resumeAfter for ${agent.sessionId.slice(0, 8)}: ${errMessage(e)}`));
     sink.onRateLimited(runtime, pauseUntil);
     return;
   }
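
The getSessionManager().patch(...) call above is what makes routeRateLimit async. SessionManager itself is not shown in this commit; a plausible shape for patch, assuming it does a read-modify-write over the session JSON on disk (readSession is a hypothetical helper here; writeSession is the real store function the tests import):

import { writeSession } from "../session/store.js";
import type { SessionFile } from "../session/types.js";

// Hypothetical helper: parse the session file from disk, or null if missing.
declare function readSession(sessionId: string): SessionFile | null;

// Sketch of patch(): merge fields into the on-disk record. The real
// SessionManager may serialize or validate writes; this shows only the
// minimal behavior the new tests rely on.
export async function patchSession(
  sessionId: string,
  fields: Partial<SessionFile>,
): Promise<void> {
  const current = readSession(sessionId);
  if (current === null) {
    throw new Error(`unknown session ${sessionId.slice(0, 8)}`);
  }
  writeSession({ ...current, ...fields }); // resumeAfter reaches disk here
}

Because the write lands before sink.onRateLimited fires, a crash at any point leaves either a fully persisted session or a retryable event; it never leaves the sink updated with nothing on disk.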

tests/loop-rate-limit-prompt.test.ts

Lines changed: 107 additions & 0 deletions
@@ -63,6 +63,8 @@ vi.mock("../packages/cli/src/logger.js", () => ({
 // ---- Imports after mocks are set up -----------------------------------------
 
 import { DaemonLoop } from "../packages/cli/src/daemon/loop.js";
+import { _setSessionManagerForTest, SessionManager } from "../packages/cli/src/session/manager.js";
+import { clearAllSessions, writeSession } from "../packages/cli/src/session/store.js";
 import type { SessionFile } from "../packages/cli/src/session/types.js";
 
 // ---- Minimal fakes ----------------------------------------------------------
@@ -230,6 +232,111 @@ describe("DaemonLoop — RATE_LIMIT_RESUME_PROMPT is non-empty", () => {
   });
 });
 
+describe("post-crash restart: resumeAfter on disk → fresh DaemonLoop resumes (full chain)", () => {
+  /**
+   * Integration test for the daemon-restart scenario described in the bug fix:
+   *
+   * 1. routeRateLimit writes a session file to disk with status="rate_limited"
+   *    and resumeAfter=<past epoch ms> — exactly what happens when an agent hits
+   *    the rate limit and then the daemon process is killed.
+   * 2. A fresh DaemonLoop is constructed with a brand-new SessionManager (no
+   *    in-memory rate-limit state) — simulating a daemon restart.
+   * 3. resumeBackoffSessions() scans disk, finds the expired session, and
+   *    calls resumeOneSession.
+   *
+   * This verifies the FULL CHAIN, not just either half in isolation.
+   * Note: DaemonLoop is constructed AFTER _setSessionManagerForTest so the
+   * class field `private sessions = getSessionManager()` picks up the real
+   * SessionManager from the singleton. No internal stomping is used.
+   */
+
+  afterEach(() => {
+    clearAllSessions();
+    _setSessionManagerForTest(null);
+  });
+
+  it("resumes a rate_limited session whose resumeAfter was written to disk before daemon restart", async () => {
+    // Step 1: write a rate_limited session to disk (simulating post-crash state)
+    const sessionId = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
+    const onDiskSession: SessionFile = {
+      type: "worker",
+      agentId: "agent-restart-test",
+      sessionId,
+      runtime: "claude" as any,
+      startedAt: Date.now() - 120_000,
+      apiUrl: "http://localhost",
+      privateKeyJwk: {} as any,
+      taskId: "task-restart-test",
+      status: "rate_limited",
+      resumeAfter: Date.now() - 5_000, // expired 5 seconds ago
+    };
+    writeSession(onDiskSession);
+
+    // Step 2: seed the singleton with a fresh SessionManager, then construct
+    // DaemonLoop so its class field `sessions = getSessionManager()` binds to it.
+    _setSessionManagerForTest(new SessionManager());
+
+    const pool = {
+      activeCount: 0,
+      hasTask: (_id: string) => false,
+      getActiveTaskIds: () => [],
+    } as any;
+
+    const freshLoop = new DaemonLoop(makeClient(), pool, makeRateLimiter(), makePrMonitor(), {
+      maxConcurrent: 4,
+      pollInterval: 1000,
+    });
+    (freshLoop as any).running = true;
+
+    // Step 3: call resumeBackoffSessions — must find the on-disk session
+    await (freshLoop as any).resumeBackoffSessions();
+
+    // Assert: resumeOneSession was called for the on-disk session
+    expect(resumeOneSessionMock).toHaveBeenCalledTimes(1);
+    const sessionArg: SessionFile = resumeOneSessionMock.mock.calls[0][0];
+    expect(sessionArg.sessionId).toBe(sessionId);
+    expect(sessionArg.taskId).toBe("task-restart-test");
+    expect(sessionArg.status).toBe("rate_limited");
+  });
+
+  it("does NOT resume a rate_limited session whose resumeAfter is still in the future after restart", async () => {
+    // Write a session whose backoff has NOT expired yet
+    const sessionId = "ffffffff-aaaa-bbbb-cccc-dddddddddddd";
+    const onDiskSession: SessionFile = {
+      type: "worker",
+      agentId: "agent-future-test",
+      sessionId,
+      runtime: "claude" as any,
+      startedAt: Date.now() - 60_000,
+      apiUrl: "http://localhost",
+      privateKeyJwk: {} as any,
+      taskId: "task-future-test",
+      status: "rate_limited",
+      resumeAfter: Date.now() + 600_000, // still 10 minutes away
+    };
+    writeSession(onDiskSession);
+
+    // Seed singleton before constructing DaemonLoop so the class field binds correctly.
+    _setSessionManagerForTest(new SessionManager());
+
+    const pool = {
+      activeCount: 0,
+      hasTask: (_id: string) => false,
+      getActiveTaskIds: () => [],
+    } as any;
+
+    const freshLoop = new DaemonLoop(makeClient(), pool, makeRateLimiter(), makePrMonitor(), {
+      maxConcurrent: 4,
+      pollInterval: 1000,
+    });
+    (freshLoop as any).running = true;
+
+    await (freshLoop as any).resumeBackoffSessions();
+
+    expect(resumeOneSessionMock).not.toHaveBeenCalled();
+  });
+});
+
 describe("both callers pass the same non-empty prompt string", () => {
   it("resumeRateLimitedSessions and resumeBackoffSessions pass identical message text", async () => {
     // Call each function in isolation so call indices are unambiguous.
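
The doc comment's note about construction order matters because TypeScript class fields are initialized when the constructor runs. A sketch of the singleton wiring these tests rely on; only the exported names are confirmed by the test imports, the bodies are an assumed, conventional test-seedable singleton:

// Sketch of session/manager.ts. Only the exported names appear in the test
// imports; the implementation below is an assumption.
export class SessionManager {
  // read(), patch(), etc. omitted in this sketch
}

let singleton: SessionManager | null = null;

export function getSessionManager(): SessionManager {
  if (singleton === null) singleton = new SessionManager();
  return singleton;
}

export function _setSessionManagerForTest(manager: SessionManager | null): void {
  singleton = manager;
}

// DaemonLoop's field `private sessions = getSessionManager()` is evaluated at
// construction time, so the tests seed the singleton BEFORE `new DaemonLoop(...)`;
// built earlier, the loop would capture a stale manager.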

tests/runtimePool-coverage.test.ts

Lines changed: 68 additions & 0 deletions
@@ -700,6 +700,74 @@ describe("routeRateLimit — rejected status", () => {
   });
 });
 
+describe("routeRateLimit — persists resumeAfter to session file", () => {
+  it("persists resumeAfter equal to new Date(pauseUntil).getTime() after a rejected rate_limit event", async () => {
+    const taskId = randomUUID();
+    const sessionId = randomUUID();
+    await seedActiveSession(sessions, sessionId, taskId);
+
+    const agentClient = makeAgentClient(null);
+    const rateLimitSink = { onRateLimited: vi.fn(), onRateLimitResumed: vi.fn() };
+    const resetAt = new Date(Date.now() + 60_000).toISOString();
+    const expectedResumeAfter = new Date(resetAt).getTime();
+    const events: AgentEvent[] = [makeRateLimitRejectedEvent(resetAt)];
+
+    await spawnAndWait(apiClient, { events, taskId, sessionId, agentClient, rateLimitSink });
+
+    const session = sessions.read(sessionId);
+    expect(session).not.toBeNull();
+    expect(session!.resumeAfter).toBe(expectedResumeAfter);
+  });
+
+  it("persists resumeAfter equal to the later overage resetAt when overage resets later than main", async () => {
+    const taskId = randomUUID();
+    const sessionId = randomUUID();
+    await seedActiveSession(sessions, sessionId, taskId);
+
+    const agentClient = makeAgentClient(null);
+    const rateLimitSink = { onRateLimited: vi.fn(), onRateLimitResumed: vi.fn() };
+
+    // Overage resets later — pauseUntil should be the later time
+    const earlier = new Date(Date.now() + 30_000).toISOString();
+    const later = new Date(Date.now() + 120_000).toISOString();
+    const expectedResumeAfter = new Date(later).getTime();
+    const events: AgentEvent[] = [makeRateLimitRejectedWithOverageEvent(earlier, later)];
+
+    await spawnAndWait(apiClient, { events, taskId, sessionId, agentClient, rateLimitSink });
+
+    const session = sessions.read(sessionId);
+    expect(session).not.toBeNull();
+    expect(session!.resumeAfter).toBe(expectedResumeAfter);
+  });
+
+  it("persists resumeAfter approximately 1h from now when resetAt is missing and no overage", async () => {
+    const taskId = randomUUID();
+    const sessionId = randomUUID();
+    await seedActiveSession(sessions, sessionId, taskId);
+
+    const agentClient = makeAgentClient(null);
+    const rateLimitSink = { onRateLimited: vi.fn(), onRateLimitResumed: vi.fn() };
+
+    // No resetAt and no overage → resetTimestamps.length === 0 → fallback: Date.now() + 3_600_000
+    const before = Date.now();
+    const event: AgentEvent = {
+      type: "turn.rate_limit",
+      status: "rejected",
+      resetAt: undefined as any,
+      isUsingOverage: false,
+    } as AgentEvent;
+
+    await spawnAndWait(apiClient, { events: [event], taskId, sessionId, agentClient, rateLimitSink });
+
+    const session = sessions.read(sessionId);
+    expect(session).not.toBeNull();
+    const delta = session!.resumeAfter! - before;
+    // Should be approximately 1 hour (3_600_000 ms), within ±5s tolerance
+    expect(delta).toBeGreaterThanOrEqual(3_600_000 - 5_000);
+    expect(delta).toBeLessThanOrEqual(3_600_000 + 5_000);
+  });
+});
+
 describe("routeRateLimit — allowed status", () => {
   it("clears rateLimited flag and calls onRateLimitResumed when allowed without overage", async () => {
     const taskId = randomUUID();
