Skip to content

Commit 74c480d

Browse files
Merge a314ad2 into 43deaf4
2 parents 43deaf4 + a314ad2 commit 74c480d

6 files changed

Lines changed: 252 additions & 1 deletion

src/auto-reply/reply/session-reset-cleanup.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@ import {
44
peekSystemEvents,
55
resetSystemEventsForTest,
66
} from "../../infra/system-events.js";
7+
import {
8+
getDiagnosticSessionActivitySnapshot,
9+
markDiagnosticToolStartedForTest,
10+
resetDiagnosticRunActivityForTest,
11+
} from "../../logging/diagnostic-run-activity.js";
712
import { clearSessionResetRuntimeState } from "./session-reset-cleanup.js";
813

914
afterEach(() => {
1015
resetSystemEventsForTest();
16+
resetDiagnosticRunActivityForTest();
1117
});
1218

1319
describe("clearSessionResetRuntimeState", () => {
@@ -20,8 +26,39 @@ describe("clearSessionResetRuntimeState", () => {
2026

2127
expect(result.keys).toEqual(["alpha", "beta"]);
2228
expect(result.systemEventsCleared).toBe(2);
29+
expect(result.diagnosticActivityCleared).toEqual({
30+
activeEmbeddedRunsCleared: 0,
31+
activeToolsCleared: 0,
32+
activeModelCallsCleared: 0,
33+
activitiesCleared: 0,
34+
});
2335
expect(peekSystemEvents("alpha")).toStrictEqual([]);
2436
expect(peekSystemEvents("beta")).toStrictEqual([]);
2537
expect(peekSystemEvents("gamma")).toEqual(["fresh gamma"]);
2638
});
39+
40+
it("clears stale diagnostic tool activity for reset session refs", () => {
41+
markDiagnosticToolStartedForTest({
42+
sessionId: "session-old",
43+
sessionKey: "agent:main:telegram:chat-1",
44+
runId: "run-old",
45+
toolName: "bash",
46+
toolCallId: "tool-old",
47+
});
48+
49+
const result = clearSessionResetRuntimeState(["agent:main:telegram:chat-1", "session-old"]);
50+
51+
expect(result.diagnosticActivityCleared).toEqual({
52+
activeEmbeddedRunsCleared: 0,
53+
activeToolsCleared: 1,
54+
activeModelCallsCleared: 0,
55+
activitiesCleared: 1,
56+
});
57+
expect(
58+
getDiagnosticSessionActivitySnapshot({
59+
sessionId: "session-old",
60+
sessionKey: "agent:main:telegram:chat-1",
61+
}).activeWorkKind,
62+
).toBeUndefined();
63+
});
2764
});

src/auto-reply/reply/session-reset-cleanup.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import { drainSystemEventEntries } from "../../infra/system-events.js";
2+
import {
3+
clearDiagnosticSessionActivity,
4+
type ClearDiagnosticSessionActivityResult,
5+
} from "../../logging/diagnostic-run-activity.js";
26
import { clearSessionQueues, type ClearSessionQueueResult } from "./queue/cleanup.js";
37

48
export type ClearSessionResetRuntimeStateResult = ClearSessionQueueResult & {
59
systemEventsCleared: number;
10+
diagnosticActivityCleared: ClearDiagnosticSessionActivityResult;
611
};
712

813
export function clearSessionResetRuntimeState(
@@ -15,8 +20,30 @@ export function clearSessionResetRuntimeState(
1520
systemEventsCleared += drainSystemEventEntries(key).length;
1621
}
1722

23+
const diagnosticActivityCleared = cleared.keys.reduce<ClearDiagnosticSessionActivityResult>(
24+
(acc, key) => {
25+
const result = clearDiagnosticSessionActivity({
26+
sessionId: key,
27+
sessionKey: key,
28+
reason: "session_reset",
29+
});
30+
acc.activeEmbeddedRunsCleared += result.activeEmbeddedRunsCleared;
31+
acc.activeToolsCleared += result.activeToolsCleared;
32+
acc.activeModelCallsCleared += result.activeModelCallsCleared;
33+
acc.activitiesCleared += result.activitiesCleared;
34+
return acc;
35+
},
36+
{
37+
activeEmbeddedRunsCleared: 0,
38+
activeToolsCleared: 0,
39+
activeModelCallsCleared: 0,
40+
activitiesCleared: 0,
41+
},
42+
);
43+
1844
return {
1945
...cleared,
2046
systemEventsCleared,
47+
diagnosticActivityCleared,
2148
};
2249
}

