Skip to content

Commit 8f8dbab

Browse files
author
scotthuang
committed
fix(gateway): preserve chat retry guard after terminal state
1 parent 056cc12 commit 8f8dbab

5 files changed

Lines changed: 90 additions & 4 deletions

File tree

src/gateway/chat-abort.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ export type ChatAbortControllerEntry = {
1515
providerId?: string;
1616
authProviderId?: string;
1717
abortStopReason?: string;
18+
/**
19+
* Controls only the sessions.list active-run projection. Terminal lifecycle
20+
* clears this before chat.send settles, while the entry stays as the retry
21+
* idempotency guard until normal cleanup removes it.
22+
*/
23+
projectSessionActive?: boolean;
1824
/**
1925
* Which RPC owns this registration. Absent (undefined) is treated as
2026
* `"chat-send"` so pre-existing callers that constructed entries without

src/gateway/server-chat.agent-events.test.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import {
4444
createSessionEventSubscriberRegistry,
4545
createSessionMessageSubscriberRegistry,
4646
createToolEventRecipientRegistry,
47+
type AgentEventHandlerOptions,
4748
} from "./server-chat.js";
4849
import { loadGatewaySessionRow } from "./server-chat.load-gateway-session-row.runtime.js";
4950
import { loadSessionEntry } from "./session-utils.js";
@@ -79,14 +80,16 @@ describe("agent event handler", () => {
7980
resolveSessionKeyForRun?: (runId: string) => string | undefined;
8081
lifecycleErrorRetryGraceMs?: number;
8182
isChatSendRunActive?: (runId: string) => boolean;
83+
clearTrackedActiveRun?: AgentEventHandlerOptions["clearTrackedActiveRun"];
8284
}) {
8385
const nowSpy =
8486
params?.now === undefined ? undefined : vi.spyOn(Date, "now").mockReturnValue(params.now);
8587
const broadcast = vi.fn();
8688
const broadcastToConnIds = vi.fn();
8789
const nodeSendToSession = vi.fn();
8890
const clearAgentRunContext = vi.fn();
89-
const clearTrackedActiveRun = vi.fn();
91+
const clearTrackedActiveRun =
92+
vi.fn<NonNullable<AgentEventHandlerOptions["clearTrackedActiveRun"]>>();
9093
const agentRunSeq = new Map<string, number>();
9194
const chatRunState = createChatRunState();
9295
const toolEventRecipients = createToolEventRecipientRegistry();
@@ -107,7 +110,7 @@ describe("agent event handler", () => {
107110
loadGatewaySessionRowForSnapshot: loadGatewaySessionRow,
108111
lifecycleErrorRetryGraceMs: params?.lifecycleErrorRetryGraceMs,
109112
isChatSendRunActive: params?.isChatSendRunActive,
110-
clearTrackedActiveRun,
113+
clearTrackedActiveRun: params?.clearTrackedActiveRun ?? clearTrackedActiveRun,
111114
});
112115

113116
return {
@@ -1895,6 +1898,43 @@ describe("agent event handler", () => {
18951898
);
18961899
});
18971900

1901+
it("keeps chat send retry guards while hiding terminal session projection", () => {
1902+
const trackedActiveRuns = new Map<
1903+
string,
1904+
{ sessionKey: string; projectSessionActive?: boolean }
1905+
>([["client-run", { sessionKey: "session-finished" }]]);
1906+
const { chatRunState, handler } = createHarness({
1907+
clearTrackedActiveRun: ({ runId, clientRunId, sessionKey }) => {
1908+
for (const candidateRunId of new Set([runId, clientRunId])) {
1909+
const entry = trackedActiveRuns.get(candidateRunId);
1910+
if (entry?.sessionKey === sessionKey) {
1911+
entry.projectSessionActive = false;
1912+
}
1913+
}
1914+
},
1915+
});
1916+
chatRunState.registry.add("provider-run", {
1917+
sessionKey: "session-finished",
1918+
clientRunId: "client-run",
1919+
});
1920+
1921+
handler({
1922+
runId: "provider-run",
1923+
seq: 2,
1924+
stream: "lifecycle",
1925+
ts: 1_800,
1926+
data: {
1927+
phase: "end",
1928+
startedAt: 900,
1929+
endedAt: 1_700,
1930+
},
1931+
});
1932+
1933+
const retryGuard = trackedActiveRuns.get("client-run");
1934+
expect(retryGuard).toBeDefined();
1935+
expect(retryGuard?.projectSessionActive).toBe(false);
1936+
});
1937+
18981938
it("keeps aborted chat run markers through terminal lifecycle cleanup", () => {
18991939
const { broadcast, chatRunState, handler } = createHarness();
19001940
chatRunState.registry.add("run-aborted", {

src/gateway/server-methods/sessions.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,11 @@ function collectTrackedActiveSessionRunKeys(
666666
return keys;
667667
}
668668
for (const active of context.chatAbortControllers.values()) {
669-
if (typeof active.sessionKey === "string" && active.sessionKey.trim()) {
669+
if (
670+
active.projectSessionActive !== false &&
671+
typeof active.sessionKey === "string" &&
672+
active.sessionKey.trim()
673+
) {
670674
keys.add(active.sessionKey);
671675
}
672676
}

src/gateway/server-runtime-subscriptions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ export function startGatewayEventSubscriptions(params: {
4949
for (const candidateRunId of new Set([runId, clientRunId])) {
5050
const entry = params.chatAbortControllers.get(candidateRunId);
5151
if (entry?.sessionKey === sessionKey) {
52-
params.chatAbortControllers.delete(candidateRunId);
52+
entry.projectSessionActive = false;
5353
}
5454
}
5555
},

src/gateway/server.sessions.list-changed.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,42 @@ test("sessions.list marks sessions with active abortable runs", async () => {
254254
expect(session.hasActiveRun).toBe(true);
255255
});
256256

257+
test("sessions.list ignores terminal abortable runs kept for retry guards", async () => {
258+
await createSessionStoreDir();
259+
await writeSessionStore({
260+
entries: {
261+
main: sessionStoreEntry("sess-main"),
262+
},
263+
});
264+
265+
const respond = vi.fn();
266+
const sessionsHandlers = await getSessionsHandlers();
267+
const { getRuntimeConfig } = await getGatewayConfigModule();
268+
await sessionsHandlers["sessions.list"]({
269+
req: {
270+
type: "req",
271+
id: "req-sessions-list-terminal-run",
272+
method: "sessions.list",
273+
params: {},
274+
},
275+
params: {},
276+
respond,
277+
client: null,
278+
isWebchatConnect: () => false,
279+
context: {
280+
getRuntimeConfig,
281+
loadGatewayModelCatalog: async () => [],
282+
chatAbortControllers: new Map([
283+
["run-1", { sessionKey: "agent:main:main", projectSessionActive: false }],
284+
]),
285+
} as never,
286+
});
287+
288+
const payload = expectRespondPayload(respond);
289+
const session = findSession(payload, "agent:main:main");
290+
expect(session.hasActiveRun).toBe(false);
291+
});
292+
257293
test("sessions.list yields before responding during bulk transcript hydration", async () => {
258294
const { dir } = await createSessionStoreDir();
259295
const entries: Record<string, ReturnType<typeof sessionStoreEntry>> = {};

0 commit comments

Comments
 (0)