Skip to content

Commit cedd18a

Browse files
committed
fix: restore chat relay history
1 parent ff4d674 commit cedd18a

15 files changed

Lines changed: 238 additions & 88 deletions

apps/web/server/auth.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,12 @@ export async function authMiddleware(c: Context<{ Bindings: Env }>, next: Next)
101101
const header = c.req.header("Authorization");
102102
const queryToken = c.req.query("token");
103103
const token = header?.startsWith("Bearer ") ? header.slice(7) : queryToken;
104+
105+
const auth = createAuth(c.env);
104106
if (!token) {
105-
return c.json({ error: { code: "UNAUTHORIZED", message: "Missing token" } }, 401);
107+
return handleUserSession(c, auth, c.req.raw.headers, next, "Missing token");
106108
}
107109

108-
const auth = createAuth(c.env);
109110
const type = detectTokenType(token);
110111

111112
if (type === "apikey") {
@@ -135,7 +136,11 @@ export async function authMiddleware(c: Context<{ Bindings: Env }>, next: Next)
135136
}
136137

137138
const authHeaders = new Headers({ Authorization: `Bearer ${token}` });
138-
const session = await auth.api.getSession({ headers: authHeaders });
139+
return handleUserSession(c, auth, authHeaders, next, "Invalid or expired token");
140+
}
141+
142+
async function handleUserSession(c: Context<{ Bindings: Env }>, auth: any, headers: Headers, next: Next, errorMessage: string) {
143+
const session = await auth.api.getSession({ headers });
139144
if (session) {
140145
c.set("ownerId", session.user.id);
141146
c.set("identityType", "user");
@@ -144,7 +149,18 @@ export async function authMiddleware(c: Context<{ Bindings: Env }>, next: Next)
144149
return enforceRouteRule(c, next);
145150
}
146151

147-
return c.json({ error: { code: "UNAUTHORIZED", message: "Invalid or expired token" } }, 401);
152+
if (headers !== c.req.raw.headers) {
153+
const cookieSession = await auth.api.getSession({ headers: c.req.raw.headers });
154+
if (cookieSession) {
155+
c.set("ownerId", cookieSession.user.id);
156+
c.set("identityType", "user");
157+
c.set("user", cookieSession.user);
158+
c.set("session", cookieSession.session);
159+
return enforceRouteRule(c, next);
160+
}
161+
}
162+
163+
return c.json({ error: { code: "UNAUTHORIZED", message: errorMessage } }, 401);
148164
}
149165