src/logging/diagnostic-run-activity.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
emitInternalDiagnosticEvent,
23
onInternalDiagnosticEvent,
34
type DiagnosticEventPayload,
45
type DiagnosticSessionActiveWorkKind,
@@ -46,6 +47,13 @@ export type DiagnosticSessionActivitySnapshot = {
4647
lastProgressReason?: string;
4748
};
4849

50+
export type ClearDiagnosticSessionActivityResult = {
51+
activeEmbeddedRunsCleared: number;
52+
activeToolsCleared: number;
53+
activeModelCallsCleared: number;
54+
activitiesCleared: number;
55+
};
56+
4957
const activityByRef = new Map<string, SessionActivity>();
5058
const activityByRunId = new Map<string, SessionActivity>();
5159

@@ -321,6 +329,76 @@ export function getDiagnosticSessionActivitySnapshot(
321329
};
322330
}
323331

332+
function clearActivityReferences(activity: SessionActivity): void {
333+
for (const [ref, mapped] of activityByRef) {
334+
if (mapped === activity) {
335+
activityByRef.delete(ref);
336+
}
337+
}
338+
for (const [runId, mapped] of activityByRunId) {
339+
if (mapped === activity) {
340+
activityByRunId.delete(runId);
341+
}
342+
}
343+
}
344+
345+
export function clearDiagnosticSessionActivity(params: {
346+
sessionId?: string;
347+
sessionKey?: string;
348+
reason: string;
349+
emitEvent?: boolean;
350+
}): ClearDiagnosticSessionActivityResult {
351+
const activities = new Set<SessionActivity>();
352+
for (const ref of sessionRefs(params)) {
353+
const activity = activityByRef.get(ref);
354+
if (activity) {
355+
activities.add(activity);
356+
}
357+
}
358+
359+
const result: ClearDiagnosticSessionActivityResult = {
360+
activeEmbeddedRunsCleared: 0,
361+
activeToolsCleared: 0,
362+
activeModelCallsCleared: 0,
363+
activitiesCleared: 0,
364+
};
365+
366+
for (const activity of activities) {
367+
result.activeEmbeddedRunsCleared += activity.activeEmbeddedRuns.size;
368+
result.activeToolsCleared += activity.activeTools.size;
369+
result.activeModelCallsCleared += activity.activeModelCalls.size;
370+
activity.activeEmbeddedRuns.clear();
371+
activity.activeTools.clear();
372+
activity.activeModelCalls.clear();
373+
clearActivityReferences(activity);
374+
result.activitiesCleared += 1;
375+
}
376+
377+
if (
378+
params.emitEvent !== false &&
379+
(result.activeEmbeddedRunsCleared > 0 ||
380+
result.activeToolsCleared > 0 ||
381+
result.activeModelCallsCleared > 0)
382+
) {
383+
emitInternalDiagnosticEvent({
384+
type: "log.record",
385+
level: "warn",
386+
message: "diagnostic session activity cleared",
387+
loggerName: "diagnostic-run-activity",
388+
attributes: {
389+
reason: params.reason,
390+
...(params.sessionId ? { sessionId: params.sessionId } : {}),
391+
...(params.sessionKey ? { sessionKey: params.sessionKey } : {}),
392+
activeEmbeddedRunsCleared: result.activeEmbeddedRunsCleared,
393+
activeToolsCleared: result.activeToolsCleared,
394+
activeModelCallsCleared: result.activeModelCallsCleared,
395+
},
396+
});
397+
}
398+
399+
return result;
400+
}
401+
324402
export function markDiagnosticRunProgressForTest(params: DiagnosticRunProgressActivityEvent): void {
325403
markDiagnosticRunProgress(params);
326404
}

