Skip to content

Commit 44f8144

Browse files
committed
fix(gemini): stream live tool events and history
1 parent f8e7d75 commit 44f8144

6 files changed

Lines changed: 597 additions & 48 deletions

File tree

apps/web/src/components/RelayRuntimeProvider.tsx

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,15 @@ export function convertEvents(events: RelayEvent[], agentStatus: AgentStatus): T
160160
}
161161
}
162162

163+
function appendPart(part: ContentPart) {
164+
const previous = currentParts[currentParts.length - 1];
165+
if ((part.type === "text" || part.type === "reasoning") && previous?.type === part.type) {
166+
previous.text += part.text;
167+
return;
168+
}
169+
currentParts.push(part);
170+
}
171+
163172
function updateOrAppend(block: { type: string; [k: string]: any }) {
164173
if (block.type === "tool_result") {
165174
const tc = toolCallMap.get(block.tool_use_id);
@@ -203,7 +212,7 @@ export function convertEvents(events: RelayEvent[], agentStatus: AgentStatus): T
203212

204213
// Fallback: append as new part
205214
const part = mapBlock(block);
206-
if (part) currentParts.push(part);
215+
if (part) appendPart(part);
207216
}
208217

209218
for (const re of events) {
@@ -224,7 +233,7 @@ export function convertEvents(events: RelayEvent[], agentStatus: AgentStatus): T
224233
ensureCurrentTurn(re);
225234
turnRunning = true;
226235
const part = mapBlock(event.block);
227-
if (part) currentParts.push(part);
236+
if (part) appendPart(part);
228237
continue;
229238
}
230239

@@ -285,7 +294,7 @@ export function convertEvents(events: RelayEvent[], agentStatus: AgentStatus): T
285294
updateOrAppend(block as any);
286295
} else {
287296
const part = mapBlock(block);
288-
if (part) currentParts.push(part);
297+
if (part) appendPart(part);
289298
}
290299
}
291300
continue;

packages/cli/src/providers/gemini.ts

Lines changed: 244 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { existsSync, readFileSync } from "node:fs";
1+
import { existsSync, readdirSync, readFileSync } from "node:fs";
22
import { homedir } from "node:os";
33
import { join } from "node:path";
4-
import { createLogger } from "../logger.js";
4+
import { type BashArgs, type GlobArgs, type GrepArgs, type ReadArgs, ToolName, type WriteArgs } from "@agent-kanban/shared";
55
import { spawnAgent } from "./spawnHelper.js";
6-
import type { AgentEvent, AgentHandle, AgentProvider, ExecuteOpts, HistoryEvent } from "./types.js";
6+
import type { AgentEvent, AgentHandle, AgentProvider, ContentBlock, ExecuteOpts, HistoryEvent } from "./types.js";
77

8-
const logger = createLogger("gemini");
98
const OAUTH_CREDS_PATH = join(homedir(), ".gemini", "oauth_creds.json");
9+
const GEMINI_TMP_DIR = join(homedir(), ".gemini", "tmp");
1010

