Skip to content

Commit 70f45ff

Browse files
fix(pi-runner): flush blocks after compaction retry
1 parent e2f82d4 commit 70f45ff

3 files changed

Lines changed: 45 additions & 4 deletions

File tree

src/agents/pi-embedded-runner/run/attempt.spawn-workspace.context-engine.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,32 @@ describe("runEmbeddedAttempt context engine sessionKey forwarding", () => {
210210
vi.restoreAllMocks();
211211
});
212212

213+
it("flushes block replies again after compaction retry wait resolves", async () => {
214+
const order: string[] = [];
215+
let flushCount = 0;
216+
const onBlockReplyFlush = vi.fn(async () => {
217+
flushCount += 1;
218+
order.push(`flush-${flushCount}`);
219+
});
220+
hoisted.waitForCompactionRetryWithAggregateTimeoutMock.mockImplementation(async () => {
221+
order.push("retry-wait");
222+
return { timedOut: false };
223+
});
224+
225+
await createContextEngineAttemptRunner({
226+
contextEngine: createContextEngineBootstrapAndAssemble(),
227+
sessionKey,
228+
tempPaths,
229+
attemptOverrides: {
230+
onBlockReplyFlush,
231+
},
232+
});
233+
234+
expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
235+
expect(hoisted.waitForCompactionRetryWithAggregateTimeoutMock).toHaveBeenCalledTimes(1);
236+
expect(order).toEqual(["flush-1", "retry-wait", "flush-2"]);
237+
});
238+
213239
it("enables Tool Search controls for embedded PI runs when configured", async () => {
214240
await createContextEngineAttemptRunner({
215241
contextEngine: {

src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type AcquireSessionWriteLockFn =
2929
typeof import("../../session-write-lock.js").acquireSessionWriteLock;
3030
type ShouldPreemptivelyCompactBeforePromptFn =
3131
typeof import("./preemptive-compaction.js").shouldPreemptivelyCompactBeforePrompt;
32+
type WaitForCompactionRetryWithAggregateTimeoutFn =
33+
typeof import("./compaction-retry-aggregate-timeout.js").waitForCompactionRetryWithAggregateTimeout;
3234

3335
type SubscriptionMock = ReturnType<SubscribeEmbeddedPiSessionFn>;
3436
type UnknownMock = Mock<(...args: unknown[]) => unknown>;
@@ -86,6 +88,7 @@ type AttemptSpawnWorkspaceHoisted = {
8688
(sessionKey: string | undefined, config: unknown) => number | undefined
8789
>;
8890
limitHistoryTurnsMock: Mock<<T>(messages: T, limit: number | undefined) => T>;
91+
waitForCompactionRetryWithAggregateTimeoutMock: Mock<WaitForCompactionRetryWithAggregateTimeoutFn>;
8992
preemptiveCompactionCalls: Parameters<ShouldPreemptivelyCompactBeforePromptFn>[0][];
9093
systemPromptOverrideTexts: string[];
9194
sessionManager: SessionManagerMocks;
@@ -181,6 +184,10 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
181184
const limitHistoryTurnsMock = vi.fn<<T>(messages: T, limit: number | undefined) => T>(
182185
(messages) => messages,
183186
);
187+
const waitForCompactionRetryWithAggregateTimeoutMock =
188+
vi.fn<WaitForCompactionRetryWithAggregateTimeoutFn>(async () => ({
189+
timedOut: false,
190+
}));
184191
const preemptiveCompactionCalls: Parameters<ShouldPreemptivelyCompactBeforePromptFn>[0][] = [];
185192
const systemPromptOverrideTexts: string[] = [];
186193
const sessionManager = {
@@ -221,6 +228,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
221228
detectAndLoadPromptImagesMock,
222229
getHistoryLimitFromSessionKeyMock,
223230
limitHistoryTurnsMock,
231+
waitForCompactionRetryWithAggregateTimeoutMock,
224232
preemptiveCompactionCalls,
225233
systemPromptOverrideTexts,
226234
sessionManager,
@@ -774,10 +782,9 @@ vi.mock("../utils.js", () => ({
774782
}));
775783

776784
vi.mock("./compaction-retry-aggregate-timeout.js", () => ({
777-
waitForCompactionRetryWithAggregateTimeout: async () => ({
778-
timedOut: false,
779-
aborted: false,
780-
}),
785+
waitForCompactionRetryWithAggregateTimeout: (
786+
...args: Parameters<WaitForCompactionRetryWithAggregateTimeoutFn>
787+
) => hoisted.waitForCompactionRetryWithAggregateTimeoutMock(...args),
781788
}));
782789

783790
vi.mock("./compaction-timeout.js", () => ({
@@ -956,6 +963,9 @@ export function resetEmbeddedAttemptHarness(
956963
hoisted.runContextEngineMaintenanceMock.mockReset().mockResolvedValue(undefined);
957964
hoisted.getHistoryLimitFromSessionKeyMock.mockReset().mockReturnValue(undefined);
958965
hoisted.limitHistoryTurnsMock.mockReset().mockImplementation((messages) => messages);
966+
hoisted.waitForCompactionRetryWithAggregateTimeoutMock
967+
.mockReset()
968+
.mockResolvedValue({ timedOut: false });
959969
hoisted.preemptiveCompactionCalls.length = 0;
960970
hoisted.systemPromptOverrideTexts.length = 0;
961971
hoisted.sessionManager.getLeafEntry.mockReset().mockReturnValue(null);

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4246,6 +4246,11 @@ export async function runEmbeddedAttempt(
42464246
`proceeding with pre-compaction state runId=${params.runId} sessionId=${params.sessionId}`,
42474247
);
42484248
}
4249+
} else if (onBlockReplyFlush) {
4250+
// Retry-generated blocks can still be draining when the compaction
4251+
// retry wait resolves; this second drain is idempotent when no new
4252+
// blocks were produced.
4253+
await onBlockReplyFlush();
42494254
}
42504255
} catch (err) {
42514256
if (isRunnerAbortError(err)) {

0 commit comments

Comments
 (0)