src/logging/diagnostic-stuck-session-recovery.runtime.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const mocks = vi.hoisted(() => ({
1616
resolveActiveEmbeddedRunHandleSessionIdBySessionFile: vi.fn(),
1717
resolveEmbeddedSessionLane: vi.fn((key: string) => `session:${key}`),
1818
waitForEmbeddedAgentRunEnd: vi.fn(),
19+
clearDiagnosticSessionActivity: vi.fn(),
1920
getDiagnosticSessionActivitySnapshot: vi.fn(),
2021
diag: {
2122
debug: vi.fn(),
@@ -68,6 +69,7 @@ vi.mock("./diagnostic-runtime.js", () => ({
6869
}));
6970

7071
vi.mock("./diagnostic-run-activity.js", () => ({
72+
clearDiagnosticSessionActivity: mocks.clearDiagnosticSessionActivity,
7173
getDiagnosticSessionActivitySnapshot: mocks.getDiagnosticSessionActivitySnapshot,
7274
}));
7375

@@ -98,6 +100,13 @@ function resetMocks() {
98100
mocks.resolveActiveEmbeddedRunHandleSessionIdBySessionFile.mockReset();
99101
mocks.resolveEmbeddedSessionLane.mockClear();
100102
mocks.waitForEmbeddedAgentRunEnd.mockReset();
103+
mocks.clearDiagnosticSessionActivity.mockReset();
104+
mocks.clearDiagnosticSessionActivity.mockReturnValue({
105+
activeEmbeddedRunsCleared: 0,
106+
activeToolsCleared: 0,
107+
activeModelCallsCleared: 0,
108+
activitiesCleared: 0,
109+
});
101110
mocks.getDiagnosticSessionActivitySnapshot.mockReset();
102111
mocks.getDiagnosticSessionActivitySnapshot.mockReturnValue({});
103112
mocks.diag.debug.mockReset();
@@ -453,11 +462,51 @@ describe("stuck session recovery", () => {
453462
});
454463

455464
expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main");
465+
expect(mocks.clearDiagnosticSessionActivity).toHaveBeenCalledWith({
466+
sessionId: "stale-session",
467+
sessionKey: "agent:main:main",
468+
reason: "stuck_recovery:no_active_work",
469+
});
456470
expect(warnLogMessages()).toEqual([
457471
"stuck session recovery outcome: status=noop action=none sessionId=stale-session sessionKey=agent:main:main lane=session:agent:main:main reason=no_active_work",
458472
]);
459473
});
460474

475+
it("clears orphaned diagnostic tool activity when recovery releases stale queued state", async () => {
476+
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
477+
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);
478+
mocks.isEmbeddedAgentRunActive.mockReturnValue(false);
479+
mocks.resetCommandLane.mockReturnValue(0);
480+
mocks.clearDiagnosticSessionActivity.mockReturnValue({
481+
activeEmbeddedRunsCleared: 0,
482+
activeToolsCleared: 1,
483+
activeModelCallsCleared: 0,
484+
activitiesCleared: 1,
485+
});
486+
487+
const outcome = await recoverStuckDiagnosticSession({
488+
sessionId: "stale-tool-session",
489+
sessionKey: "agent:main:telegram:group:1",
490+
ageMs: 180_000,
491+
queueDepth: 2,
492+
});
493+
494+
expect(outcome).toMatchObject({
495+
status: "released",
496+
action: "release_lane",
497+
sessionId: "stale-tool-session",
498+
sessionKey: "agent:main:telegram:group:1",
499+
});
500+
expect(mocks.clearDiagnosticSessionActivity).toHaveBeenCalledWith({
501+
sessionId: "stale-tool-session",
502+
sessionKey: "agent:main:telegram:group:1",
503+
reason: "stuck_recovery:release_lane",
504+
});
505+
expect(warnLogMessages()).toContain(
506+
"stuck session recovery cleared diagnostic activity: sessionId=stale-tool-session sessionKey=agent:main:telegram:group:1 reason=stuck_recovery:release_lane activeEmbeddedRunsCleared=0 activeToolsCleared=1 activeModelCallsCleared=0",
507+
);
508+
});
509+
461510
it("clears stale queued processing state even when the lane has no active work", async () => {
462511
mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
463512
mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);

src/logging/diagnostic-stuck-session-recovery.runtime.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import {
99
resolveActiveEmbeddedRunHandleSessionIdBySessionFile,
1010
} from "../agents/embedded-agent-runner/runs.js";
1111
import { getCommandLaneSnapshot, resetCommandLane } from "../process/command-queue.js";
12-
import { getDiagnosticSessionActivitySnapshot } from "./diagnostic-run-activity.js";
12+
import {
13+
clearDiagnosticSessionActivity,
14+
getDiagnosticSessionActivitySnapshot,
15+
} from "./diagnostic-run-activity.js";
1316
import { diagnosticLogger as diag } from "./diagnostic-runtime.js";
1417
import {
1518
formatStoppedCronSessionDiagnosticFields,
@@ -81,6 +84,33 @@ function formatRecoveryContext(
8184
return fields.join(" ");
8285
}
8386

87+
function clearRecoveredDiagnosticActivity(
88+
params: StuckSessionRecoveryParams,
89+
reason: string,
90+
): void {
91+
const result = clearDiagnosticSessionActivity({
92+
sessionId: params.sessionId,
93+
sessionKey: params.sessionKey,
94+
reason,
95+
});
96+
if (
97+
result.activeEmbeddedRunsCleared === 0 &&
98+
result.activeToolsCleared === 0 &&
99+
result.activeModelCallsCleared === 0
100+
) {
101+
return;
102+
}
103+
diag.warn(
104+
`stuck session recovery cleared diagnostic activity: sessionId=${
105+
params.sessionId ?? "unknown"
106+
} sessionKey=${params.sessionKey ?? "unknown"} reason=${reason} activeEmbeddedRunsCleared=${
107+
result.activeEmbeddedRunsCleared
108+
} activeToolsCleared=${result.activeToolsCleared} activeModelCallsCleared=${
109+
result.activeModelCallsCleared
110+
}`,
111+
);
112+
}
113+
84114
export async function recoverStuckDiagnosticSession(
85115
params: StuckSessionRecoveryParams,
86116
): Promise<StuckSessionRecoveryOutcome> {
@@ -252,6 +282,7 @@ export async function recoverStuckDiagnosticSession(
252282

253283
if (aborted || forceCleared || released > 0 || clearStaleQueuedSession) {
254284
const action = aborted || forceCleared ? "abort_embedded_run" : "release_lane";
285+
clearRecoveredDiagnosticActivity(params, `stuck_recovery:${action}`);
255286
const stoppedFields = formatStoppedCronSessionDiagnosticFields(
256287
resolveCronSessionDiagnosticContext({ sessionKey: params.sessionKey, activeSessionId }),
257288
);
@@ -289,6 +320,7 @@ export async function recoverStuckDiagnosticSession(
289320
diag.warn(`stuck session recovery outcome: ${formatRecoveryOutcome(outcome)}`);
290321
return outcome;
291322
}
323+
clearRecoveredDiagnosticActivity(params, "stuck_recovery:no_active_work");
292324
const outcome: StuckSessionRecoveryOutcome = {
293325
status: "noop",
294326
action: "none",

src/logging/diagnostic.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from "../infra/diagnostic-events.js";
1111
import { withDiagnosticPhase } from "./diagnostic-phase.js";
1212
import {
13+
clearDiagnosticSessionActivity,
1314
getDiagnosticSessionActivitySnapshot,
1415
markDiagnosticRunProgressForTest,
1516
markDiagnosticEmbeddedRunEnded,
@@ -275,6 +276,33 @@ describe("diagnostic session activity aliases", () => {
275276
getDiagnosticSessionActivitySnapshot({ sessionId: "s1", sessionKey: "main" }).activeWorkKind,
276277
).toBeUndefined();
277278
});
279+
280+
it("clears orphaned diagnostic tool activity for a retired session", () => {
281+
markDiagnosticEmbeddedRunStarted({ sessionId: "s1", sessionKey: "main" });
282+
markDiagnosticToolStartedForTest({
283+
sessionId: "s1",
284+
sessionKey: "main",
285+
runId: "run-1",
286+
toolName: "bash",
287+
toolCallId: "tool-1",
288+
});
289+
290+
const result = clearDiagnosticSessionActivity({
291+
sessionId: "s1",
292+
sessionKey: "main",
293+
reason: "test_retired_session",
294+
emitEvent: false,
295+
});
296+
297+
expect(result).toEqual({
298+
activeEmbeddedRunsCleared: 1,
299+
activeToolsCleared: 1,
300+
activeModelCallsCleared: 0,
301+
activitiesCleared: 1,
302+
});
303+
expect(getDiagnosticSessionActivitySnapshot({ sessionId: "s1" })).toEqual({});
304+
expect(getDiagnosticSessionActivitySnapshot({ sessionKey: "main" })).toEqual({});
305+
});
278306
});
279307

280308
describe("logger import side effects", () => {

0 commit comments

Comments
 (0)