Skip to content

Commit 700594d

Browse files
committed
fix(server): cap task_actions / messages fetches at 500 rows
Every SSE reconnect on a task was reading every row in `task_actions` and `messages` for that task, then slicing to 50 in JS. D1 billed for the full scan; with 100 concurrent SSE sessions, this was the dominant component of the ~7M rows read/day we see on prod.

Changes:
- `listMessages` / `getTaskActions`: add a `limit` param, defaulting to the new `MAX_TASK_PARTITION_ROWS` constant (500). Without `since`, they return the latest N rows in ASC order (DESC fetch + reverse). With `since`, they return up to N rows after the cursor, in ASC order.
- `sse.ts`: the initial fetch asks for 50 rows (no cursor) or 500 (with cursor); catch-up polls request 500. When either feed returns exactly the cap, an `event: gap` SSE frame is emitted so the client can fall back to HTTP instead of silently missing older rows.
- `taskRepo.getTask` now delegates to `getTaskActions` instead of duplicating the SQL.
- Known limitation documented for the `created_at` cursor collision.
1 parent a9b79ff commit 700594d

6 files changed

Lines changed: 423 additions & 37 deletions

File tree

apps/web/server/db.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ export function newLongId(): string {
1414

1515
export type D1 = D1Database;
1616

17+
// Hard ceiling on rows returned from a single task partition (actions or
18+
// messages). Protects D1 read budget against tasks with runaway row counts.
19+
// Any fetch that returns exactly this many rows is at the cap — callers
20+
// must assume older/newer rows beyond this point were silently truncated.
21+
export const MAX_TASK_PARTITION_ROWS = 500;
22+
1723
export function parseJsonFields<T>(row: T, fields: (keyof T)[]): T {
1824
for (const f of fields) {
1925
if (typeof row[f] === "string") row[f] = JSON.parse(row[f] as string);

apps/web/server/messageRepo.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Message, SenderType } from "@agent-kanban/shared";
2-
import { type D1, newLongId } from "./db";
2+
import { type D1, MAX_TASK_PARTITION_ROWS, newLongId } from "./db";
33

44
export async function createMessage(db: D1, taskId: string, senderType: SenderType, senderId: string, content: string): Promise<Message> {
55
const id = newLongId();
@@ -12,10 +12,23 @@ export async function createMessage(db: D1, taskId: string, senderType: SenderTy
1212
return { id, task_id: taskId, sender_type: senderType, sender_id: senderId, content, created_at: now };
1313
}
1414

15-
export async function listMessages(db: D1, taskId: string, since?: string): Promise<Message[]> {
16-
const query = since
17-
? db.prepare("SELECT * FROM messages WHERE task_id = ? AND created_at > ? ORDER BY created_at ASC").bind(taskId, since)
18-
: db.prepare("SELECT * FROM messages WHERE task_id = ? ORDER BY created_at ASC").bind(taskId);
19-
const result = await query.all<Message>();
20-
return result.results;
15+
// When `since` is provided, returns up to `limit` rows after the cursor in
16+
// ASC order (incremental catch-up). Without `since`, returns the most recent
17+
// `limit` rows — fetched DESC then reversed so callers always see ASC order.
18+
// A hard LIMIT protects against task_id partitions with runaway row counts.
19+
//
20+
// KNOWN LIMITATION: `since` uses `created_at > ?`, which skips rows sharing
21+
// the cursor's millisecond. `newLongId()` is random (not monotonic) so the id
22+
// can't serve as a tiebreaker today. Tracked for follow-up — fix requires
23+
// either a monotonic sequence column or cursor-pair semantics.
24+
export async function listMessages(db: D1, taskId: string, since?: string, limit: number = MAX_TASK_PARTITION_ROWS): Promise<Message[]> {
25+
if (since) {
26+
const result = await db
27+
.prepare("SELECT * FROM messages WHERE task_id = ? AND created_at > ? ORDER BY created_at ASC LIMIT ?")
28+
.bind(taskId, since, limit)
29+
.all<Message>();
30+
return result.results;
31+
}
32+
const result = await db.prepare("SELECT * FROM messages WHERE task_id = ? ORDER BY created_at DESC LIMIT ?").bind(taskId, limit).all<Message>();
33+
return result.results.reverse();
2134
}

apps/web/server/sse.ts

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { MAX_TASK_PARTITION_ROWS } from "./db";
12
import { listMessages } from "./messageRepo";
23
import { getTaskActions } from "./taskRepo";
34
import type { Env } from "./types";
@@ -57,16 +58,39 @@ export async function createSSEResponse(env: Env, taskId: string, lastEventId: s
5758
return writer.write(encoder.encode(msg));
5859
};
5960

61+
// Signal the client that their catch-up window hit the hard row cap and
62+
// older rows were silently truncated. Client should drop its cursor and
63+
// reload the task via HTTP. Unknown SSE event types are ignored by older
64+
// clients, so this is backward-compatible.
65+
const writeGap = (reason: string) => {
66+
const msg = `event: gap\ndata: ${JSON.stringify({ reason })}\n\n`;
67+
return writer.write(encoder.encode(msg));
68+
};
69+
6070
const run = async () => {
61-
const [initialNotes, initialMessages] = await Promise.all([getTaskActions(db, taskId, since), listMessages(db, taskId, since)]);
71+
// Without `since`, fetch the 50 most recent — repo layer already returns
72+
// them in ASC order. With `since`, cap catch-up at the partition ceiling
73+
// so reconnects after long offline periods can't detonate D1 reads.
74+
const initialLimit = since ? MAX_TASK_PARTITION_ROWS : 50;
75+
const [initialNotes, initialMessages] = await Promise.all([
76+
getTaskActions(db, taskId, since, initialLimit),
77+
listMessages(db, taskId, since, initialLimit),
78+
]);
79+
80+
// When catching up and either feed returned exactly the cap, older rows
81+
// were truncated. Emit a gap signal before the rows we do have so the
82+
// client can decide to reload via HTTP instead of silently missing data.
83+
if (since && (initialNotes.length === initialLimit || initialMessages.length === initialLimit)) {
84+
await writeGap("initial_truncated");
85+
}
6286

63-
const noteEvents: SSEEvent[] = (since ? initialNotes : initialNotes.slice(-50)).map((l) => ({
87+
const noteEvents: SSEEvent[] = initialNotes.map((l) => ({
6488
id: l.id,
6589
type: "note" as const,
6690
data: JSON.stringify(l),
6791
created_at: l.created_at,
6892
}));
69-
const msgEvents: SSEEvent[] = (since ? initialMessages : initialMessages.slice(-50)).map((m) => ({
93+
const msgEvents: SSEEvent[] = initialMessages.map((m) => ({
7094
id: m.id,
7195
type: "message" as const,
7296
data: JSON.stringify(m),
@@ -85,7 +109,17 @@ export async function createSSEResponse(env: Env, taskId: string, lastEventId: s
85109
while (Date.now() < deadline) {
86110
await new Promise((r) => setTimeout(r, 2000));
87111

88-
const [newNotes, newMessages] = await Promise.all([getTaskActions(db, taskId, lastSeen), listMessages(db, taskId, lastSeen)]);
112+
const [newNotes, newMessages] = await Promise.all([
113+
getTaskActions(db, taskId, lastSeen, MAX_TASK_PARTITION_ROWS),
114+
listMessages(db, taskId, lastSeen, MAX_TASK_PARTITION_ROWS),
115+
]);
116+
117+
// Same ceiling signal during live polling — a 2s window with >500 new
118+
// rows means the client's cursor is behind reality and the tail is at
119+
// risk of silent truncation on the next tick. Tell the client to reload.
120+
if (newNotes.length === MAX_TASK_PARTITION_ROWS || newMessages.length === MAX_TASK_PARTITION_ROWS) {
121+
await writeGap("poll_truncated");
122+
}
89123

90124
const newNoteEvents = newNotes.map((l) => ({
91125
id: l.id,

apps/web/server/taskRepo.ts

Lines changed: 19 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { BoardAction, CreateTaskInput, IdentityType, Task, TaskAction, Task
22
import { validateTransition } from "@agent-kanban/shared";
33
import { HTTPException } from "hono/http-exception";
44
import { getDefaultBoard } from "./boardRepo";
5-
import { type D1, newLongId, parseJsonFields } from "./db";
5+
import { type D1, MAX_TASK_PARTITION_ROWS, newLongId, parseJsonFields } from "./db";
66
import { computeBlocked, detectCycle, getDependencies, setDependencies } from "./taskDeps";
77

88
const parseTask = <T extends Task>(row: T) => parseJsonFields(row, ["labels", "input"]);
@@ -236,21 +236,12 @@ export async function getTask(db: D1, taskId: string, ownerId: string): Promise<
236236
if (!task) return null;
237237
parseTask(task);
238238

239-
const [actions, deps, blockedSet] = await Promise.all([
240-
db
241-
.prepare(
242-
"SELECT n.*, ag.name as actor_name, ag.public_key as actor_public_key FROM task_actions n LEFT JOIN agents ag ON n.actor_type LIKE 'agent:%' AND n.actor_id = ag.id WHERE n.task_id = ? ORDER BY n.created_at ASC",
243-
)
244-
.bind(taskId)
245-
.all<TaskAction>(),
246-
getDependencies(db, taskId),
247-
computeBlocked(db, [taskId]),
248-
]);
239+
const [actions, deps, blockedSet] = await Promise.all([getTaskActions(db, taskId), getDependencies(db, taskId), computeBlocked(db, [taskId])]);
249240

250-
const duration = computeDuration(actions.results);
241+
const duration = computeDuration(actions);
251242
task.blocked = blockedSet.has(taskId);
252243

253-
return { ...task, notes: actions.results, duration_minutes: duration, depends_on: deps, subtask_count: task.subtask_count };
244+
return { ...task, notes: actions, duration_minutes: duration, depends_on: deps, subtask_count: task.subtask_count };
254245
}
255246

256247
export async function updateTask(
@@ -551,23 +542,25 @@ export async function addTaskAction(
551542
};
552543
}
553544

554-
export async function getTaskActions(db: D1, taskId: string, since?: string): Promise<TaskAction[]> {
555-
let query =
545+
// When `since` is provided, returns up to `limit` rows after the cursor in
546+
// ASC order (incremental catch-up). Without `since`, returns the most recent
547+
// `limit` rows — fetched DESC then reversed so callers always see ASC order.
548+
// A hard LIMIT protects against tasks with runaway action counts.
549+
//
550+
// KNOWN LIMITATION: `since` uses `n.created_at > ?`, which skips rows sharing
551+
// the cursor's millisecond. `newLongId()` is random (not monotonic) so the id
552+
// can't serve as a tiebreaker today. Tracked for follow-up — fix requires
553+
// either a monotonic sequence column or cursor-pair semantics.
554+
export async function getTaskActions(db: D1, taskId: string, since?: string, limit: number = MAX_TASK_PARTITION_ROWS): Promise<TaskAction[]> {
555+
const base =
556556
"SELECT n.*, ag.name as actor_name, ag.public_key as actor_public_key FROM task_actions n LEFT JOIN agents ag ON n.actor_type LIKE 'agent:%' AND n.actor_id = ag.id WHERE n.task_id = ?";
557-
const binds: unknown[] = [taskId];
558557

559558
if (since) {
560-
query += " AND n.created_at > ?";
561-
binds.push(since);
559+
const result = await db.prepare(`${base} AND n.created_at > ? ORDER BY n.created_at ASC LIMIT ?`).bind(taskId, since, limit).all<TaskAction>();
560+
return result.results;
562561
}
563-
564-
query += " ORDER BY n.created_at ASC";
565-
566-
const result = await db
567-
.prepare(query)
568-
.bind(...binds)
569-
.all<TaskAction>();
570-
return result.results;
562+
const result = await db.prepare(`${base} ORDER BY n.created_at DESC LIMIT ?`).bind(taskId, limit).all<TaskAction>();
563+
return result.results.reverse();
571564
}
572565

573566
export async function getBoardActionsByBoardId(db: D1, boardId: string, since: string): Promise<BoardAction[]> {

0 commit comments

Comments
 (0)