Skip to content

Commit 11defd7

Browse files
committed
feat(runtime): track runtime availability
1 parent b2c6280 commit 11defd7

37 files changed

Lines changed: 937 additions & 105 deletions

apps/web/server/agentRepo.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import type { Agent, AgentWithActivity, CreateAgentInput } from "@agent-kanban/shared";
2-
import { type AgentRuntime, BUILTIN_TEMPLATES } from "@agent-kanban/shared";
2+
import { type AgentRuntime, BUILTIN_TEMPLATES, MACHINE_STALE_TIMEOUT_MS } from "@agent-kanban/shared";
33
import { type D1, parseJsonFields } from "./db";
44
import { addSubkey, getOrCreateRootKey } from "./gpgKeyRepo";
5+
import { runtimeReadyPredicateSql } from "./machineRepo";
56

67
const parseAgent = <T extends Agent>(row: T) => parseJsonFields(row, ["skills", "handoff_to"]);
78

@@ -119,13 +120,23 @@ export async function seedBuiltinAgents(db: D1, ownerId: string): Promise<void>
119120
}
120121

121122
export async function listAgents(db: D1, ownerId: string): Promise<AgentWithActivity[]> {
123+
const runtimeCutoff = new Date(Date.now() - MACHINE_STALE_TIMEOUT_MS).toISOString();
122124
const result = await db
123125
.prepare(`
124126
SELECT a.id, a.owner_id, a.name, a.username, a.gpg_subkey_id, a.bio, a.soul, a.role, a.kind, a.handoff_to, a.runtime, a.model, a.skills,
125127
a.public_key, a.fingerprint, a.builtin, a.created_at, a.updated_at,
128+
CASE WHEN EXISTS (
129+
SELECT 1 FROM machines m, json_each(m.runtimes) rt
130+
WHERE m.owner_id = a.owner_id
131+
AND m.status = 'online'
132+
AND m.last_heartbeat_at >= ?
133+
AND ${runtimeReadyPredicateSql("a.runtime")}
134+
) THEN 1 ELSE 0 END as runtime_available,
126135
CASE WHEN EXISTS (SELECT 1 FROM agent_sessions s WHERE s.agent_id = a.id AND s.status = 'active') THEN 'online' ELSE 'offline' END as status,
127136
(SELECT MAX(tl.created_at) FROM task_actions tl WHERE tl.actor_id = a.id) as last_active_at,
128137
(SELECT COUNT(*) FROM tasks t WHERE t.assigned_to = a.id) as task_count,
138+
(SELECT COUNT(*) FROM tasks t WHERE t.assigned_to = a.id AND t.status = 'todo') as queued_task_count,
139+
(SELECT COUNT(*) FROM tasks t WHERE t.assigned_to = a.id AND t.status IN ('in_progress', 'in_review')) as active_task_count,
129140
COALESCE((SELECT SUM(s.input_tokens) FROM agent_sessions s WHERE s.agent_id = a.id), 0) as input_tokens,
130141
COALESCE((SELECT SUM(s.output_tokens) FROM agent_sessions s WHERE s.agent_id = a.id), 0) as output_tokens,
131142
COALESCE((SELECT SUM(s.cache_read_tokens) FROM agent_sessions s WHERE s.agent_id = a.id), 0) as cache_read_tokens,
@@ -135,19 +146,29 @@ export async function listAgents(db: D1, ownerId: string): Promise<AgentWithActi
135146
WHERE a.owner_id = ?
136147
ORDER BY a.created_at DESC
137148
`)
138-
.bind(ownerId)
149+
.bind(runtimeCutoff, ownerId)
139150
.all<AgentWithActivity>();
140-
return result.results.map((r) => ({ ...parseAgent(r), email: `${r.username}@mails.agent-kanban.dev` }));
151+
return result.results.map((r) => ({ ...parseAgent(r), runtime_available: !!r.runtime_available, email: `${r.username}@mails.agent-kanban.dev` }));
141152
}
142153

