Skip to content

Commit ec83715

Browse files
committed
fix(agents): add trajectory flush timeout diagnostics
1 parent 16ef041 commit ec83715

7 files changed

Lines changed: 183 additions & 4 deletions

File tree

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4487,6 +4487,7 @@ export async function runEmbeddedAttempt(
44874487
sessionId: params.sessionId,
44884488
step: "pi-trajectory-flush",
44894489
log,
4490+
getTimeoutDetails: () => trajectoryRecorder?.describeFlushState(),
44904491
cleanup: async () => {
44914492
await trajectoryRecorder?.flush();
44924493
},

src/agents/queued-file-writer.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,34 @@ describe("getQueuedFileWriter", () => {
9292

9393
expect(fs.readFileSync(filePath, "utf8")).toBe("12345\n");
9494
});
95+
96+
it("reports pending queue diagnostics before flush drains writes", async () => {
97+
const tmpDir = makeTempDir();
98+
const filePath = path.join(tmpDir, "trace.jsonl");
99+
const writer = getQueuedFileWriter(new Map(), filePath, {
100+
maxFileBytes: 1024,
101+
maxQueuedBytes: 1024,
102+
yieldBeforeWrite: true,
103+
});
104+
105+
writer.write("line\n");
106+
107+
expect(writer.describeQueue?.()).toEqual({
108+
pendingWrites: 1,
109+
queuedBytes: 5,
110+
activeOperation: "idle",
111+
activeWriteBytes: undefined,
112+
maxFileBytes: 1024,
113+
maxQueuedBytes: 1024,
114+
yieldBeforeWrite: true,
115+
});
116+
117+
await writer.flush();
118+
119+
expect(writer.describeQueue?.()).toMatchObject({
120+
pendingWrites: 0,
121+
queuedBytes: 0,
122+
activeOperation: "idle",
123+
});
124+
});
95125
});

src/agents/queued-file-writer.ts

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,21 @@ import { appendRegularFile, resolveRegularFileAppendFlags } from "../infra/fs-sa
44

55
export type QueuedFileWriteResult = "queued" | "dropped";
66

7+
export type QueuedFileWriterDiagnostics = {
8+
pendingWrites: number;
9+
queuedBytes: number;
10+
activeOperation: "idle" | "mkdir" | "yield" | "file-append";
11+
activeWriteBytes?: number;
12+
maxFileBytes?: number;
13+
maxQueuedBytes?: number;
14+
yieldBeforeWrite: boolean;
15+
};
16+
717
export type QueuedFileWriter = {
818
filePath: string;
919
write: (line: string) => unknown;
1020
flush: () => Promise<void>;
21+
describeQueue?: () => QueuedFileWriterDiagnostics;
1122
};
1223

1324
type QueuedFileWriterOptions = {
@@ -50,7 +61,10 @@ export function getQueuedFileWriter(
5061
const dir = path.dirname(filePath);
5162
const ready = fs.mkdir(dir, { recursive: true, mode: 0o700 }).catch(() => undefined);
5263
let queue: Promise<unknown> = Promise.resolve();
64+
let pendingWrites = 0;
5365
let queuedBytes = 0;
66+
let activeOperation: QueuedFileWriterDiagnostics["activeOperation"] = "idle";
67+
let activeWriteBytes: number | undefined;
5468

5569
const writer: QueuedFileWriter = {
5670
filePath,
@@ -62,20 +76,45 @@ export function getQueuedFileWriter(
6276
) {
6377
return "dropped";
6478
}
79+
pendingWrites += 1;
6580
queuedBytes += lineBytes;
6681
queue = queue
67-
.then(() => ready)
68-
.then(() => (options.yieldBeforeWrite ? waitForImmediate() : undefined))
69-
.then(() => safeAppendFile(filePath, line, options))
82+
.then(async () => {
83+
activeOperation = "mkdir";
84+
await ready;
85+
})
86+
.then(async () => {
87+
if (options.yieldBeforeWrite) {
88+
activeOperation = "yield";
89+
await waitForImmediate();
90+
}
91+
})
92+
.then(async () => {
93+
activeOperation = "file-append";
94+
activeWriteBytes = lineBytes;
95+
await safeAppendFile(filePath, line, options);
96+
})
7097
.catch(() => undefined)
7198
.finally(() => {
99+
pendingWrites = Math.max(0, pendingWrites - 1);
72100
queuedBytes = Math.max(0, queuedBytes - lineBytes);
101+
activeWriteBytes = undefined;
102+
activeOperation = pendingWrites > 0 ? activeOperation : "idle";
73103
});
74104
return "queued";
75105
},
76106
flush: async () => {
77107
await queue;
78108
},
109+
describeQueue: () => ({
110+
pendingWrites,
111+
queuedBytes,
112+
activeOperation,
113+
activeWriteBytes,
114+
maxFileBytes: options.maxFileBytes,
115+
maxQueuedBytes: options.maxQueuedBytes,
116+
yieldBeforeWrite: options.yieldBeforeWrite === true,
117+
}),
79118
};
80119

81120
writers.set(filePath, writer);

src/agents/run-cleanup-timeout.test.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,50 @@ describe("agent cleanup timeout", () => {
6565
);
6666
});
6767