150166
async function handleApiKey(c: Context<{ Bindings: Env }>, auth: any, token: string, next: Next) {

apps/web/server/tunnelRelay.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,22 @@ export class TunnelRelay implements DurableObject {
130130
return;
131131
}
132132

133-
// History response — send back to the requesting browser
133+
// History response — prefer sessionId routing because Durable Object
134+
// hibernation does not preserve in-memory pendingHistory entries.
135+
if (type === "session:history" && typeof msg.sessionId === "string") {
136+
const data = JSON.stringify(msg);
137+
for (const browser of this.getBrowserSockets(msg.sessionId)) {
138+
try {
139+
browser.send(data);
140+
} catch {
141+
/* browser gone */
142+
}
143+
}
144+
if (msg.requestId) this.pendingHistory.delete(msg.requestId as string);
145+
return;
146+
}
147+
148+
// Backward compatibility for older daemons that don't include sessionId.
134149
if (type === "session:history" && msg.requestId) {
135150
const browser = this.pendingHistory.get(msg.requestId as string);
136151
if (browser) {

apps/web/src/hooks/useBoardSSE.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { BoardAction } from "@agent-kanban/shared";
22
import { useEffect, useRef, useState } from "react";
3-
import { getAuthToken } from "../lib/auth-client";
3+
import { getAuthToken, refreshAuthToken } from "../lib/auth-client";
44

55
const MAX_EVENTS = 50;
66

@@ -13,11 +13,11 @@ export function useBoardSSE(boardId: string | undefined) {
1313
useEffect(() => {
1414
if (!boardId) return;
1515

16-
function connect() {
16+
async function connect() {
1717
esRef.current?.close();
1818

1919
// Read token on every reconnect so refreshed tokens are picked up
20-
const token = getAuthToken();
20+
const token = (await refreshAuthToken()) ?? getAuthToken();
2121
if (!token) return;
2222

2323
const url = `/api/boards/${boardId}/stream?token=${encodeURIComponent(token)}`;

apps/web/src/hooks/useSSE.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { useEffect, useRef, useState } from "react";
2-
import { getAuthToken } from "../lib/auth-client";
2+
import { getAuthToken, refreshAuthToken } from "../lib/auth-client";
33

44
interface UseSSEOptions {
55
taskId: string;
@@ -19,13 +19,13 @@ export function useSSE({ taskId, enabled = true }: UseSSEOptions) {
1919
useEffect(() => {
2020
if (!enabled) return;
2121

22-
const token = getAuthToken();
23-
if (!token) return;
24-
25-
function connect() {
22+
async function connect() {
2623
// Close previous connection if any
2724
esRef.current?.close();
2825

26+
const token = (await refreshAuthToken()) ?? getAuthToken();
27+
if (!token) return;
28+
2929
const url = `/api/tasks/${taskId}/stream?token=${encodeURIComponent(token!)}`;
3030
const es = new EventSource(url);
3131
esRef.current = es;

apps/web/src/hooks/useSessionRelay.ts

Lines changed: 76 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { AgentEvent } from "@agent-kanban/shared";
22
import { useCallback, useEffect, useRef, useState } from "react";
3-
import { getAuthToken } from "../lib/auth-client";
3+
import { getAuthToken, refreshAuthToken } from "../lib/auth-client";
44

55
export type { AgentEvent };
66

@@ -30,8 +30,6 @@ export function useSessionRelay({ sessionId, enabled = true }: UseSessionRelayOp
3030

3131
useEffect(() => {
3232
if (!enabled || !sessionId) return;
33-
const token = getAuthToken();
34-
if (!token) return;
3533

3634
let closed = false;
3735
historyLoaded.current = false;
@@ -51,77 +49,92 @@ export function useSessionRelay({ sessionId, enabled = true }: UseSessionRelayOp
5149
}, 5000);
5250
}
5351

54-
const wsUrl = `${location.origin.replace(/^http/, "ws")}/api/tunnel/ws?role=browser&sessionId=${sessionId}&token=${encodeURIComponent(token)}`;
55-
const ws = new WebSocket(wsUrl);
56-
wsRef.current = ws;
52+
let ws: WebSocket | null = null;
5753

58-
ws.onopen = () => {
54+
function connect(token: string) {
5955
if (closed) return;
60-
setWsConnected(true);
61-
requestHistory(ws);
62-
};
56+
const wsUrl = `${location.origin.replace(/^http/, "ws")}/api/tunnel/ws?role=browser&sessionId=${sessionId}&token=${encodeURIComponent(token)}`;
57+
ws = new WebSocket(wsUrl);
58+
wsRef.current = ws;
59+
60+
ws.onopen = () => {
61+
if (closed || !ws) return;
62+
setWsConnected(true);
63+
requestHistory(ws);
64+
};
65+
66+
ws.onmessage = (rawEvent) => {
67+
if (closed) return;
68+
let msg: Record<string, unknown>;
69+
try {
70+
msg = JSON.parse(rawEvent.data);
71+
} catch {
72+
return;
73+
}
6374

64-
ws.onmessage = (rawEvent) => {
65-
if (closed) return;
66-
let msg: Record<string, unknown>;
67-
try {
68-
msg = JSON.parse(rawEvent.data);
69-
} catch {
70-
return;
71-
}
72-
73-
switch (msg.type) {
74-
case "session:history": {
75-
historyLoaded.current = true;
76-
clearTimeout(historyRetryTimer.current);
77-
const history = msg.events as RelayEvent[] | undefined;
78-
if (Array.isArray(history)) {
79-
setEvents((prev) => {
80-
const liveEvents = prev.filter((e) => e.id.startsWith("live-"));
81-
return [...history, ...liveEvents];
82-
});
75+
switch (msg.type) {
76+
case "session:history": {
77+
historyLoaded.current = true;
78+
clearTimeout(historyRetryTimer.current);
79+
const history = msg.events as RelayEvent[] | undefined;
80+
if (Array.isArray(history)) {
81+
setEvents((prev) => {
82+
const liveEvents = prev.filter((e) => e.id.startsWith("live-"));
83+
return [...history, ...liveEvents];
84+
});
85+
}
86+
break;
8387
}
84-
break;
85-
}
86-
case "agent:event": {
87-
const id = `live-${++idCounter.current}`;
88-
setEvents((prev) => [...prev, { id, event: msg.event as AgentEvent, timestamp: new Date().toISOString() }]);
89-
break;
90-
}
91-
case "agent:status": {
92-
const status = msg.status as string;
93-
if (status === "working" || status === "done" || status === "rate_limited") {
94-
setAgentStatus(status);
88+
case "agent:event": {
89+
const id = `live-${++idCounter.current}`;
90+
setEvents((prev) => [...prev, { id, event: msg.event as AgentEvent, timestamp: new Date().toISOString() }]);
91+
break;
9592
}
96-
break;
97-
}
98-
case "daemon:connected":
99-
setDaemonConnected(true);
100-
setAgentStatus("idle");
101-
// Re-request history if it was never loaded (e.g. the initial
102-
// request was forwarded to a stale daemon socket that never replied).
103-
if (!historyLoaded.current && ws.readyState === WebSocket.OPEN) {
104-
historyRetries.current = 0;
105-
requestHistory(ws);
93+
case "agent:status": {
94+
const status = msg.status as string;
95+
if (status === "working" || status === "done" || status === "rate_limited") {
96+
setAgentStatus(status);
97+
}
98+
break;
10699
}
107-
break;
108-
case "daemon:disconnected":
109-
setDaemonConnected(false);
110-
setAgentStatus("idle");
111-
break;
112-
}
113-
};
100+
case "daemon:connected":
101+
setDaemonConnected(true);
102+
setAgentStatus("idle");
103+
// Re-request history if it was never loaded (e.g. the initial
104+
// request was forwarded to a stale daemon socket that never replied).
105+
if (!historyLoaded.current && ws?.readyState === WebSocket.OPEN) {
106+
historyRetries.current = 0;
107+
requestHistory(ws);
108+
}
109+
break;
110+
case "daemon:disconnected":
111+
setDaemonConnected(false);
112+
setAgentStatus("idle");
113+
break;
114+
}
115+
};
114116

115-
ws.onclose = () => {
116-
if (closed) return;
117-
setWsConnected(false);
118-
setDaemonConnected(false);
119-
};
117+
ws.onclose = () => {
118+
if (closed) return;
119+
setWsConnected(false);
120+
setDaemonConnected(false);
121+
};
122+
}
123+
124+
const token = getAuthToken();
125+
if (token) {
126+
connect(token);
127+
void refreshAuthToken();
128+
} else {
129+
void refreshAuthToken().then((freshToken) => {
130+
if (freshToken) connect(freshToken);
131+
});
132+
}
120133

121134
return () => {
122135
closed = true;
123136
clearTimeout(historyRetryTimer.current);
124-
ws.close();
137+
ws?.close();
125138
if (wsRef.current === ws) wsRef.current = null;
126139
};
127140
}, [sessionId, enabled]);

apps/web/src/lib/api.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
import type { AgentRuntime } from "@agent-kanban/shared";
2-
import { getAuthToken } from "./auth-client";
2+
import { getAuthToken, refreshAuthToken } from "./auth-client";
33

44
const API_BASE = "/api";
55

66
async function request<T>(method: string, path: string, body?: unknown): Promise<T> {
7-
const token = getAuthToken();
7+
const token = getAuthToken() ?? (await refreshAuthToken());
88
if (!token) throw new Error("NOT_AUTHENTICATED");
99

10-
const res = await fetch(`${API_BASE}${path}`, {
10+
let res = await fetch(`${API_BASE}${path}`, {
1111
method,
1212
headers: {
1313
"Content-Type": "application/json",
@@ -16,6 +16,20 @@ async function request<T>(method: string, path: string, body?: unknown): Promise
1616
body: body ? JSON.stringify(body) : undefined,
1717
});
1818

19+
if (res.status === 401) {
20+
const freshToken = await refreshAuthToken();
21+
if (freshToken) {
22+
res = await fetch(`${API_BASE}${path}`, {
23+
method,
24+
headers: {
25+
"Content-Type": "application/json",
26+
Authorization: `Bearer ${freshToken}`,
27+
},
28+
body: body ? JSON.stringify(body) : undefined,
29+
});
30+
}
31+
}
32+
1933
const data = (await res.json()) as any;
2034

2135
if (!res.ok) {

apps/web/src/lib/auth-client.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,23 @@ export function clearAuthToken() {
1717
localStorage.removeItem(TOKEN_KEY);
1818
}
1919

20+
export async function refreshAuthToken(): Promise<string | null> {
21+
const res = await fetch("/api/auth/get-session", { credentials: "include" });
22+
if (!res.ok) {
23+
clearAuthToken();
24+
return null;
25+
}
26+
27+
const data = (await res.json()) as { session?: { token?: string } } | null;
28+
const token = data?.session?.token ?? null;
29+
if (!token) {
30+
clearAuthToken();
31+
return null;
32+
}
33+
setAuthToken(token);
34+
return token;
35+
}
36+
2037
export const authClient = createAuthClient({
2138
plugins: [agentAuthClient(), apiKeyClient(), adminClient()],
2239
fetchOptions: {

packages/cli/src/daemon/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ export async function startDaemon(opts: DaemonOptions): Promise<void> {
109109

110110
tunnel.onHistoryRequest((sessionId, requestId) => {
111111
fetchSessionHistory(sessionId)
112-
.then((events) => tunnel.sendHistory(events, requestId))
112+
.then((events) => tunnel.sendHistory(events, requestId, sessionId))
113113
.catch((e) => logger.warn(`History fetch failed for ${sessionId.slice(0, 8)}: ${e instanceof Error ? e.message : e}`));
114114
});
115115

packages/cli/src/daemon/runtimePool.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export interface AgentProcess {
3030
resultReceived: boolean;
3131
/** Cumulative cost reported by the last SDK result event. */
3232
lastCostUsd: number;
33+
persistedResumeToken?: string;
3334
onCleanup?: () => void;
3435
}
3536

@@ -376,6 +377,7 @@ async function consumeEvents(agent: AgentProcess, ctx: RuntimeContext): Promise<
376377
try {
377378
for await (const event of agent.handle.events) {
378379
if (!ctx.isAlive(agent.taskId)) return { crashed: false };
380+
await persistResumeToken(agent);
379381
await routeEvent(flags, event, agent.agentClient, ctx.rateLimitSink, ctx.tunnel);
380382
}
381383
return { crashed: false };
@@ -390,6 +392,15 @@ async function consumeEvents(agent: AgentProcess, ctx: RuntimeContext): Promise<
390392
}
391393
}
392394

395+
async function persistResumeToken(agent: AgentProcess): Promise<void> {
396+
const token = agent.handle.getResumeToken?.();
397+
if (!token || token === agent.persistedResumeToken) return;
398+
agent.persistedResumeToken = token;
399+
await getSessionManager()
400+
.patch(agent.sessionId, { providerResumeToken: token })
401+
.catch((e) => logger.warn(`Failed to persist resume token for ${agent.sessionId.slice(0, 8)}: ${errMessage(e)}`));
402+
}
403+
393404
async function finalize(agent: AgentProcess, opts: { crashed: boolean; error?: unknown }, ctx: RuntimeContext): Promise<void> {
394405
const { taskId, sessionId } = agent;
395406
const sessions = getSessionManager();

0 commit comments

Comments
 (0)