143154
export async function getAgent(db: D1, agentId: string, ownerId: string): Promise<AgentWithActivity | null> {
155+
const runtimeCutoff = new Date(Date.now() - MACHINE_STALE_TIMEOUT_MS).toISOString();
144156
return db
145157
.prepare(`
146158
SELECT a.id, a.owner_id, a.name, a.username, a.gpg_subkey_id, a.bio, a.soul, a.role, a.kind, a.handoff_to, a.runtime, a.model, a.skills,
147159
a.public_key, a.fingerprint, a.builtin, a.created_at, a.updated_at,
160+
CASE WHEN EXISTS (
161+
SELECT 1 FROM machines m, json_each(m.runtimes) rt
162+
WHERE m.owner_id = a.owner_id
163+
AND m.status = 'online'
164+
AND m.last_heartbeat_at >= ?
165+
AND ${runtimeReadyPredicateSql("a.runtime")}
166+
) THEN 1 ELSE 0 END as runtime_available,
148167
CASE WHEN EXISTS (SELECT 1 FROM agent_sessions s WHERE s.agent_id = a.id AND s.status = 'active') THEN 'online' ELSE 'offline' END as status,
149168
(SELECT MAX(tl.created_at) FROM task_actions tl WHERE tl.actor_id = a.id) as last_active_at,
150169
(SELECT COUNT(*) FROM tasks t WHERE t.assigned_to = a.id) as task_count,
170+
(SELECT COUNT(*) FROM tasks t WHERE t.assigned_to = a.id AND t.status = 'todo') as queued_task_count,
171+
(SELECT COUNT(*) FROM tasks t WHERE t.assigned_to = a.id AND t.status IN ('in_progress', 'in_review')) as active_task_count,
151172
COALESCE((SELECT SUM(s.input_tokens) FROM agent_sessions s WHERE s.agent_id = a.id), 0) as input_tokens,
152173
COALESCE((SELECT SUM(s.output_tokens) FROM agent_sessions s WHERE s.agent_id = a.id), 0) as output_tokens,
153174
COALESCE((SELECT SUM(s.cache_read_tokens) FROM agent_sessions s WHERE s.agent_id = a.id), 0) as cache_read_tokens,
@@ -156,9 +177,9 @@ export async function getAgent(db: D1, agentId: string, ownerId: string): Promis
156177
FROM agents a
157178
WHERE a.id = ? AND a.owner_id = ?
158179
`)
159-
.bind(agentId, ownerId)
180+
.bind(runtimeCutoff, agentId, ownerId)
160181
.first<AgentWithActivity>()
161-
.then((r) => (r ? { ...parseAgent(r), email: `${r.username}@mails.agent-kanban.dev` } : null));
182+
.then((r) => (r ? { ...parseAgent(r), runtime_available: !!r.runtime_available, email: `${r.username}@mails.agent-kanban.dev` } : null));
162183
}
163184

