Skip to content

Commit f57c978

Browse files
authored
Merge ede2b5d into 7a602c7
2 parents 7a602c7 + ede2b5d commit f57c978

3 files changed

Lines changed: 65 additions & 2 deletions

File tree

src/auto-reply/reply/queue/drain.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
drainCollectQueueStep,
1010
drainNextQueueItem,
1111
hasCrossChannelItems,
12+
removeQueuedItemsByRef,
1213
previewQueueSummaryPrompt,
1314
waitForQueueDebounce,
1415
} from "../../../utils/queue-helpers.js";
@@ -497,7 +498,7 @@ export function scheduleFollowupDrain(
497498
} else {
498499
await drainGroup();
499500
}
500-
queue.items.splice(0, groupItems.length);
501+
removeQueuedItemsByRef(queue.items, groupItems);
501502
if (pendingSummary) {
502503
clearFollowupQueueSummaryState(queue);
503504
pendingSummary = undefined;

src/utils/queue-helpers.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
// Queue helper tests cover queue ordering and dedupe utility behavior.
22
import { describe, expect, it } from "vitest";
33
import {
4+
applyQueueDropPolicy,
45
applyQueueRuntimeSettings,
56
buildQueueSummaryPrompt,
67
clearQueueSummaryState,
78
drainCollectItemIfNeeded,
9+
drainNextQueueItem,
810
hasCrossChannelItems,
911
previewQueueSummaryPrompt,
1012
} from "./queue-helpers.js";
@@ -170,6 +172,57 @@ describe("drainCollectItemIfNeeded", () => {
170172
});
171173
});
172174

175+
describe("drainNextQueueItem", () => {
176+
it("keeps overflow survivors when the queue mutates during an awaited drain", async () => {
177+
type Item = { id: string };
178+
const queue = {
179+
items: [{ id: "m1" }],
180+
cap: 3,
181+
dropPolicy: "summarize" as const,
182+
droppedCount: 0,
183+
summaryLines: [],
184+
};
185+
const delivered: string[] = [];
186+
const dropped: string[] = [];
187+
let release!: () => void;
188+
const gate = new Promise<void>((resolve) => {
189+
release = resolve;
190+
});
191+
192+
const firstDrain = drainNextQueueItem(queue.items, async (item: Item) => {
193+
delivered.push(item.id);
194+
await gate;
195+
});
196+
await Promise.resolve();
197+
198+
for (let index = 2; index <= 8; index += 1) {
199+
const item = { id: `m${index}` };
200+
const shouldEnqueue = applyQueueDropPolicy({
201+
queue,
202+
summarize: (queued) => queued.id,
203+
onDrop: (items) => {
204+
dropped.push(...items.map((queued) => queued.id));
205+
},
206+
});
207+
if (shouldEnqueue) {
208+
queue.items.push(item);
209+
}
210+
}
211+
212+
release();
213+
await firstDrain;
214+
while (
215+
await drainNextQueueItem(queue.items, async (item) => {
216+
delivered.push(item.id);
217+
})
218+
) {}
219+
220+
expect(delivered).toEqual(["m1", "m6", "m7", "m8"]);
221+
expect(dropped).toEqual(["m1", "m2", "m3", "m4", "m5"]);
222+
expect(queue.items).toEqual([]);
223+
});
224+
});
225+
173226
describe("hasCrossChannelItems", () => {
174227
it("lets unresolved items join an otherwise single keyed route", () => {
175228
const items = [

src/utils/queue-helpers.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,15 @@ export function beginQueueDrain<T extends { draining: boolean }>(
166166
return queue;
167167
}
168168

169+
export function removeQueuedItemsByRef<T>(items: T[], processed: readonly T[]): void {
170+
for (const item of processed) {
171+
const idx = items.indexOf(item);
172+
if (idx !== -1) {
173+
items.splice(idx, 1);
174+
}
175+
}
176+
}
177+
169178
/** Run and remove the next queued item, returning false when empty. */
170179
export async function drainNextQueueItem<T>(
171180
items: T[],
@@ -176,7 +185,7 @@ export async function drainNextQueueItem<T>(
176185
return false;
177186
}
178187
await run(next);
179-
items.shift();
188+
removeQueuedItemsByRef(items, [next]);
180189
return true;
181190
}
182191

0 commit comments

Comments
 (0)