Skip to content

Commit 2bc5e9e

Browse files
galinilievGalin Iliev
authored andcommitted
fix(agents): prioritize manual session turns
1 parent e98ebb5 commit 2bc5e9e

5 files changed

Lines changed: 169 additions & 3 deletions

File tree

src/agents/pi-embedded-runner/run.overflow-compaction.test.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,40 @@ describe("runEmbeddedPiAgent overflow compaction trigger routing", () => {
307307
expect(attemptParams?.internalEvents).toBe(internalEvents);
308308
});
309309

310+
it("marks user-triggered session queue work as foreground", async () => {
311+
mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
312+
const observedPriorities: unknown[] = [];
313+
314+
await runEmbeddedPiAgent({
315+
...overflowBaseRunParams,
316+
trigger: "user",
317+
runId: "run-user-session-priority",
318+
enqueue: async (task, opts) => {
319+
observedPriorities.push(opts?.priority);
320+
return await task();
321+
},
322+
});
323+
324+
expect(observedPriorities[0]).toBe("foreground");
325+
});
326+
327+
it("marks cron-triggered session queue work as background", async () => {
328+
mockedRunEmbeddedAttempt.mockResolvedValueOnce(makeAttemptResult({ promptError: null }));
329+
const observedPriorities: unknown[] = [];
330+
331+
await runEmbeddedPiAgent({
332+
...overflowBaseRunParams,
333+
trigger: "cron",
334+
runId: "run-cron-session-priority",
335+
enqueue: async (task, opts) => {
336+
observedPriorities.push(opts?.priority);
337+
return await task();
338+
},
339+
});
340+
341+
expect(observedPriorities[0]).toBe("background");
342+
});
343+
310344
it("forwards explicit OpenAI Codex auth profiles to codex plugin harnesses", async () => {
311345
const { clearAgentHarnesses, registerAgentHarness } = await import("../harness/registry.js");
312346
const pluginRunAttempt = vi.fn<AgentHarness["runAttempt"]>(async () =>

src/agents/pi-embedded-runner/run.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,23 @@ function withEmbeddedRunLaneTimeout(
211211
return { ...opts, taskTimeoutMs: laneTaskTimeoutMs };
212212
}
213213

214+
function resolveEmbeddedRunSessionQueuePriority(
215+
trigger: RunEmbeddedPiAgentParams["trigger"],
216+
): CommandQueueEnqueueOptions["priority"] {
217+
switch (trigger) {
218+
case "user":
219+
case "manual":
220+
return "foreground";
221+
case "cron":
222+
case "heartbeat":
223+
case "memory":
224+
case "overflow":
225+
return "background";
226+
default:
227+
return "normal";
228+
}
229+
}
230+
214231
function normalizeEmbeddedRunAttemptResult(
215232
attempt: EmbeddedRunAttemptForRunner,
216233
): EmbeddedRunAttemptForRunner {
@@ -378,6 +395,7 @@ export async function runEmbeddedPiAgent(
378395
}
379396
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
380397
const globalLane = resolveGlobalLane(params.lane);
398+
const sessionQueuePriority = resolveEmbeddedRunSessionQueuePriority(params.trigger);
381399
const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs);
382400
let laneTaskProgressAtMs = Date.now();
383401
const noteLaneTaskProgress = () => {
@@ -395,8 +413,12 @@ export async function runEmbeddedPiAgent(
395413
params.enqueue
396414
? params.enqueue(task, withLaneTimeout(opts))
397415
: enqueueCommandInLane(globalLane, task, withLaneTimeout(opts));
398-
const enqueueSession = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
399-
params.enqueue ? params.enqueue(task, opts) : enqueueCommandInLane(sessionLane, task, opts);
416+
const enqueueSession = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) => {
417+
const sessionOpts: CommandQueueEnqueueOptions = { ...opts, priority: sessionQueuePriority };
418+
return params.enqueue
419+
? params.enqueue(task, sessionOpts)
420+
: enqueueCommandInLane(sessionLane, task, sessionOpts);
421+
};
400422
const channelHint = params.messageChannel ?? params.messageProvider;
401423
const resolvedToolResultFormat =
402424
params.toolResultFormat ??

src/process/command-queue.test.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,64 @@ describe("command queue", () => {
161161
expect(getQueueSize()).toBe(0);
162162
});
163163

164+
it("runs foreground work before already queued background work", async () => {
165+
const { task: blocker, release } = enqueueBlockedMainTask(async () => "blocker");
166+
const calls: string[] = [];
167+
168+
const background = enqueueCommandInLane(
169+
CommandLane.Main,
170+
async () => {
171+
calls.push("background");
172+
return "background";
173+
},
174+
{ priority: "background" },
175+
);
176+
const normal = enqueueCommandInLane(CommandLane.Main, async () => {
177+
calls.push("normal");
178+
return "normal";
179+
});
180+
const foreground = enqueueCommandInLane(
181+
CommandLane.Main,
182+
async () => {
183+
calls.push("foreground");
184+
return "foreground";
185+
},
186+
{ priority: "foreground" },
187+
);
188+
189+
release();
190+
await expect(blocker).resolves.toBe("blocker");
191+
await expect(foreground).resolves.toBe("foreground");
192+
await expect(normal).resolves.toBe("normal");
193+
await expect(background).resolves.toBe("background");
194+
expect(calls).toEqual(["foreground", "normal", "background"]);
195+
});
196+
197+
it("preserves FIFO order within each priority", async () => {
198+
const { task: blocker, release } = enqueueBlockedMainTask(async () => "blocker");
199+
const calls: string[] = [];
200+
201+
const first = enqueueCommandInLane(
202+
CommandLane.Main,
203+
async () => {
204+
calls.push("first");
205+
},
206+
{ priority: "foreground" },
207+
);
208+
const second = enqueueCommandInLane(
209+
CommandLane.Main,
210+
async () => {
211+
calls.push("second");
212+
},
213+
{ priority: "foreground" },
214+
);
215+
216+
release();
217+
await blocker;
218+
await Promise.all([first, second]);
219+
expect(calls).toEqual(["first", "second"]);
220+
});
221+
164222
it("logs enqueue depth after push", async () => {
165223
const task = enqueueCommand(async () => {});
166224

src/process/command-queue.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ type QueueEntry = {
6161
resolve: (value: unknown) => void;
6262
reject: (reason?: unknown) => void;
6363
enqueuedAt: number;
64+
sequence: number;
65+
priority: number;
6466
warnAfterMs: number;
6567
taskTimeoutMs?: number;
6668
taskTimeoutProgressAtMs?: () => number | undefined;
@@ -107,6 +109,7 @@ function getQueueState() {
107109
lanes: new Map<string, LaneState>(),
108110
activeTaskWaiters: new Set<ActiveTaskWaiter>(),
109111
nextTaskId: 1,
112+
nextQueueSequence: 1,
110113
}));
111114
// Schema migration: the singleton may have been created by an older code
112115
// version (e.g. v2026.4.2) that did not include `activeTaskWaiters`. After
@@ -117,6 +120,27 @@ function getQueueState() {
117120
if (!state.activeTaskWaiters) {
118121
state.activeTaskWaiters = new Set<ActiveTaskWaiter>();
119122
}
123+
if (!state.nextQueueSequence) {
124+
state.nextQueueSequence = 1;
125+
}
126+
let maxQueueSequence = state.nextQueueSequence - 1;
127+
for (const lane of state.lanes.values()) {
128+
for (const entry of lane.queue as Array<
129+
QueueEntry & { priority?: number; sequence?: number }
130+
>) {
131+
if (typeof entry.priority !== "number") {
132+
entry.priority = 0;
133+
}
134+
if (typeof entry.sequence !== "number") {
135+
entry.sequence = state.nextQueueSequence++;
136+
} else {
137+
maxQueueSequence = Math.max(maxQueueSequence, entry.sequence);
138+
}
139+
}
140+
}
141+
if (state.nextQueueSequence <= maxQueueSequence) {
142+
state.nextQueueSequence = maxQueueSequence + 1;
143+
}
120144
return state;
121145
}
122146

@@ -204,6 +228,30 @@ function normalizeTaskTimeoutMs(value: number | undefined): number | undefined {
204228
return Math.max(1, Math.floor(value));
205229
}
206230

231+
function resolveQueuePriority(priority: CommandQueueEnqueueOptions["priority"]): number {
232+
switch (priority) {
233+
case "foreground":
234+
return 1;
235+
case "background":
236+
return -1;
237+
default:
238+
return 0;
239+
}
240+
}
241+
242+
function enqueueLaneEntry(state: LaneState, entry: QueueEntry): void {
243+
const insertAt = state.queue.findIndex(
244+
(queued) =>
245+
queued.priority < entry.priority ||
246+
(queued.priority === entry.priority && queued.sequence > entry.sequence),
247+
);
248+
if (insertAt < 0) {
249+
state.queue.push(entry);
250+
return;
251+
}
252+
state.queue.splice(insertAt, 0, entry);
253+
}
254+
207255
async function runQueueEntryTask(lane: string, entry: QueueEntry): Promise<unknown> {
208256
const taskPromise = Promise.resolve().then(entry.task);
209257
const taskTimeoutMs = normalizeTaskTimeoutMs(entry.taskTimeoutMs);
@@ -362,11 +410,13 @@ export function enqueueCommandInLane<T>(
362410
const warnAfterMs = opts?.warnAfterMs ?? 2_000;
363411
const state = getLaneState(cleaned);
364412
return new Promise<T>((resolve, reject) => {
365-
state.queue.push({
413+
enqueueLaneEntry(state, {
366414
task: () => task(),
367415
resolve: (value) => resolve(value as T),
368416
reject,
369417
enqueuedAt: Date.now(),
418+
sequence: queueState.nextQueueSequence++,
419+
priority: resolveQueuePriority(opts?.priority),
370420
warnAfterMs,
371421
taskTimeoutMs: normalizeTaskTimeoutMs(opts?.taskTimeoutMs),
372422
taskTimeoutProgressAtMs: opts?.taskTimeoutProgressAtMs,
@@ -472,6 +522,7 @@ export function resetCommandQueueStateForTest(): void {
472522
resolveActiveTaskWaiter(waiter, { drained: true });
473523
}
474524
queueState.nextTaskId = 1;
525+
queueState.nextQueueSequence = 1;
475526
}
476527

477528
/**

src/process/command-queue.types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export type CommandQueueEnqueueOptions = {
33
onWait?: (waitMs: number, queuedAhead: number) => void;
44
taskTimeoutMs?: number;
55
taskTimeoutProgressAtMs?: () => number | undefined;
6+
priority?: "foreground" | "normal" | "background";
67
};
78

89
export type CommandQueueEnqueueFn = <T>(

0 commit comments

Comments
 (0)