Commit d200780

fix: address round-2 PR review feedback

ampServerManager:
- Emit turn.completed when AMP process dies with active turn
- Reject concurrent turns in sendTurn
- Consistent tool item types between start/completion events
- Sanitize raw provider output in JSONL parse logging
- Guard against duplicate turn.completed emissions

geminiCliServerManager:
- Reject overlapping turns in sendTurn
- Always emit terminal turn.completed on child close
- Respect input.model override, reject unsupported attachments
- Fix test to simulate closed-but-present session path
- Add tests for concurrent turn rejection and attachment rejection

kiloServerManager:
- Reject unsupported attachments in sendTurn
- Clean up listeners on server start promise resolve/reject
- Remove double-cast by using ProviderRuntimeEvent directly
- Remove no-op readJsonData pass-through function

CopilotAdapter:
- Reorder reconfigureSession to create new before destroying old
- Guard against NaN percentUsed when limit is 0
- Clean up client on createSession/resumeSession failure
- Update stale runtimeMode on session reuse early return

Types and registry:
- Replace any with unknown in SDKMessage for stricter typing
- Use Effect.die for idiomatic duplicate registration error
- Add comment explaining global usage accumulator intent
- Fix test to use different thread for cross-thread verification

Other:
- Merge discovered models by slug preserving metadata in ChatView
- Fix deriveRepoUrl .git suffix handling
- Expand plan docs: credential security, rollback criteria, chaos testing

1 parent 3057152 commit d200780

12 files changed

Lines changed: 313 additions & 150 deletions

.plans/17-claude-code.md

Lines changed: 14 additions & 0 deletions
@@ -122,6 +122,9 @@ Each provider manages its own authentication externally:
 1. **Environment variables and CLI auth** -- Credentials are resolved via provider-native mechanisms (e.g. `ANTHROPIC_API_KEY` for Claude, `OPENAI_API_KEY` for Codex, `gh auth` for Copilot). The adapter layer never stores or brokers credentials directly; it relies on the underlying CLI/SDK picking them up from the environment.
 2. **Per-provider rate limiting** -- Each server manager (`codexAppServerManager`, `claudeCodeServerManager`, etc.) is responsible for honoring its provider's rate limits. Adapters should surface rate-limit errors as `ProviderAdapterProcessError` so orchestration can report them cleanly.
 3. **Concurrent session limits** -- The number of simultaneous provider sessions is bounded by system resources (open processes, file descriptors, memory). `ProviderSessionDirectory` tracks active sessions but does not enforce hard caps; operators should monitor resource usage when running multiple providers concurrently.
+4. **Credential leakage prevention** -- Error messages, logs, and serialized `ProviderAdapterProcessError` payloads must never include raw API keys or tokens. Adapters should redact secrets before surfacing diagnostics.
+5. **Secure environment propagation** -- When spawning child processes (CLI binaries, SDK sub-processes), pass an explicit environment whitelist rather than forwarding the entire `process.env`. This limits accidental exposure of unrelated secrets to the child.
+6. **Secret rotation** -- Rotating a provider API key or token requires restarting all active sessions for that provider. Document this operational requirement; there is no hot-reload path for credentials.
 
 ### 2.2 Claude runtime bridge
 
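Review note: items 4 and 5 above could be backed by two small pure helpers. The sketch below is only an illustration under stated assumptions: `buildChildEnv` and `redactSecrets` are hypothetical names that do not appear in this commit, and the allow-list contents would have to be chosen per provider.

```typescript
// Hypothetical helpers illustrating plan items 4 and 5; not part of this commit.
type EnvSource = Readonly<Record<string, string | undefined>>;

/** Copy only the allow-listed variables that are actually set in `source`. */
function buildChildEnv(
  source: EnvSource,
  allowList: readonly string[],
): Record<string, string> {
  const env: Record<string, string> = {};
  for (const name of allowList) {
    const value = source[name];
    if (value !== undefined) env[name] = value;
  }
  return env;
}

/** Replace every occurrence of each known secret value with a fixed marker. */
function redactSecrets(text: string, secrets: readonly string[]): string {
  let out = text;
  for (const secret of secrets) {
    // Skip empty strings: splitting on "" would break the text into characters.
    if (secret.length === 0) continue;
    out = out.split(secret).join("[REDACTED]");
  }
  return out;
}
```

A manager might pass `buildChildEnv(process.env, ["PATH", "HOME", "ANTHROPIC_API_KEY"])` as the `env` option when spawning, and run error messages through `redactSecrets` before placing them in an error payload. Redacting by exact value only catches secrets the adapter already knows about, so it complements the allow-list rather than replacing it.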
@@ -341,6 +344,14 @@ Whichever option is chosen:
 2. checkpoint revert tests must pass under orchestration expectations
 3. user-visible activity log should explain failures clearly when provider rollback is impossible
 
+### Decision criteria
+
+Choose the rollback strategy as follows:
+
+1. If the Agent SDK exposes a native rewind/rollback API that can truncate conversation history to an arbitrary checkpoint, use **Option A** (provider-native rewind). This gives the cleanest UX and avoids session restart overhead.
+2. If no native rewind API exists or it cannot target the exact checkpoint boundary orchestration requires, use **Option B** (session restart + state truncation shim).
+3. **Time-box rule:** if investigation into Option A takes longer than 2 working days without a reliable prototype, default to Option B and move on. Option A can be revisited as a follow-up enhancement once the base integration is stable.
+
 ---
 
 ## Phase 5: Web integration
@@ -434,6 +445,9 @@ Cover cross-provider interactions that single-adapter tests miss:
 2. **Concurrent active sessions** -- Run sessions on two or more different providers simultaneously. Verify events from each session are routed to the correct orchestration thread without cross-contamination.
 3. **Resume cursor isolation** -- Persist resume cursors from two different providers, then attempt to resume each. Confirm that one provider's cursor cannot accidentally be used to resume another provider's session (adapter parse should reject mismatched cursors).
 4. **Provider health monitoring** -- Simulate a provider becoming unavailable (process crash, binary missing). Verify `listProviderStatuses()` reflects the degraded state and that orchestration surfaces a clear error to the client rather than hanging.
+5. **Performance under load** -- Run 10+ concurrent provider sessions across mixed adapters. Monitor memory usage, open file descriptors, and event-delivery latency to ensure the server remains responsive and does not leak resources.
+6. **Chaos scenarios** -- Forcibly kill provider child processes and inject network timeouts mid-stream. Verify that orchestration detects the failure, emits a clear `runtime.error`, and cleans up session resources without leaving zombie processes.
+7. **Resume after ungraceful shutdown** -- Terminate the server (SIGKILL) while sessions are active, then restart. Validate that persisted resume cursors allow sessions to recover and that no corrupted state prevents new sessions from starting.
 
 ---
 
apps/server/src/ampServerManager.ts

Lines changed: 45 additions & 4 deletions
@@ -20,6 +20,7 @@ import {
 } from "@t3tools/contracts";
 import type { ProviderSessionUsage, ProviderUsageResult } from "@t3tools/contracts";
 import type { ProviderThreadSnapshot } from "./provider/Services/ProviderAdapter.ts";
+import { createLogger } from "./logger.ts";
 
 // ── Constants ───────────────────────────────────────────────────────
 
@@ -78,6 +79,8 @@ interface AmpSession {
   activeAssistantItemId: RuntimeItemId | undefined;
   /** Maps parent_tool_use_id → RuntimeTaskId for tracking subagent tasks. */
   readonly subagentTasks: Map<string, string>;
+  /** Maps tool_use_id → classified item type for consistent start/completion typing. */
+  readonly toolItemTypes: Map<string, ReturnType<typeof classifyToolName>>;
   readonly createdAt: string;
   updatedAt: string;
 }
@@ -167,6 +170,7 @@ export class AmpServerManager extends EventEmitter<{
   event: [ProviderRuntimeEvent];
 }> {
   private readonly sessions = new Map<ThreadId, AmpSession>();
+  private readonly logger = createLogger("amp");
 
   // ── Session lifecycle ───────────────────────────────────────────
 
@@ -211,6 +215,7 @@ export class AmpServerManager extends EventEmitter<{
       activeTurnId: undefined,
       activeAssistantItemId: undefined,
       subagentTasks: new Map(),
+      toolItemTypes: new Map(),
       createdAt: now,
       updatedAt: now,
     };
@@ -237,6 +242,17 @@ export class AmpServerManager extends EventEmitter<{
     child.on("close", (code) => {
       const s = this.sessions.get(threadId);
       if (s) {
+        if (s.activeTurnId) {
+          this.emitEvent(threadId, s.activeTurnId, {
+            type: "turn.completed",
+            payload: {
+              state: "failed",
+              errorMessage: `AMP process exited with code ${code}`,
+            },
+          });
+          s.activeTurnId = undefined;
+          s.activeAssistantItemId = undefined;
+        }
         s.status = "closed";
         s.updatedAt = new Date().toISOString();
         this.emitEvent(threadId, s.activeTurnId, {
@@ -252,10 +268,21 @@ export class AmpServerManager extends EventEmitter<{
     child.on("error", (error) => {
       const s = this.sessions.get(threadId);
       if (s) {
+        if (s.activeTurnId) {
+          this.emitEvent(threadId, s.activeTurnId, {
+            type: "turn.completed",
+            payload: {
+              state: "failed",
+              errorMessage: `AMP process error: ${error.message}`,
+            },
+          });
+          s.activeTurnId = undefined;
+          s.activeAssistantItemId = undefined;
+        }
         s.status = "closed";
         s.updatedAt = new Date().toISOString();
       }
-      this.emitEvent(threadId, session.activeTurnId, {
+      this.emitEvent(threadId, s?.activeTurnId, {
         type: "runtime.error",
         payload: { message: error.message, class: "transport_error" },
       });
@@ -285,6 +312,11 @@ export class AmpServerManager extends EventEmitter<{
     if (session.status === "closed") {
       throw new Error(`AMP session is closed: ${input.threadId}`);
     }
+    if (session.status === "running" || session.activeTurnId) {
+      throw new Error(
+        `AMP session ${input.threadId} already has a turn in progress (turn ${session.activeTurnId})`,
+      );
+    }
 
     const turnId = TurnId.makeUnsafe(randomUUID());
     session.activeTurnId = turnId;
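Review note: the concurrent-turn guard in this hunk can be exercised in isolation. The following is a minimal sketch, not the manager's actual code: `SketchSession` and `beginTurn` are hypothetical names, and setting `status` to `"running"` inside the helper is an assumption about what `sendTurn` does after the lines shown.

```typescript
// Hypothetical reduction of the sendTurn guard; names are illustrative only.
type SketchStatus = "ready" | "running" | "closed";

interface SketchSession {
  status: SketchStatus;
  activeTurnId: string | undefined;
}

/** Start a turn, rejecting closed sessions and sessions with a turn in flight. */
function beginTurn(session: SketchSession, threadId: string, newTurnId: string): string {
  if (session.status === "closed") {
    throw new Error(`session is closed: ${threadId}`);
  }
  // Check both fields so a stale status or a stale turn id alone still blocks overlap.
  if (session.status === "running" || session.activeTurnId) {
    throw new Error(
      `session ${threadId} already has a turn in progress (turn ${session.activeTurnId})`,
    );
  }
  session.activeTurnId = newTurnId;
  session.status = "running"; // assumption: the real method marks the session running here
  return newTurnId;
}
```

Checking `activeTurnId` as well as `status` appears to be deliberate in the diff: either field on its own could be out of date if an earlier handler updated only one of them.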
@@ -419,7 +451,7 @@ export class AmpServerManager extends EventEmitter<{
         msg = JSON.parse(trimmed) as AmpJsonlMessage;
       } catch {
         // Non-JSON output — treat as raw assistant text.
-        console.warn(`[amp] Failed to parse JSONL line, treating as text: ${trimmed.slice(0, 120)}`);
+        this.logger.warn("Failed to parse JSONL line", { length: trimmed.length });
         this.emitEvent(threadId, session.activeTurnId, {
           type: "content.delta",
           payload: {
@@ -533,7 +565,8 @@ export class AmpServerManager extends EventEmitter<{
     }
 
     // For persistent sessions, a turn completes when stop_reason is "end_turn".
-    if (inner?.stop_reason === "end_turn") {
+    // Guard against duplicate turn.completed (handleResultMessage may also emit one).
+    if (inner?.stop_reason === "end_turn" && session.activeTurnId && session.status !== "ready") {
       _ampUsageAccumulator.turnCount++;
       this.closeAllSubagentTasks(threadId, session);
       this.emitEvent(threadId, session.activeTurnId, {
@@ -595,6 +628,7 @@ export class AmpServerManager extends EventEmitter<{
       // A tool use starts a new assistant message segment — clear the active item.
       session.activeAssistantItemId = undefined;
       const itemType = classifyToolName(block.name);
+      session.toolItemTypes.set(block.id, itemType);
       const itemId = RuntimeItemId.makeUnsafe(block.id);
       this.emitEvent(
         threadId,
@@ -680,13 +714,16 @@ export class AmpServerManager extends EventEmitter<{
       if (block.type === "tool_result") {
         const resultBlock = block as AmpToolResultContentBlock;
         const itemId = RuntimeItemId.makeUnsafe(resultBlock.tool_use_id);
+        const itemType =
+          session.toolItemTypes.get(resultBlock.tool_use_id) ?? "dynamic_tool_call";
+        session.toolItemTypes.delete(resultBlock.tool_use_id);
         this.emitEvent(
           threadId,
           session.activeTurnId,
           {
             type: "item.completed",
             payload: {
-              itemType: "dynamic_tool_call",
+              itemType,
               status: resultBlock.is_error ? "failed" : "completed",
               data: resultBlock.content,
             },
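Review note: the start/completion pairing introduced by `toolItemTypes` amounts to a record-on-start, lookup-and-delete-on-result map. A minimal sketch follows, under loud assumptions: `classifyToolSketch` is a made-up stand-in (the real `classifyToolName` rules are not shown in this diff), and the item type names other than `"dynamic_tool_call"` are placeholders.

```typescript
// Placeholder item types; only "dynamic_tool_call" is taken from the diff.
type SketchItemType = "command_execution" | "file_change" | "dynamic_tool_call";

// Hypothetical stand-in for classifyToolName; the real rules are not in this diff.
function classifyToolSketch(name: string): SketchItemType {
  if (name === "Bash") return "command_execution";
  if (name === "edit_file") return "file_change";
  return "dynamic_tool_call";
}

class ToolItemTypeTracker {
  private readonly types = new Map<string, SketchItemType>();

  /** Record the classification when the tool_use block is seen. */
  onToolUse(toolUseId: string, toolName: string): SketchItemType {
    const itemType = classifyToolSketch(toolName);
    this.types.set(toolUseId, itemType);
    return itemType;
  }

  /** Reuse the recorded type for the result, then drop the entry so the map cannot grow. */
  onToolResult(toolUseId: string): SketchItemType {
    const itemType = this.types.get(toolUseId) ?? "dynamic_tool_call";
    this.types.delete(toolUseId);
    return itemType;
  }

  get pending(): number {
    return this.types.size;
  }
}
```

Deleting the entry on completion keeps the map bounded by the number of in-flight tool calls, and the `"dynamic_tool_call"` fallback preserves the previous behavior for results whose start event was never observed.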
@@ -705,6 +742,10 @@ export class AmpServerManager extends EventEmitter<{
     session: AmpSession,
     msg: AmpJsonlMessage,
   ): void {
+    // Guard: only complete the turn if one is still active (handleAssistantMessage
+    // may have already completed it via stop_reason === "end_turn").
+    if (!session.activeTurnId || session.status === "ready") return;
+
     // Close all open subagent tasks before completing the turn.
     this.closeAllSubagentTasks(threadId, session);
 
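Review note: both `turn.completed` guards in this file follow the same idempotent-completion pattern, where the first caller clears the active turn and later callers become no-ops. The sketch below shows that pattern in isolation; `TurnStateSketch` and `completeTurnOnce` are hypothetical names, and resetting `status` to `"ready"` is an assumption about what the real handlers do after emitting.

```typescript
// Hypothetical reduction of the duplicate-completion guard; not the manager's code.
interface TurnStateSketch {
  status: "ready" | "running" | "closed";
  activeTurnId: string | undefined;
}

type TurnOutcome = "completed" | "failed";

/**
 * Emit a terminal event at most once per turn.
 * Returns true if this call emitted, false if the turn was already finished.
 */
function completeTurnOnce(
  session: TurnStateSketch,
  outcome: TurnOutcome,
  emit: (turnId: string, outcome: TurnOutcome) => void,
): boolean {
  if (!session.activeTurnId || session.status === "ready") return false;
  const turnId = session.activeTurnId;
  // Clear state before emitting so a re-entrant call from a listener is also a no-op.
  session.activeTurnId = undefined;
  session.status = "ready"; // assumption: the session returns to "ready" after a turn
  emit(turnId, outcome);
  return true;
}
```

Routing every completion path (assistant `end_turn`, result message, process exit) through one such helper would be one way to keep the guards from drifting apart; this commit instead repeats the check at each call site.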
apps/server/src/geminiCliServerManager.test.ts

Lines changed: 55 additions & 2 deletions
@@ -133,14 +133,67 @@ describe("GeminiCliServerManager", () => {
         provider: "geminiCli",
         runtimeMode: "full-access",
       });
-      manager.stopSession(asThreadId("thread-1"));
+
+      // Directly mark the session as closed without removing it from the map,
+      // so we exercise the "closed session" branch (not the "unknown session" branch).
+      const sessions = (
+        manager as unknown as { sessions: Map<string, { status: string }> }
+      ).sessions;
+      const session = sessions.get("thread-1");
+      expect(session).toBeDefined();
+      session!.status = "closed";
 
       expect(() =>
         manager.sendTurn({
           threadId: asThreadId("thread-1"),
           input: "hello",
         }),
-      ).toThrow("Unknown Gemini CLI session");
+      ).toThrow("Gemini CLI session is closed");
+    });
+
+    it("rejects when session is already running", async () => {
+      const manager = new GeminiCliServerManager();
+      await manager.startSession({
+        threadId: asThreadId("thread-1"),
+        provider: "geminiCli",
+        runtimeMode: "full-access",
+      });
+
+      // Mark the session as running to simulate an in-progress turn.
+      const sessions = (
+        manager as unknown as { sessions: Map<string, { status: string }> }
+      ).sessions;
+      const session = sessions.get("thread-1");
+      expect(session).toBeDefined();
+      session!.status = "running";
+
+      expect(() =>
+        manager.sendTurn({
+          threadId: asThreadId("thread-1"),
+          input: "hello",
+        }),
+      ).toThrow("Gemini CLI session already running");
+    });
+
+    it("rejects when attachments are provided", async () => {
+      const manager = new GeminiCliServerManager();
+      try {
+        await manager.startSession({
+          threadId: asThreadId("thread-1"),
+          provider: "geminiCli",
+          runtimeMode: "full-access",
+        });
+
+        expect(() =>
+          manager.sendTurn({
+            threadId: asThreadId("thread-1"),
+            input: "hello",
+            attachments: [{ type: "image", url: "https://example.com/img.png" }] as never,
+          }),
+        ).toThrow("does not support attachments");
+      } finally {
+        manager.stopAll();
+      }
     });
   });
 
apps/server/src/geminiCliServerManager.ts

Lines changed: 24 additions & 12 deletions
@@ -239,6 +239,14 @@ export class GeminiCliServerManager extends EventEmitter<{
     if (session.status === "closed") {
       throw new Error(`Gemini CLI session is closed: ${input.threadId}`);
     }
+    if (session.status === "running") {
+      throw new Error(`Gemini CLI session already running: ${input.threadId}`);
+    }
+
+    // Reject attachments — Gemini CLI doesn't support them.
+    if (input.attachments && input.attachments.length > 0) {
+      throw new Error("Gemini CLI does not support attachments");
+    }
 
     const turnId = TurnId.makeUnsafe(randomUUID());
     session.activeTurnId = turnId;
@@ -249,6 +257,9 @@ export class GeminiCliServerManager extends EventEmitter<{
 
     const prompt = input.input ?? "";
 
+    // Use per-turn model override if provided, otherwise fall back to session model.
+    const effectiveModel = input.model ?? session.model;
+
     // Build args for headless mode with stream-json output.
     const args: string[] = [
       "-p",
@@ -259,8 +270,8 @@ export class GeminiCliServerManager extends EventEmitter<{
       resolveApprovalMode(session.runtimeMode),
     ];
 
-    if (session.model) {
-      args.push("-m", session.model);
+    if (effectiveModel) {
+      args.push("-m", effectiveModel);
     }
 
     // Resume previous Gemini session for follow-up turns.
@@ -296,20 +307,21 @@ export class GeminiCliServerManager extends EventEmitter<{
 
       s.activeProcess = undefined;
 
-      // If the turn wasn't already completed by a "result" event, mark it.
+      // If the turn wasn't already completed by a "result" event, emit a terminal turn.completed.
       if (s.status === "running" && s.activeTurnId === turnId) {
         s.status = "ready";
         s.updatedAt = new Date().toISOString();
 
-        if (code !== 0) {
-          this.emitEvent(input.threadId, turnId, {
-            type: "turn.completed",
-            payload: {
-              state: "failed",
-              errorMessage: `Gemini CLI exited with code ${code}`,
-            },
-          });
-        }
+        this.emitEvent(input.threadId, turnId, {
+          type: "turn.completed",
+          payload:
+            code === 0
+              ? { state: "completed" }
+              : {
+                  state: "failed",
+                  errorMessage: `Gemini CLI exited with code ${code}`,
+                },
+        });
       }
     });
 
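Review note: the payload selection in the close handler above can be pulled into a pure function, which makes the exit-code cases easy to unit test. This is a sketch only; `TerminalPayloadSketch` and `terminalPayloadForExit` are hypothetical names, and the payload shape is copied from the hunk rather than from the contracts package.

```typescript
// Payload shape copied from the hunk above; the type name is hypothetical.
type TerminalPayloadSketch =
  | { state: "completed" }
  | { state: "failed"; errorMessage: string };

/**
 * Map a child process exit code to a terminal turn.completed payload.
 * Node reports `null` as the code when the child was terminated by a signal,
 * so anything other than an explicit 0 is treated as a failure.
 */
function terminalPayloadForExit(code: number | null): TerminalPayloadSketch {
  return code === 0
    ? { state: "completed" }
    : { state: "failed", errorMessage: `Gemini CLI exited with code ${code}` };
}
```

One consequence worth noting: a signal-killed child yields the message "Gemini CLI exited with code null". If that reads poorly in the activity log, the handler could also take the `signal` argument of the `close` event into account.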