164185
export async function updateAgent(

apps/web/server/machineRepo.ts

Lines changed: 100 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
import type { Machine, MachineWithAgents, UsageInfo } from "@agent-kanban/shared";
2-
import { MACHINE_STALE_TIMEOUT_MS } from "@agent-kanban/shared";
1+
import type { AgentRuntime, Machine, MachineRuntime, MachineRuntimeStatus, MachineWithAgents, UsageInfo } from "@agent-kanban/shared";
2+
import { AGENT_RUNTIMES, MACHINE_STALE_TIMEOUT_MS, normalizeRuntime, RUNTIME_LABELS } from "@agent-kanban/shared";
33
import { type D1, newId, parseJsonFields } from "./db";
44

55
export interface CreateMachineInfo {
66
name: string;
77
os: string;
88
version: string;
9-
runtimes: string[];
9+
runtimes: MachineRuntime[];
1010
device_id: string;
1111
}
1212

1313
export interface HeartbeatInfo {
1414
version?: string;
15-
runtimes?: string[];
15+
runtimes?: MachineRuntime[];
1616
usage_info?: UsageInfo | null;
1717
}
1818

@@ -24,7 +24,7 @@ export async function upsertMachine(db: D1, ownerId: string, info: CreateMachine
2424
.prepare(`INSERT INTO machines (id, owner_id, device_id, name, os, version, runtimes, status, created_at)
2525
VALUES (?, ?, ?, ?, ?, ?, ?, 'offline', ?)
2626
ON CONFLICT(owner_id, device_id) DO UPDATE SET name = excluded.name, os = excluded.os, version = excluded.version, runtimes = excluded.runtimes`)
27-
.bind(id, ownerId, info.device_id, info.name, info.os, info.version, JSON.stringify(info.runtimes), now)
27+
.bind(id, ownerId, info.device_id, info.name, info.os, info.version, JSON.stringify(normalizeMachineRuntimes(info.runtimes, now)), now)
2828
.run();
2929
const row = await db.prepare("SELECT * FROM machines WHERE owner_id = ? AND device_id = ?").bind(ownerId, info.device_id).first<Machine>();
3030
return parseMachine(row!);
@@ -46,7 +46,7 @@ export async function updateMachine(db: D1, machineId: string, ownerId: string,
4646
}
4747
if (info.runtimes) {
4848
sets.push("runtimes = ?");
49-
binds.push(JSON.stringify(info.runtimes));
49+
binds.push(JSON.stringify(normalizeMachineRuntimes(info.runtimes, now)));
5050
}
5151
if (info.usage_info) {
5252
sets.push("usage_info = ?");
@@ -129,7 +129,100 @@ export async function listAllMachines(db: D1): Promise<AdminMachine[]> {
129129
return result.results.map(parseMachine);
130130
}
131131

132-
const parseMachine = <T extends Machine>(row: T) => parseJsonFields(row, ["runtimes", "usage_info"]);
132+
function parseMachine<T extends Machine>(row: T): T {
133+
const parsed = parseJsonFields(row, ["runtimes", "usage_info"]);
134+
parsed.runtimes = normalizeMachineRuntimes(parsed.runtimes ?? [], parsed.last_heartbeat_at ?? parsed.created_at);
135+
return parsed;
136+
}
137+
138+
const RUNTIME_BY_LABEL = Object.fromEntries(Object.entries(RUNTIME_LABELS).map(([runtime, label]) => [label, runtime])) as Record<
139+
string,
140+
AgentRuntime
141+
>;
142+
143+
export function runtimeMatchValues(runtime: string): string[] {
144+
const normalized = normalizeRuntime(runtime);
145+
const canonical = (RUNTIME_BY_LABEL[normalized] ?? normalized) as AgentRuntime;
146+
const label = RUNTIME_LABELS[canonical];
147+
return label && label !== canonical ? [canonical, label] : [canonical];
148+
}
149+
150+
export function runtimeReadyPredicateSql(runtimeExpr: string): string {
151+
return `
152+
(
153+
(
154+
rt.type = 'text'
155+
AND (rt.value = ${runtimeExpr} OR rt.value = ${runtimeLabelCaseSql(runtimeExpr)})
156+
)
157+
OR (
158+
json_extract(rt.value, '$.status') = 'ready'
159+
AND json_extract(rt.value, '$.name') = ${runtimeExpr}
160+
)
161+
)
162+
`;
163+
}
164+
165+
function runtimeLabelCaseSql(runtimeExpr: string): string {
166+
const cases = Object.entries(RUNTIME_LABELS)
167+
.map(([runtime, label]) => `WHEN '${runtime}' THEN '${label.replace(/'/g, "''")}'`)
168+
.join(" ");
169+
return `CASE ${runtimeExpr} ${cases} END`;
170+
}
171+
172+
const RUNTIME_STATUSES: readonly MachineRuntimeStatus[] = ["missing", "unauthorized", "unhealthy", "limited", "ready"];
173+
174+
export function normalizeMachineRuntimes(runtimes: MachineRuntime[] | string[], checkedAt: string): MachineRuntime[] {
175+
return runtimes.map((runtime) => {
176+
if (typeof runtime === "string") {
177+
return { name: normalizeMachineRuntimeName(runtime), status: "ready", checked_at: checkedAt };
178+
}
179+
const name = normalizeMachineRuntimeName(runtime.name);
180+
if (!RUNTIME_STATUSES.includes(runtime.status)) {
181+
throw new Error(`Invalid runtime status "${runtime.status}"`);
182+
}
183+
return {
184+
name,
185+
status: runtime.status,
186+
...(runtime.detail ? { detail: runtime.detail } : {}),
187+
...(runtime.reset_at ? { reset_at: runtime.reset_at } : {}),
188+
checked_at: runtime.checked_at || checkedAt,
189+
};
190+
});
191+
}
192+
193+
function normalizeMachineRuntimeName(runtime: string): AgentRuntime {
194+
const normalized = normalizeRuntime(runtime);
195+
const canonical = RUNTIME_BY_LABEL[normalized] ?? normalized;
196+
if (!AGENT_RUNTIMES.includes(canonical as AgentRuntime)) {
197+
throw new Error(`Invalid runtime "${runtime}"`);
198+
}
199+
return canonical as AgentRuntime;
200+
}
201+
202+
export async function isRuntimeAvailable(db: D1, ownerId: string, runtime: string): Promise<boolean> {
203+
const cutoff = new Date(Date.now() - MACHINE_STALE_TIMEOUT_MS).toISOString();
204+
const values = runtimeMatchValues(runtime);
205+
const placeholders = values.map(() => "?").join(", ");
206+
const row = await db
207+
.prepare(`
208+
SELECT 1
209+
FROM machines m, json_each(m.runtimes) rt
210+
WHERE m.owner_id = ?
211+
AND m.status = 'online'
212+
AND m.last_heartbeat_at >= ?
213+
AND (
214+
(rt.type = 'text' AND rt.value IN (${placeholders}))
215+
OR (
216+
json_extract(rt.value, '$.status') = 'ready'
217+
AND json_extract(rt.value, '$.name') IN (${placeholders})
218+
)
219+
)
220+
LIMIT 1
221+
`)
222+
.bind(ownerId, cutoff, ...values, ...values)
223+
.first();
224+
return !!row;
225+
}
133226

134227
export async function detectStaleMachines(db: D1): Promise<void> {
135228
const cutoff = new Date(Date.now() - MACHINE_STALE_TIMEOUT_MS).toISOString();

apps/web/server/routes.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
findInvalidSkillRef,
55
isBoardType,
66
isValidUsername,
7+
type MachineRuntime,
78
parseScheduledAt,
89
RESERVED_ROLES,
910
} from "@agent-kanban/shared";
@@ -29,7 +30,7 @@ import { cliVersionMiddleware } from "./cliVersion";
2930
import { addAgentEmail, getGithubToken, removeAgentEmail, syncGpgKey } from "./githubService";
3031
import { getArmoredPrivateKey, getRootKeyInfo, getRootPublicKey, getSubkeyIds } from "./gpgKeyRepo";
3132
import { createLogger } from "./logger";
32-
import { deleteMachine, getMachine, listAllMachines, listMachines, updateMachine, upsertMachine } from "./machineRepo";
33+
import { deleteMachine, getMachine, listAllMachines, listMachines, normalizeMachineRuntimes, updateMachine, upsertMachine } from "./machineRepo";
3334
import { createMailbox, deleteMailbox, getEmail, getInbox } from "./mailsService";
3435
import { createMessage, listMessages } from "./messageRepo";
3536
import { metricsMiddleware } from "./metrics";
@@ -70,6 +71,17 @@ function assertValidSkillRefs(skills: unknown) {
7071
}
7172
}
7273

74+
function assertValidMachineRuntimes(runtimes: unknown): void {
75+
if (!Array.isArray(runtimes)) {
76+
throw new HTTPException(400, { message: "runtimes must be an array" });
77+
}
78+
try {
79+
normalizeMachineRuntimes(runtimes as MachineRuntime[], new Date().toISOString());
80+
} catch (err) {
81+
throw new HTTPException(400, { message: err instanceof Error ? err.message : "Invalid runtimes" });
82+
}
83+
}
84+
7385
function resolveActor(c: { get: (key: string) => any }): { actorType: string; actorId: string; sessionId: string | null } {
7486
const identity: string = c.get("identityType") || "machine";
7587
let actorId: string;
@@ -313,16 +325,18 @@ api.use("/api/*", metricsMiddleware);
313325
// ─── Machines ───
314326

315327
api.post("/api/machines/:id/heartbeat", async (c) => {
316-
const body = await c.req.json<{ version?: string; runtimes?: string[]; usage_info?: any }>();
328+
const body = await c.req.json<{ version?: string; runtimes?: MachineRuntime[]; usage_info?: any }>();
329+
if (body.runtimes !== undefined) assertValidMachineRuntimes(body.runtimes);
317330
const machineId = c.req.param("id");
318-
const updated = await updateMachine(c.env.DB, machineId, c.get("ownerId"), body);
319-
if (!updated) throw new HTTPException(404, { message: "Machine not found" });
320-
321-
// Bind API key to this machine if unbound; reject if bound to a different machine
322331
const boundMachineId = c.get("machineId");
323332
if (boundMachineId && boundMachineId !== machineId) {
324333
throw new HTTPException(403, { message: "API key is bound to a different machine" });
325334
}
335+
336+
const updated = await updateMachine(c.env.DB, machineId, c.get("ownerId"), body);
337+
if (!updated) throw new HTTPException(404, { message: "Machine not found" });
338+
339+
// Bind API key to this machine if unbound.
326340
if (!boundMachineId) {
327341
const auth = createAuth(c.env);
328342
const authCtx = await auth.$context;
@@ -348,10 +362,11 @@ api.get("/api/machines/:id", async (c) => {
348362
});
349363

350364
api.post("/api/machines", async (c) => {
351-
const body = await c.req.json<{ name: string; os: string; version: string; runtimes: string[]; device_id: string }>();
365+
const body = await c.req.json<{ name: string; os: string; version: string; runtimes: MachineRuntime[]; device_id: string }>();
352366
if (!body.name || !body.os || !body.version || !body.runtimes || !body.device_id) {
353367
throw new HTTPException(400, { message: "name, os, version, runtimes, and device_id are required" });
354368
}
369+
assertValidMachineRuntimes(body.runtimes);
355370
const machine = await upsertMachine(c.env.DB, c.get("ownerId"), body);
356371

357372
// Registration always binds the API key to the upserted machine

apps/web/server/taskRepo.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { validateTransition } from "@agent-kanban/shared";
33
import { HTTPException } from "hono/http-exception";
44
import { getDefaultBoard } from "./boardRepo";
55
import { type D1, MAX_TASK_PARTITION_ROWS, newLongId, parseJsonFields } from "./db";
6+
import { isRuntimeAvailable } from "./machineRepo";
67
import { computeBlocked, detectCycle, getDependencies, setDependencies } from "./taskDeps";
78

89
const parseTask = <T extends Task>(row: T) => parseJsonFields(row, ["labels", "input"]);
@@ -15,10 +16,18 @@ function enforceTransition(action: TaskActionType, currentStatus: TaskStatus, id
1516
}
1617
}
1718

18-
async function assertAssignableWorkerAgent(db: D1, agentId: string, missingStatus: 400 | 404): Promise<void> {
19-
const agent = await db.prepare("SELECT kind FROM agents WHERE id = ?").bind(agentId).first<{ kind: string }>();
19+
async function assertAssignableWorkerAgent(db: D1, ownerId: string, agentId: string, missingStatus: 400 | 404): Promise<void> {
20+
const agent = await db
21+
.prepare("SELECT kind, runtime FROM agents WHERE id = ? AND owner_id = ?")
22+
.bind(agentId, ownerId)
23+
.first<{ kind: string; runtime: string }>();
2024
if (!agent) throw new HTTPException(missingStatus, { message: "Agent not found" });
2125
if (agent.kind === "leader") throw new HTTPException(400, { message: "Cannot assign tasks to leader agents" });
26+
if (!(await isRuntimeAvailable(db, ownerId, agent.runtime))) {
27+
throw new HTTPException(409, {
28+
message: `Runtime "${agent.runtime}" is not available on any online machine. Choose or create a worker that uses an available runtime.`,
29+
});
30+
}
2231
}
2332

2433
export async function createTask(
@@ -64,7 +73,7 @@ export async function createTask(
6473
}
6574

6675
if (input.assigned_to) {
67-
await assertAssignableWorkerAgent(db, input.assigned_to, 400);
76+
await assertAssignableWorkerAgent(db, ownerId, input.assigned_to, 400);
6877
}
6978

7079
// Atomically allocate the next seq number via RETURNING
@@ -349,12 +358,16 @@ export async function assignTask(
349358
actorId: string,
350359
sessionId: string | null = null,
351360
): Promise<Task | null> {
352-
const task = await db.prepare("SELECT * FROM tasks WHERE id = ?").bind(taskId).first<Task>();
361+
const task = await db
362+
.prepare("SELECT t.*, b.owner_id as board_owner_id FROM tasks t JOIN boards b ON t.board_id = b.id WHERE t.id = ?")
363+
.bind(taskId)
364+
.first<Task & { board_owner_id: string }>();
353365
if (!task) return null;
354366
if (task.status !== "todo") throw new HTTPException(409, { message: "Can only assign tasks in todo status" });
355367
if (task.assigned_to) throw new HTTPException(409, { message: "Task is already assigned" });
356368

357-
await assertAssignableWorkerAgent(db, targetAgentId, 404);
369+
const { board_owner_id: ownerId, ...taskRow } = task;
370+
await assertAssignableWorkerAgent(db, ownerId, targetAgentId, 404);
358371

359372
const now = new Date().toISOString();
360373
const logId = newLongId();
@@ -368,7 +381,7 @@ export async function assignTask(
368381
.bind(logId, taskId, actorType, actorId, sessionId, now),
369382
]);
370383

371-
return parseTask({ ...task, assigned_to: targetAgentId, updated_at: now } as Task);
384+
return parseTask({ ...taskRow, assigned_to: targetAgentId, updated_at: now } as Task);
372385
}
373386

374387
export async function completeTask(

apps/web/src/components/AddMachineSteps.tsx

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { type MachineRuntime, RUNTIME_LABELS } from "@agent-kanban/shared";
12
import { useQueryClient } from "@tanstack/react-query";
23
import { useCallback, useEffect, useRef, useState } from "react";
34
import { api } from "../lib/api";
@@ -11,6 +12,10 @@ interface AddMachineStepsProps {
1112
onConnected?: (machine: any) => void;
1213
}
1314

15+
function runtimeLabel(runtime: MachineRuntime): string {
16+
return `${RUNTIME_LABELS[runtime.name] ?? runtime.name}:${runtime.status}`;
17+
}
18+
1419
export function AddMachineSteps({ apiKey, apiKeyId, onDone, onConnected }: AddMachineStepsProps) {
1520
const [connected, setConnected] = useState(false);
1621
const [connectedMachine, setConnectedMachine] = useState<any>(null);
@@ -75,9 +80,9 @@ export function AddMachineSteps({ apiKey, apiKeyId, onDone, onConnected }: AddMa
7580
<div className="flex items-center justify-between">
7681
<span className="text-[11px] text-content-tertiary uppercase tracking-wide">Runtimes</span>
7782
<div className="flex gap-1">
78-
{connectedMachine.runtimes.map((r: string) => (
79-
<span key={r} className="text-[10px] font-mono text-accent bg-accent-soft px-1.5 py-0.5 rounded">
80-
{r}
83+
{connectedMachine.runtimes.map((runtime: MachineRuntime) => (
84+
<span key={runtime.name} className="text-[10px] font-mono text-accent bg-accent-soft px-1.5 py-0.5 rounded">
85+
{runtimeLabel(runtime)}
8186
</span>
8287
))}
8388
</div>

0 commit comments

Comments
 (0)