Skip to content

Commit 8053ef2

Browse files
committed
feat(runtime): probe runtime availability before dispatch
1 parent 11defd7 commit 8053ef2

16 files changed

Lines changed: 283 additions & 42 deletions

File tree

packages/cli/src/daemon/dispatcher.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { type AgentInfo, generateSystemPrompt, writePromptFile } from "../agent/
1616
import { AgentClient, type ApiClient } from "../client/index.js";
1717
import { getCredentials } from "../config.js";
1818
import { createLogger } from "../logger.js";
19-
import { getProvider, normalizeRuntime } from "../providers/registry.js";
19+
import { getAvailableProviders, getProvider, normalizeRuntime } from "../providers/registry.js";
2020
import { getSessionManager } from "../session/manager.js";
2121
import type { SessionFile } from "../session/types.js";
2222
import { ensureCloned, prepareRepo, repoDir } from "../workspace/repoOps.js";
@@ -138,6 +138,7 @@ export async function dispatchTasks(
138138

139139
if (available.length === 0) return false;
140140

141+
const localRuntimes = new Set(getAvailableProviders().map((provider) => provider.name));
141142
const agentCache = new Map<string, { runtime: AgentRuntime | null; available: boolean }>();
142143
let task: any = null;
143144
for (const t of available) {
@@ -152,8 +153,8 @@ export async function dispatchTasks(
152153
agentState = { runtime: normalizeRuntime(agent.runtime ?? "claude"), available: agent.runtime_available !== false };
153154
agentCache.set(t.assigned_to, agentState);
154155
}
155-
if (!agentState.runtime || !agentState.available) continue;
156-
const localAvailability = getProvider(agentState.runtime).checkAvailability?.();
156+
if (!agentState.runtime || !agentState.available || !localRuntimes.has(agentState.runtime)) continue;
157+
const localAvailability = await getProvider(agentState.runtime).checkAvailability?.();
157158
if (localAvailability && localAvailability.status !== "ready") continue;
158159
if (!rateLimiter.isRuntimePaused(agentState.runtime)) {
159160
task = t;

packages/cli/src/daemon/index.ts

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ export async function startDaemon(opts: DaemonOptions): Promise<void> {
5858
},
5959
});
6060

61-
const machineInfo = getMachineInfo(availableProviders, rateLimiter);
61+
const machineInfo = await getMachineInfo(availableProviders, rateLimiter);
6262
const deviceId = generateDeviceId();
6363
const machine = await client.registerMachine({ ...machineInfo, device_id: deviceId });
6464
const machineId = machine.id;
6565
logger.info(`Machine ready: ${machineId} (device: ${deviceId})`);
6666

67-
await client.heartbeat(machineId, { version: machineInfo.version, runtimes: buildRuntimeStates(availableProviders, rateLimiter) });
67+
await client.heartbeat(machineId, { version: machineInfo.version, runtimes: await buildRuntimeStates(availableProviders, rateLimiter) });
6868
migrateLegacySessions();
6969
await cleanupStaleSessions(client, machineId);
7070
await auditOrphanedTasks(client, machineId);
@@ -78,7 +78,7 @@ export async function startDaemon(opts: DaemonOptions): Promise<void> {
7878
sendHeartbeat = async () => {
7979
await client.heartbeat(machineId, {
8080
version: machineInfo.version,
81-
runtimes: buildRuntimeStates(availableProviders, rateLimiter),
81+
runtimes: await buildRuntimeStates(availableProviders, rateLimiter),
8282
usage_info: usageCollector.getSnapshot(),
8383
});
8484
};
@@ -97,9 +97,9 @@ export async function startDaemon(opts: DaemonOptions): Promise<void> {
9797
client,
9898
{ onSlotFreed: () => loop.onSlotFreed() },
9999
{
100-
onRateLimited: (runtime, resetAt) => {
100+
onRateLimited: async (runtime, resetAt) => {
101101
rateLimiter.pause(runtime, resetAt);
102-
sendHeartbeat?.().catch((e) => logger.warn(`Heartbeat failed after rate limit: ${(e as Error).message}`));
102+
await sendHeartbeat?.().catch((e) => logger.warn(`Heartbeat failed after rate limit: ${(e as Error).message}`));
103103
},
104104
onRateLimitResumed: (runtime) => rateLimiter.resumeRateLimit(runtime),
105105
},
@@ -165,22 +165,24 @@ function removePidFile(): void {
165165
}
166166
}
167167

168-
function getMachineInfo(providers: AgentProvider[], rateLimiter: RateLimiter) {
168+
async function getMachineInfo(providers: AgentProvider[], rateLimiter: RateLimiter) {
169169
const os = `${platform()} ${arch()} ${release()}`;
170-
const runtimes = buildRuntimeStates(providers, rateLimiter);
170+
const runtimes = await buildRuntimeStates(providers, rateLimiter);
171171
return { name: hostname(), os, version: getVersion(), runtimes };
172172
}
173173

174-
function buildRuntimeStates(providers: AgentProvider[], rateLimiter: RateLimiter): MachineRuntime[] {
174+
async function buildRuntimeStates(providers: AgentProvider[], rateLimiter: RateLimiter): Promise<MachineRuntime[]> {
175175
const checked_at = new Date().toISOString();
176-
return providers.map((provider) => {
177-
const reset_at = rateLimiter.pauseResetAt(provider.name);
178-
if (reset_at) {
179-
return { name: provider.name, status: "limited", detail: "runtime paused by rate limiter", reset_at, checked_at };
180-
}
181-
const availability = provider.checkAvailability?.() ?? { status: "ready" };
182-
return { name: provider.name, ...availability, checked_at };
183-
});
176+
return Promise.all(
177+
providers.map(async (provider) => {
178+
const reset_at = rateLimiter.pauseResetAt(provider.name);
179+
if (reset_at) {
180+
return { name: provider.name, status: "limited", detail: "runtime paused by rate limiter", reset_at, checked_at };
181+
}
182+
const availability = (await provider.checkAvailability?.()) ?? { status: "ready" };
183+
return { name: provider.name, ...availability, checked_at };
184+
}),
185+
);
184186
}
185187

186188
function formatRuntimeNames(runtimes: MachineRuntime[]): string {

packages/cli/src/daemon/runtimePool.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export interface AgentFlags {
6969
}
7070

7171
export interface RateLimitSink {
72-
onRateLimited: (runtime: string, resetAt: string) => void;
72+
onRateLimited: (runtime: string, resetAt: string) => void | Promise<void>;
7373
onRateLimitResumed: (runtime: string) => void;
7474
}
7575

@@ -285,7 +285,7 @@ async function routeRateLimit(agent: AgentFlags, event: Extract<AgentEvent, { ty
285285
await getSessionManager()
286286
.patch(agent.sessionId, { resumeAfter })
287287
.catch((e) => logger.warn(`Failed to persist resumeAfter for ${agent.sessionId.slice(0, 8)}: ${errMessage(e)}`));
288-
sink.onRateLimited(runtime, pauseUntil);
288+
await sink.onRateLimited(runtime, pauseUntil);
289289
return;
290290
}
291291
if (event.isUsingOverage) {

packages/cli/src/providers/claude.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import type { SDKAssistantMessage, SDKMessage, SDKPartialAssistantMessage, SDKUs
88
import { getSessionMessages, query } from "@anthropic-ai/claude-agent-sdk";
99
import { createLogger } from "../logger.js";
1010
import type { AgentEvent, AgentHandle, AgentProvider, ContentBlock, ExecuteOpts, HistoryEvent, UsageInfo, UsageWindow } from "./types.js";
11-
import { parseRetryAfterMs, UsageFetchError } from "./types.js";
11+
import { availabilityFromUsage, availabilityFromUsageError, parseRetryAfterMs, UsageFetchError } from "./types.js";
1212

1313
const SUBTASK_STATUSES: readonly SubtaskStatus[] = ["completed", "failed", "stopped"] as const;
1414

@@ -378,8 +378,13 @@ export const claudeProvider: AgentProvider = {
378378
name: "claude",
379379
label: "Claude Code",
380380

381-
checkAvailability() {
382-
return readOAuthToken() ? { status: "ready" } : { status: "unauthorized", detail: "Claude Code is not logged in" };
381+
async checkAvailability() {
382+
if (!readOAuthToken()) return { status: "unauthorized" as const, detail: "Claude Code is not logged in" };
383+
try {
384+
return availabilityFromUsage(await this.fetchUsage!());
385+
} catch (err) {
386+
return availabilityFromUsageError(err, "Claude Code");
387+
}
383388
},
384389

385390
execute(opts: ExecuteOpts): Promise<AgentHandle> {

packages/cli/src/providers/codex.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import type { BashArgs, ReadArgs } from "@agent-kanban/shared";
66
import { ToolName } from "@agent-kanban/shared";
77
import { Codex, type ThreadEvent } from "@openai/codex-sdk";
88
import type { AgentEvent, AgentHandle, AgentProvider, ContentBlock, ExecuteOpts, HistoryEvent, UsageInfo, UsageWindow } from "./types.js";
9-
import { parseRetryAfterMs, UsageFetchError } from "./types.js";
9+
import { availabilityFromUsage, availabilityFromUsageError, parseRetryAfterMs, UsageFetchError } from "./types.js";
1010

1111
const AUTH_PATH = join(homedir(), ".codex", "auth.json");
1212
const CODEX_SESSIONS_DIR = join(homedir(), ".codex", "sessions");
@@ -198,8 +198,15 @@ export const codexProvider: AgentProvider = {
198198
name: "codex",
199199
label: "Codex CLI",
200200

201-
checkAvailability() {
202-
return readAccessToken() || process.env.OPENAI_API_KEY ? { status: "ready" } : { status: "unauthorized", detail: "Codex is not logged in" };
201+
async checkAvailability() {
202+
const token = readAccessToken();
203+
if (!token && !process.env.OPENAI_API_KEY) return { status: "unauthorized" as const, detail: "Codex is not logged in" };
204+
if (!token) return { status: "ready" as const };
205+
try {
206+
return availabilityFromUsage(await this.fetchUsage!());
207+
} catch (err) {
208+
return availabilityFromUsageError(err, "Codex");
209+
}
203210
},
204211

205212
async execute(opts: ExecuteOpts): Promise<AgentHandle> {
@@ -273,15 +280,15 @@ export const codexProvider: AgentProvider = {
273280
windows.push({
274281
runtime: "codex",
275282
label: windowLabel(rl.primary_window.limit_window_seconds),
276-
utilization: rl.primary_window.used_percent,
283+
utilization: rl.primary_window.used_percent / 100,
277284
resets_at: new Date(rl.primary_window.reset_at * 1000).toISOString(),
278285
});
279286
}
280287
if (rl?.secondary_window) {
281288
windows.push({
282289
runtime: "codex",
283290
label: windowLabel(rl.secondary_window.limit_window_seconds),
284-
utilization: rl.secondary_window.used_percent,
291+
utilization: rl.secondary_window.used_percent / 100,
285292
resets_at: new Date(rl.secondary_window.reset_at * 1000).toISOString(),
286293
});
287294
}

packages/cli/src/providers/copilot.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import type { CopilotSession, SessionEvent } from "@github/copilot-sdk";
66
import { approveAll, CopilotClient } from "@github/copilot-sdk";
77
import { createLogger } from "../logger.js";
88
import type { AgentEvent, AgentHandle, AgentProvider, ContentBlock, ExecuteOpts, HistoryEvent, UsageInfo, UsageWindow } from "./types.js";
9-
import { parseRetryAfterMs, UsageFetchError } from "./types.js";
9+
import { availabilityFromUsage, availabilityFromUsageError, parseRetryAfterMs, UsageFetchError } from "./types.js";
1010

1111
const logger = createLogger("copilot");
1212

@@ -277,8 +277,13 @@ export const copilotProvider: AgentProvider = {
277277
name: "copilot",
278278
label: "GitHub Copilot",
279279

280-
checkAvailability() {
281-
return readGhToken() ? { status: "ready" } : { status: "unauthorized", detail: "GitHub CLI is not authenticated" };
280+
async checkAvailability() {
281+
if (!readGhToken()) return { status: "unauthorized" as const, detail: "GitHub CLI is not authenticated" };
282+
try {
283+
return availabilityFromUsage(await this.fetchUsage!());
284+
} catch (err) {
285+
return availabilityFromUsageError(err, "GitHub Copilot");
286+
}
282287
},
283288

284289
async execute(opts: ExecuteOpts): Promise<AgentHandle> {

packages/cli/src/providers/gemini.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ export const geminiProvider: AgentProvider = {
8484
name: "gemini",
8585
label: "Gemini CLI",
8686

87-
checkAvailability() {
87+
async checkAvailability() {
8888
return process.env.GEMINI_API_KEY || process.env.GOOGLE_API_KEY || existsSync(OAUTH_CREDS_PATH)
8989
? { status: "ready" }
9090
: { status: "unauthorized", detail: "Gemini CLI is not authenticated" };

packages/cli/src/providers/types.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@ import type { AgentEvent, AgentRuntime, ContentBlock, MachineRuntimeStatus, Usag
22

33
export type { AgentEvent, AgentRuntime, ContentBlock, UsageInfo, UsageWindow };
44

5+
/**
 * Result of probing whether a runtime can currently accept work.
 * Returned by `AgentProvider.checkAvailability` and by the
 * `availabilityFromUsage` / `availabilityFromUsageError` helpers.
 */
export interface RuntimeAvailability {
6+
/** Probe outcome; the helpers in this file produce "ready", "limited", "unauthorized" and "unhealthy". */
status: MachineRuntimeStatus;
7+
/** Optional human-readable explanation of the status. */
detail?: string;
8+
/** Optional timestamp string; the helpers in this file only set it alongside "limited" (presumably when the limit lifts — confirm with consumers). */
reset_at?: string;
9+
}
10+
511
/** Normalized history entry returned by provider history readers. */
612
export interface HistoryEvent {
713
id: string;
@@ -42,7 +48,7 @@ export interface AgentHandle {
4248
export interface AgentProvider {
4349
readonly name: AgentRuntime;
4450
readonly label: string;
45-
checkAvailability?(): { status: MachineRuntimeStatus; detail?: string };
51+
checkAvailability?(): Promise<RuntimeAvailability>;
4652
execute(opts: ExecuteOpts): Promise<AgentHandle>;
4753
/**
4854
* Retrieve session history from this provider's local storage.
@@ -82,6 +88,31 @@ export class UsageFetchError extends Error {
8288
}
8389
}
8490

91+
/**
 * Translate a usage snapshot into a runtime availability verdict.
 *
 * A window is treated as exhausted when its `utilization` is at or above 1.
 * With no snapshot, or no exhausted window, the runtime is reported "ready".
 * Otherwise it is "limited" and `reset_at` carries the earliest reset time
 * among the exhausted windows (undefined when none of them has one).
 *
 * @param usage - Usage snapshot for the runtime, or `null` when unavailable.
 * @returns The derived availability.
 */
export function availabilityFromUsage(usage: UsageInfo | null): RuntimeAvailability {
  const exhausted = usage?.windows.filter((window) => window.utilization >= 1) ?? [];
  if (exhausted.length === 0) return { status: "ready" };

  const resetTimes = exhausted
    .map((window) => window.resets_at)
    .filter((value): value is string => typeof value === "string" && value.length > 0);

  // Order by the instant each timestamp denotes rather than by its text:
  // a plain string sort is only correct when every value shares the same
  // offset and precision. Unparseable values sort last, in string order.
  // `resetTimes` is a fresh array, so sorting in place is safe.
  const reset_at = resetTimes.sort((a, b) => {
    const left = Date.parse(a);
    const right = Date.parse(b);
    const leftValid = !Number.isNaN(left);
    const rightValid = !Number.isNaN(right);
    if (leftValid && rightValid) return left - right;
    if (leftValid !== rightValid) return leftValid ? -1 : 1;
    return a < b ? -1 : a > b ? 1 : 0;
  })[0];

  return { status: "limited", detail: "runtime usage limit reached", reset_at };
}
101+
102+
/**
 * Map a failure raised while probing a runtime's usage endpoint to an
 * availability verdict. This function never throws.
 *
 * - Anything that is not a `UsageFetchError` yields "unhealthy", with the
 *   failure reason appended to the detail.
 * - HTTP 401 / 403 yields "unauthorized".
 * - HTTP 429 yields "limited"; `reset_at` is derived from `retryAfterMs`
 *   when that value produces a representable date.
 * - Any other `UsageFetchError` yields "unhealthy" with the error's message.
 *
 * @param err - The value caught from the usage probe.
 * @param runtimeLabel - Human-readable runtime name used in `detail`.
 * @returns The derived availability.
 */
export function availabilityFromUsageError(err: unknown, runtimeLabel: string): RuntimeAvailability {
  if (!(err instanceof UsageFetchError)) {
    // Thrown values are not guaranteed to be Error instances; avoid
    // rendering "undefined" for strings or other non-Error throwables.
    const reason = err instanceof Error ? err.message : String(err);
    return { status: "unhealthy", detail: `${runtimeLabel} usage probe failed: ${reason}` };
  }
  if (err.status === 401 || err.status === 403) {
    return { status: "unauthorized", detail: `${runtimeLabel} authentication failed` };
  }
  if (err.status === 429) {
    let reset_at: string | undefined;
    if (err.retryAfterMs !== undefined) {
      // toISOString() throws RangeError on an invalid date (NaN, infinite or
      // out-of-range delay); drop the hint instead of failing the probe.
      const resetDate = new Date(Date.now() + err.retryAfterMs);
      if (!Number.isNaN(resetDate.getTime())) reset_at = resetDate.toISOString();
    }
    return { status: "limited", detail: `${runtimeLabel} usage limit reached`, reset_at };
  }
  return { status: "unhealthy", detail: err.message };
}
115+
85116
/**
86117
* Parse an HTTP `Retry-After` header value. Supports both delta-seconds
87118
* (e.g. `"120"`) and HTTP-date (e.g. `"Fri, 11 Apr 2026 14:30:00 GMT"`).

packages/cli/tests/daemon-heartbeat.test.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
55
const mocks = vi.hoisted(() => ({
66
heartbeat: vi.fn(),
77
registerMachine: vi.fn(),
8-
capturedRateLimitSink: null as null | { onRateLimited: (runtime: string, resetAt: string) => void; onRateLimitResumed: (runtime: string) => void },
8+
capturedRateLimitSink: null as null | {
9+
onRateLimited: (runtime: string, resetAt: string) => void | Promise<void>;
10+
onRateLimitResumed: (runtime: string) => void;
11+
},
912
availability: null as null | Record<string, { status: "ready" | "unauthorized"; detail?: string }>,
1013
}));
1114

@@ -41,14 +44,14 @@ vi.mock("../src/providers/registry.js", () => ({
4144
{
4245
name: "claude",
4346
label: "Claude",
44-
checkAvailability: () => mocks.availability?.claude ?? { status: "ready" },
47+
checkAvailability: async () => mocks.availability?.claude ?? { status: "ready" },
4548
execute: vi.fn(),
4649
getHistory: vi.fn().mockResolvedValue([]),
4750
},
4851
{
4952
name: "codex",
5053
label: "Codex",
51-
checkAvailability: () => mocks.availability?.codex ?? { status: "ready" },
54+
checkAvailability: async () => mocks.availability?.codex ?? { status: "ready" },
5255
execute: vi.fn(),
5356
getHistory: vi.fn().mockResolvedValue([]),
5457
},
@@ -154,8 +157,7 @@ describe("daemon heartbeat runtime states", () => {
154157
});
155158

156159
const resetAt = "2026-03-21T10:30:00.000Z";
157-
mocks.capturedRateLimitSink!.onRateLimited("claude", resetAt);
158-
await Promise.resolve();
160+
await mocks.capturedRateLimitSink!.onRateLimited("claude", resetAt);
159161

160162
expect(mocks.heartbeat).toHaveBeenLastCalledWith("machine-1", {
161163
version: "1.2.3",
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// @vitest-environment node

import { describe, expect, it, vi } from "vitest";
import { availabilityFromUsage, availabilityFromUsageError, UsageFetchError } from "../src/providers/types.js";

// Unit tests for the helpers that turn usage snapshots and usage-probe
// failures into runtime availability results.
describe("provider availability helpers", () => {
  it("reports ready when usage is absent or below its limit", () => {
    expect(availabilityFromUsage(null)).toEqual({ status: "ready" });
    expect(
      availabilityFromUsage({
        updated_at: "2026-03-21T10:00:00.000Z",
        windows: [{ runtime: "claude", label: "5-Hour", utilization: 0.5, resets_at: "2026-03-21T12:00:00.000Z" }],
      }),
    ).toEqual({ status: "ready" });
  });

  it("reports limited when a usage window is exhausted", () => {
    expect(
      availabilityFromUsage({
        updated_at: "2026-03-21T10:00:00.000Z",
        windows: [{ runtime: "codex", label: "5-Hour", utilization: 1, resets_at: "2026-03-21T12:00:00.000Z" }],
      }),
    ).toEqual({
      status: "limited",
      detail: "runtime usage limit reached",
      reset_at: "2026-03-21T12:00:00.000Z",
    });
  });

  it("maps usage probe errors to runtime statuses", () => {
    // The 429 case derives reset_at from Date.now(), so pin the clock.
    vi.useFakeTimers();
    try {
      vi.setSystemTime(new Date("2026-03-21T10:00:00.000Z"));

      expect(availabilityFromUsageError(new UsageFetchError("failed", { status: 401 }), "Codex")).toEqual({
        status: "unauthorized",
        detail: "Codex authentication failed",
      });
      expect(availabilityFromUsageError(new UsageFetchError("forbidden", { status: 403 }), "Codex")).toEqual({
        status: "unauthorized",
        detail: "Codex authentication failed",
      });
      expect(availabilityFromUsageError(new UsageFetchError("limited", { status: 429, retryAfterMs: 60_000 }), "Codex")).toEqual({
        status: "limited",
        detail: "Codex usage limit reached",
        reset_at: "2026-03-21T10:01:00.000Z",
      });
      expect(availabilityFromUsageError(new UsageFetchError("server failed", { status: 500 }), "Codex")).toEqual({
        status: "unhealthy",
        detail: "server failed",
      });
      expect(availabilityFromUsageError(new Error("network down"), "Codex")).toEqual({
        status: "unhealthy",
        detail: "Codex usage probe failed: network down",
      });
    } finally {
      // Restore real timers even when an assertion above fails, so the
      // mocked clock cannot leak into later tests.
      vi.useRealTimers();
    }
  });
});

0 commit comments

Comments
 (0)