1111
/** Per 1M tokens, paid tier pricing */
1212
const GEMINI_PRICING: Record<string, { input: number; output: number }> = {
@@ -32,15 +32,41 @@ function buildPrompt(opts: ExecuteOpts): string {
3232
}
3333

3434
export function parseEvent(raw: string): AgentEvent | null {
35-
let event: any;
36-
try {
37-
event = JSON.parse(raw);
38-
} catch {
39-
return null;
35+
const event = parseJsonEvent(raw);
36+
if (!event) return null;
37+
38+
if (event.type === "message" && event.role === "assistant") {
39+
return liveMessageEvent(event);
40+
}
41+
42+
if (event.type === "tool_use") {
43+
const normalized = normalizeGeminiTool(event.tool_name ?? "tool", event.parameters ?? {});
44+
if (!normalized) return null;
45+
return {
46+
type: "message",
47+
blocks: [
48+
{
49+
type: "tool_use",
50+
id: event.tool_id ?? `${event.tool_name ?? "gemini"}-${event.timestamp ?? "tool"}`,
51+
name: normalized.name,
52+
input: normalized.input,
53+
},
54+
],
55+
};
4056
}
4157

42-
if (event.type === "message" && event.role === "assistant" && event.content) {
43-
return { type: "message", blocks: [{ type: "text", text: event.content }] };
58+
if (event.type === "tool_result") {
59+
return {
60+
type: "message",
61+
blocks: [
62+
{
63+
type: "tool_result",
64+
tool_use_id: event.tool_id ?? "",
65+
output: textFromContent(event.output ?? event.result ?? event.content) || String(event.status ?? "done"),
66+
error: event.status === "error",
67+
},
68+
],
69+
};
4470
}
4571

4672
if (event.type === "result") {
@@ -66,23 +92,201 @@ export function parseEvent(raw: string): AgentEvent | null {
6692
return null;
6793
}
6894

95+
function liveMessageEvent(event: any): AgentEvent | null {
96+
const blocks: ContentBlock[] = [];
97+
if (event.content) blocks.push({ type: "text", text: String(event.content) });
98+
99+
for (const toolCall of event.toolCalls ?? event.tool_calls ?? []) {
100+
const normalized = normalizeGeminiTool(toolCall.name ?? toolCall.function?.name ?? "tool", toolCall.args ?? toolCall.function?.arguments ?? {});
101+
if (!normalized) continue;
102+
const id = toolCall.id ?? `${event.id ?? "gemini-live"}-tool`;
103+
blocks.push({ type: "tool_use", id, name: normalized.name, input: normalized.input });
104+
const result = toolCall.result ?? toolCall.response;
105+
if (result) blocks.push({ type: "tool_result", tool_use_id: id, output: textFromContent(result) || JSON.stringify(result) });
106+
}
107+
108+
return blocks.length > 0 ? { type: "message", blocks } : null;
109+
}
110+
111+
export function parseSessionId(raw: string): string | null {
112+
const event = parseJsonEvent(raw);
113+
return typeof event?.session_id === "string" ? event.session_id : null;
114+
}
115+
116+
function parseJsonEvent(raw: string): any | null {
117+
try {
118+
return JSON.parse(raw);
119+
} catch {
120+
const jsonStart = raw.indexOf("{");
121+
if (jsonStart === -1) return null;
122+
return JSON.parse(raw.slice(jsonStart));
123+
}
124+
}
125+
69126
export function buildArgs(opts: ExecuteOpts): string[] {
70-
const args = ["--prompt", buildPrompt(opts), "--output-format", "stream-json", "--yolo"];
127+
const args = ["--prompt", buildPrompt(opts), "--output-format", "stream-json", "--yolo", "--skip-trust"];
71128
if (opts.model) {
72129
args.push("--model", opts.model);
73130
}
74131
return args;
75132
}
76133

77-
export function buildResumeArgs(model?: string, prompt = ""): string[] {
78-
logger.warn("Gemini CLI does not support resume by session ID — resuming latest session");
79-
const args = ["--resume", "latest", "--prompt", prompt, "--output-format", "stream-json", "--yolo"];
134+
export function buildResumeArgs(sessionId: string, model?: string, prompt = ""): string[] {
135+
const args = ["--resume", sessionId, "--prompt", prompt, "--output-format", "stream-json", "--yolo", "--skip-trust"];
80136
if (model) {
81137
args.push("--model", model);
82138
}
83139
return args;
84140
}
85141

142+
export function resolveGeminiCommand(env: NodeJS.ProcessEnv = process.env): string {
143+
if (!env.VOLTA_HOME) return "gemini";
144+
const voltaPackageBin = join(env.VOLTA_HOME, "tools", "image", "packages", "@google", "gemini-cli", "bin", "gemini");
145+
return existsSync(voltaPackageBin) ? voltaPackageBin : "gemini";
146+
}
147+
148+
function findGeminiSessionFiles(sessionId: string): string[] {
149+
const shortId = sessionId.slice(0, 8);
150+
const matches: string[] = [];
151+
try {
152+
for (const project of readdirSync(GEMINI_TMP_DIR)) {
153+
const chatsDir = join(GEMINI_TMP_DIR, project, "chats");
154+
let files: string[];
155+
try {
156+
files = readdirSync(chatsDir);
157+
} catch {
158+
continue;
159+
}
160+
for (const file of files) {
161+
if (!file.endsWith(`-${shortId}.jsonl`)) continue;
162+
const path = join(chatsDir, file);
163+
if (isGeminiSessionFile(path, sessionId)) matches.push(path);
164+
}
165+
}
166+
} catch {
167+
return [];
168+
}
169+
return matches.sort();
170+
}
171+
172+
function isGeminiSessionFile(path: string, sessionId: string): boolean {
173+
const firstLine = readFileSync(path, "utf-8").split("\n", 1)[0];
174+
const header = JSON.parse(firstLine);
175+
return header.sessionId === sessionId;
176+
}
177+
178+
function textFromContent(content: unknown): string {
179+
if (typeof content === "string") return content;
180+
if (!Array.isArray(content)) return "";
181+
return content
182+
.map((part) => {
183+
if (typeof part === "string") return part;
184+
if (part && typeof part === "object" && "text" in part) return String((part as { text?: unknown }).text ?? "");
185+
return "";
186+
})
187+
.filter(Boolean)
188+
.join("");
189+
}
190+
191+
function eventFromRecord(record: any, id: string): AgentEvent | null {
192+
if (record.type === "user") {
193+
const text = textFromContent(record.content);
194+
return text ? { type: "message.user", text } : null;
195+
}
196+
if (record.type !== "gemini") return null;
197+
198+
const blocks: ContentBlock[] = [];
199+
const text = textFromContent(record.content);
200+
if (text) blocks.push({ type: "text", text });
201+
for (const toolCall of record.toolCalls ?? []) {
202+
const normalized = normalizeGeminiTool(toolCall.name ?? "tool", toolCall.args ?? {});
203+
if (!normalized) continue;
204+
blocks.push({ type: "tool_use", id: toolCall.id ?? `${id}-tool`, name: normalized.name, input: normalized.input });
205+
if (toolCall.result)
206+
blocks.push({
207+
type: "tool_result",
208+
tool_use_id: toolCall.id ?? "",
209+
output: textFromContent(toolCall.result) || JSON.stringify(toolCall.result),
210+
});
211+
}
212+
return blocks.length > 0 ? { type: "message", blocks } : null;
213+
}
214+
215+
function normalizeGeminiTool(name: string, rawArgs: Record<string, unknown>): { name: string; input: Record<string, unknown> } | null {
216+
switch (name) {
217+
case "run_shell_command": {
218+
const args: BashArgs = {
219+
command: String(rawArgs.command ?? ""),
220+
description: rawArgs.description === undefined ? undefined : String(rawArgs.description),
221+
};
222+
return { name: ToolName.Bash, input: args };
223+
}
224+
case "read_file": {
225+
const args: ReadArgs = {
226+
filePath: String(rawArgs.file_path ?? rawArgs.path ?? ""),
227+
offset: rawArgs.offset as number | undefined,
228+
limit: rawArgs.limit as number | undefined,
229+
};
230+
return { name: ToolName.Read, input: args };
231+
}
232+
case "list_directory": {
233+
const args: ReadArgs = { filePath: String(rawArgs.dir_path ?? rawArgs.path ?? "") };
234+
return { name: ToolName.Read, input: args };
235+
}
236+
case "write_file": {
237+
const args: WriteArgs = { filePath: String(rawArgs.file_path ?? rawArgs.path ?? ""), content: String(rawArgs.content ?? "") };
238+
return { name: ToolName.Write, input: args };
239+
}
240+
case "glob": {
241+
const args: GlobArgs = { pattern: String(rawArgs.pattern ?? ""), path: rawArgs.path as string | undefined };
242+
return { name: ToolName.Glob, input: args };
243+
}
244+
case "search_file_content": {
245+
const args: GrepArgs = { pattern: String(rawArgs.pattern ?? ""), path: rawArgs.path as string | undefined };
246+
return { name: ToolName.Grep, input: args };
247+
}
248+
case "google_web_search":
249+
case "web_search":
250+
return { name: ToolName.WebSearch, input: { query: String(rawArgs.query ?? "") } };
251+
case "activate_skill":
252+
case "update_topic":
253+
return null;
254+
default:
255+
return { name, input: rawArgs };
256+
}
257+
}
258+
259+
function readJsonlHistory(file: string): HistoryEvent[] {
260+
const messages = new Map<string, any>();
261+
for (const line of readFileSync(file, "utf-8").split("\n")) {
262+
if (!line.trim()) continue;
263+
const record = JSON.parse(line);
264+
if (typeof record.$rewindTo === "string") {
265+
let deleting = false;
266+
for (const id of Array.from(messages.keys())) {
267+
if (id === record.$rewindTo) deleting = true;
268+
if (deleting) messages.delete(id);
269+
}
270+
} else if (typeof record.id === "string") {
271+
messages.set(record.id, record);
272+
}
273+
}
274+
return historyFromRecords(Array.from(messages.values()));
275+
}
276+
277+
function historyFromRecords(records: any[]): HistoryEvent[] {
278+
return records.flatMap((record, index) => {
279+
const id = record.id ?? `gemini-hist-${index + 1}`;
280+
const event = eventFromRecord(record, id);
281+
return event ? [{ id, event, timestamp: record.timestamp ?? new Date().toISOString() }] : [];
282+
});
283+
}
284+
285+
/** @internal Exported for tests only. */
286+
export function readGeminiHistory(sessionId: string): HistoryEvent[] {
287+
return findGeminiSessionFiles(sessionId).flatMap(readJsonlHistory);
288+
}
289+
86290
export const geminiProvider: AgentProvider = {
87291
name: "gemini",
88292
label: "Gemini CLI",
@@ -93,22 +297,32 @@ export const geminiProvider: AgentProvider = {
93297
: { status: "unauthorized", detail: "Gemini CLI is not authenticated" };
94298
},
95299

96-
execute(opts: ExecuteOpts): Promise<AgentHandle> {
97-
const args = opts.resume ? buildResumeArgs(opts.model, opts.taskContext) : buildArgs(opts);
98-
return Promise.resolve(
99-
spawnAgent({
100-
command: "gemini",
101-
args,
102-
cwd: opts.cwd,
103-
env: opts.env,
104-
parseEvent,
105-
}),
106-
);
300+
async execute(opts: ExecuteOpts): Promise<AgentHandle> {
301+
if (opts.resume && !opts.resumeToken) throw new Error("gemini: resume requested but no resumeToken provided");
302+
303+
let resumeToken = opts.resumeToken;
304+
const args = opts.resume ? buildResumeArgs(opts.resumeToken!, opts.model, opts.taskContext) : buildArgs(opts);
305+
const handle = spawnAgent({
306+
command: resolveGeminiCommand(opts.env),
307+
args,
308+
cwd: opts.cwd,
309+
env: opts.env,
310+
onLine(raw) {
311+
resumeToken = parseSessionId(raw) ?? resumeToken;
312+
},
313+
parseEvent,
314+
});
315+
316+
return {
317+
...handle,
318+
getResumeToken() {
319+
return resumeToken;
320+
},
321+
};
107322
},
108323

109-
// Gemini CLI does not expose a stable on-disk session format we can parse,
110-
// and its `--resume latest` flow gives us no per-session identifier to key on.
111-
async getHistory(): Promise<HistoryEvent[]> {
112-
return [];
324+
async getHistory(_sessionId, resumeToken): Promise<HistoryEvent[]> {
325+
if (!resumeToken) return [];
326+
return readGeminiHistory(resumeToken);
113327
},
114328
};

0 commit comments

Comments
 (0)