Skip to content

Commit 7ff20f6

Browse files
author
Andrew Meyer
committed
fix(pi): keep message-tool delivery in session lock
1 parent 3d96111 commit 7ff20f6

7 files changed

Lines changed: 705 additions & 42 deletions

File tree

src/agents/pi-embedded-runner/run/attempt.session-lock.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@ import fs from "node:fs/promises";
22
import os from "node:os";
33
import path from "node:path";
44
import { afterEach, describe, expect, it, vi } from "vitest";
5+
import {
6+
runWithOwnedSessionTranscriptWriteLock,
7+
withOwnedSessionTranscriptWrites,
8+
} from "../../../config/sessions/transcript-write-context.js";
59
import { SessionWriteLockTimeoutError } from "../../session-write-lock-error.js";
610
import {
711
createEmbeddedAttemptSessionLockController,
@@ -179,6 +183,37 @@ describe("embedded attempt session lock lifecycle", () => {
179183
expect(release).toHaveBeenCalledTimes(2);
180184
});
181185

186+
it("refreshes the prompt fence after an owned transcript mirror append", async () => {
187+
const sessionFile = await createTempSessionFile();
188+
const release = vi.fn(async () => {});
189+
const acquireSessionWriteLock = vi.fn(async () => ({ release }));
190+
const controller = await createEmbeddedAttemptSessionLockController({
191+
acquireSessionWriteLock,
192+
lockOptions: { ...lockOptions, sessionFile },
193+
});
194+
195+
await controller.releaseForPrompt();
196+
await withOwnedSessionTranscriptWrites(
197+
{
198+
sessionFile,
199+
sessionKey: "agent:main:discord:channel:123",
200+
withSessionWriteLock: (operation) => controller.withSessionWriteLock(operation),
201+
},
202+
async () =>
203+
await runWithOwnedSessionTranscriptWriteLock(
204+
{ sessionFile, sessionKey: "agent:main:discord:channel:123" },
205+
async () => {
206+
await fs.appendFile(sessionFile, '{"type":"message","id":"delivery-mirror"}\n', "utf8");
207+
},
208+
),
209+
);
210+
await expect(controller.withSessionWriteLock(() => "finalize")).resolves.toBe("finalize");
211+
212+
expect(controller.hasSessionTakeover()).toBe(false);
213+
expect(acquireSessionWriteLock).toHaveBeenCalledTimes(3);
214+
expect(release).toHaveBeenCalledTimes(3);
215+
});
216+
182217
it("returns a no-op cleanup lock after prompt lock reacquisition times out", async () => {
183218
const releases: string[] = [];
184219
const acquireSessionWriteLock = vi

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
runQuotaSuspensionMaintenance,
1616
updateSessionStoreEntry,
1717
} from "../../../config/sessions/store.js";
18+
import { withOwnedSessionTranscriptWrites } from "../../../config/sessions/transcript-write-context.js";
1819
import { resolveContextEngineOwnerPluginId } from "../../../context-engine/registry.js";
1920
import type { AssembleResult } from "../../../context-engine/types.js";
2021
import { emitTrustedDiagnosticEvent } from "../../../infra/diagnostic-events.js";
@@ -380,6 +381,7 @@ import {
380381
} from "./incomplete-turn.js";
381382
import { resolveLlmIdleTimeoutMs, streamWithIdleTimeout } from "./llm-idle-timeout.js";
382383
import { resolveMessageMergeStrategy } from "./message-merge-strategy.js";
384+
import { installMessageToolOnlyTerminalHook } from "./message-tool-terminal.js";
383385
import {
384386
MID_TURN_PRECHECK_ERROR_MESSAGE,
385387
isMidTurnPrecheckSignal,
@@ -2379,6 +2381,10 @@ export async function runEmbeddedAttempt(
23792381
session: activeSession,
23802382
withSessionWriteLock: (operation) => sessionLockController.withSessionWriteLock(operation),
23812383
});
2384+
installMessageToolOnlyTerminalHook({
2385+
agent: activeSession.agent,
2386+
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
2387+
});
23822388
prepStages.mark("agent-session");
23832389
if (isRawModelRun) {
23842390
// Raw model probes should measure exactly the requested prompt against
@@ -3163,7 +3169,15 @@ export async function runEmbeddedAttempt(
31633169
prompt: string,
31643170
options?: Parameters<typeof activeSession.prompt>[1],
31653171
): Promise<void> =>
3166-
abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options)));
3172+
withOwnedSessionTranscriptWrites(
3173+
{
3174+
sessionFile: params.sessionFile,
3175+
sessionKey: params.sessionKey,
3176+
withSessionWriteLock: (operation) =>
3177+
sessionLockController.withSessionWriteLock(operation),
3178+
},
3179+
async () => abortable(trackPromptSettlePromise(activeSession.prompt(prompt, options))),
3180+
);
31673181

31683182
const subscription = subscribeEmbeddedPiSession(
31693183
buildEmbeddedSubscriptionParams({

0 commit comments

Comments
 (0)