68+
it("includes cleanup timeout details when the cleanup step exposes them", async () => {
69+
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
70+
71+
const result = runAgentCleanupStep({
72+
runId: "run-trajectory",
73+
sessionId: "session-trajectory",
74+
step: "pi-trajectory-flush",
75+
cleanup,
76+
log,
77+
timeoutMs: 5,
78+
getTimeoutDetails: () => "pendingWrites=2 queuedBytes=128 activeOperation=file-append",
79+
});
80+
81+
await vi.advanceTimersByTimeAsync(5);
82+
await expect(result).resolves.toBeUndefined();
83+
84+
expect(log.warn).toHaveBeenCalledWith(
85+
"agent cleanup timed out: runId=run-trajectory sessionId=session-trajectory step=pi-trajectory-flush timeoutMs=5 details=pendingWrites=2 queuedBytes=128 activeOperation=file-append",
86+
);
87+
});
88+
89+
it("does not fail cleanup when timeout details throw", async () => {
90+
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
91+
92+
const result = runAgentCleanupStep({
93+
runId: "run-trajectory",
94+
sessionId: "session-trajectory",
95+
step: "pi-trajectory-flush",
96+
cleanup,
97+
log,
98+
timeoutMs: 5,
99+
getTimeoutDetails: () => {
100+
throw new Error("details unavailable");
101+
},
102+
});
103+
104+
await vi.advanceTimersByTimeAsync(5);
105+
await expect(result).resolves.toBeUndefined();
106+
107+
expect(log.warn).toHaveBeenCalledWith(
108+
"agent cleanup timed out: runId=run-trajectory sessionId=session-trajectory step=pi-trajectory-flush timeoutMs=5 detailsError=details unavailable",
109+
);
110+
});
111+
68112
it("uses the general cleanup timeout environment override for other cleanup steps", async () => {
69113
const cleanup = vi.fn(async () => new Promise<never>(() => {}));
70114

src/agents/run-cleanup-timeout.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,17 @@ function parseTimeoutEnvValue(value: string | undefined): number | undefined {
2828
return normalized > 0 ? normalized : undefined;
2929
}
3030

31+
function resolveCleanupTimeoutDetails(
32+
getTimeoutDetails: (() => string | undefined) | undefined,
33+
): string {
34+
try {
35+
const timeoutDetails = getTimeoutDetails?.()?.trim();
36+
return timeoutDetails ? ` details=${timeoutDetails}` : "";
37+
} catch (error) {
38+
return ` detailsError=${formatErrorMessage(error)}`;
39+
}
40+
}
41+
3142
export function resolveAgentCleanupStepTimeoutMs(params: {
3243
step: string;
3344
timeoutMs?: number;
@@ -54,6 +65,7 @@ export async function runAgentCleanupStep(params: {
5465
sessionId: string;
5566
step: string;
5667
cleanup: () => Promise<void>;
68+
getTimeoutDetails?: () => string | undefined;
5769
log: AgentCleanupLogger;
5870
env?: NodeJS.ProcessEnv;
5971
timeoutMs?: number;
@@ -88,8 +100,9 @@ export async function runAgentCleanupStep(params: {
88100
clearTimeout(timeoutHandle);
89101
}
90102
if (result === "timeout") {
103+
const details = resolveCleanupTimeoutDetails(params.getTimeoutDetails);
91104
params.log.warn(
92-
`agent cleanup timed out: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} timeoutMs=${timeoutMs}`,
105+
`agent cleanup timed out: runId=${params.runId} sessionId=${params.sessionId} step=${params.step} timeoutMs=${timeoutMs}${details}`,
93106
);
94107
void cleanupPromise.catch((error) => {
95108
params.log.warn(

src/trajectory/runtime.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,33 @@ describe("trajectory runtime", () => {
169169
expect(truncated?.data.droppedEvents).toBeGreaterThan(0);
170170
});
171171

172+
it("describes queued writer state for cleanup timeout logs", () => {
173+
const recorder = createTrajectoryRuntimeRecorder({
174+
sessionId: "session-1",
175+
sessionFile: "/tmp/session.jsonl",
176+
writer: {
177+
filePath: "/tmp/session.trajectory.jsonl",
178+
write: () => "queued",
179+
flush: async () => undefined,
180+
describeQueue: () => ({
181+
pendingWrites: 2,
182+
queuedBytes: 256,
183+
activeOperation: "file-append",
184+
activeWriteBytes: 128,
185+
maxFileBytes: 1024,
186+
maxQueuedBytes: 1024,
187+
yieldBeforeWrite: true,
188+
}),
189+
},
190+
});
191+
192+
const runtimeRecorder = expectTrajectoryRuntimeRecorder(recorder);
193+
194+
expect(runtimeRecorder.describeFlushState()).toBe(
195+
"pendingWrites=2 queuedBytes=256 activeOperation=file-append yieldBeforeWrite=true activeWriteBytes=128 maxQueuedBytes=1024 maxFileBytes=1024",
196+
);
197+
});
198+
172199
it("writes a session-adjacent pointer when using an override directory", () => {
173200
const tmpDir = makeTempDir();
174201
const sessionFile = path.join(tmpDir, "session.jsonl");

src/trajectory/runtime.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type TrajectoryRuntimeRecorder = {
4545
filePath: string;
4646
recordEvent: (type: string, data?: Record<string, unknown>) => void;
4747
flush: () => Promise<void>;
48+
describeFlushState: () => string | undefined;
4849
};
4950

5051
const writers = new Map<string, QueuedFileWriter>();
@@ -208,6 +209,29 @@ function sanitizeTrajectoryPayload(data: Record<string, unknown>): Record<string
208209
>;
209210
}
210211

212+
function describeTrajectoryWriterFlushState(writer: QueuedFileWriter): string | undefined {
213+
const diagnostics = writer.describeQueue?.();
214+
if (!diagnostics) {
215+
return undefined;
216+
}
217+
const parts = [
218+
`pendingWrites=${diagnostics.pendingWrites}`,
219+
`queuedBytes=${diagnostics.queuedBytes}`,
220+
`activeOperation=${diagnostics.activeOperation}`,
221+
`yieldBeforeWrite=${diagnostics.yieldBeforeWrite}`,
222+
];
223+
if (diagnostics.activeWriteBytes !== undefined) {
224+
parts.push(`activeWriteBytes=${diagnostics.activeWriteBytes}`);
225+
}
226+
if (diagnostics.maxQueuedBytes !== undefined) {
227+
parts.push(`maxQueuedBytes=${diagnostics.maxQueuedBytes}`);
228+
}
229+
if (diagnostics.maxFileBytes !== undefined) {
230+
parts.push(`maxFileBytes=${diagnostics.maxFileBytes}`);
231+
}
232+
return parts.join(" ");
233+
}
234+
211235
export function toTrajectoryToolDefinitions(
212236
tools: ReadonlyArray<{ name?: string; description?: string; parameters?: unknown }>,
213237
): TrajectoryToolDefinition[] {
@@ -361,5 +385,6 @@ export function createTrajectoryRuntimeRecorder(
361385
writers.delete(filePath);
362386
}
363387
},
388+
describeFlushState: () => describeTrajectoryWriterFlushState(writer),
364389
};
365390
}

0 commit comments

Comments
 (0)