Skip to content

Commit 25e8a81

Browse files
committed
refactor(auto-reply): simplify foreground freshness fence
1 parent fa67b7c commit 25e8a81

2 files changed

Lines changed: 26 additions & 36 deletions

File tree

src/auto-reply/dispatch.freshness.test.ts

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,10 @@ type Delivery = {
2828

2929
function createDeferred<T>() {
3030
let resolve!: (value: T | PromiseLike<T>) => void;
31-
let reject!: (reason?: unknown) => void;
32-
const promise = new Promise<T>((res, rej) => {
31+
const promise = new Promise<T>((res) => {
3332
resolve = res;
34-
reject = rej;
3533
});
36-
return { promise, resolve, reject };
34+
return { promise, resolve };
3735
}
3836

3937
function queuedFinalResult() {
@@ -88,7 +86,6 @@ describe("foreground reply freshness", () => {
8886
it("suppresses an older foreground final after a newer inbound turn starts for the same session target", async () => {
8987
const deliveries: Delivery[] = [];
9088
const olderStarted = createDeferred<void>();
91-
const newerStarted = createDeferred<void>();
9289
const releaseOlderFinal = createDeferred<void>();
9390

9491
hoisted.dispatchReplyFromConfigMock.mockImplementation(
@@ -100,7 +97,6 @@ describe("foreground reply freshness", () => {
10097
return queuedFinalResult();
10198
}
10299
if (params.ctx.MessageSid === "new-message") {
103-
newerStarted.resolve();
104100
params.dispatcher.sendFinalReply({ text: "new final" });
105101
return queuedFinalResult();
106102
}
@@ -114,12 +110,10 @@ describe("foreground reply freshness", () => {
114110
);
115111
await olderStarted.promise;
116112

117-
const newerDispatch = dispatchWithDeliveries(
113+
const newerResult = await dispatchWithDeliveries(
118114
buildForegroundCtx({ MessageSid: "new-message" }),
119115
deliveries,
120116
);
121-
await newerStarted.promise;
122-
const newerResult = await newerDispatch;
123117

124118
releaseOlderFinal.resolve();
125119
const olderResult = await olderDispatch;
@@ -139,10 +133,9 @@ describe("foreground reply freshness", () => {
139133
const deliveries: Delivery[] = [];
140134
const beforeDeliverStarted = createDeferred<void>();
141135
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
142-
const newerStarted = createDeferred<void>();
143-
const beforeDeliver = vi.fn(async () => {
136+
const beforeDeliver = vi.fn(() => {
144137
beforeDeliverStarted.resolve();
145-
return await releaseBeforeDeliver.promise;
138+
return releaseBeforeDeliver.promise;
146139
});
147140

148141
hoisted.dispatchReplyFromConfigMock.mockImplementation(
@@ -152,7 +145,6 @@ describe("foreground reply freshness", () => {
152145
return queuedFinalResult();
153146
}
154147
if (params.ctx.MessageSid === "new-message") {
155-
newerStarted.resolve();
156148
return {
157149
queuedFinal: false,
158150
counts: { tool: 0, block: 0, final: 0 },
@@ -173,7 +165,6 @@ describe("foreground reply freshness", () => {
173165
buildForegroundCtx({ MessageSid: "new-message" }),
174166
deliveries,
175167
);
176-
await newerStarted.promise;
177168

178169
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
179170
const olderResult = await olderDispatch;
@@ -193,7 +184,6 @@ describe("foreground reply freshness", () => {
193184
it("keeps concurrent foreground finals isolated for different targets sharing a session", async () => {
194185
const deliveries: Delivery[] = [];
195186
const firstStarted = createDeferred<void>();
196-
const secondStarted = createDeferred<void>();
197187
const releaseFirstFinal = createDeferred<void>();
198188

199189
hoisted.dispatchReplyFromConfigMock.mockImplementation(
@@ -205,7 +195,6 @@ describe("foreground reply freshness", () => {
205195
return queuedFinalResult();
206196
}
207197
if (params.ctx.MessageSid === "second-chat") {
208-
secondStarted.resolve();
209198
params.dispatcher.sendFinalReply({ text: "second chat final" });
210199
return queuedFinalResult();
211200
}
@@ -234,7 +223,6 @@ describe("foreground reply freshness", () => {
234223
}),
235224
deliveries,
236225
);
237-
await secondStarted.promise;
238226
await expect(secondDispatch).resolves.toEqual({
239227
queuedFinal: true,
240228
counts: { tool: 0, block: 0, final: 1 },

src/auto-reply/dispatch.ts

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,13 @@ function beginForegroundReplyFence(
9292
};
9393
}
9494

95-
function isForegroundReplyFenceSuperseded(snapshot: ForegroundReplyFenceSnapshot): boolean {
96-
return (foregroundReplyFenceByKey.get(snapshot.key)?.generation ?? 0) !== snapshot.generation;
95+
function isForegroundReplyFenceSuperseded(
96+
snapshot: ForegroundReplyFenceSnapshot | undefined,
97+
): boolean {
98+
return Boolean(
99+
snapshot &&
100+
(foregroundReplyFenceByKey.get(snapshot.key)?.generation ?? 0) !== snapshot.generation,
101+
);
97102
}
98103

99104
function endForegroundReplyFence(snapshot: ForegroundReplyFenceSnapshot): void {
@@ -283,26 +288,24 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
283288
}): Promise<DispatchInboundResult> {
284289
const finalized = finalizeInboundContext(params.ctx);
285290
const foregroundReplyFence = beginForegroundReplyFence(finalized);
286-
let foregroundReplyFenceActive = true;
287291
const silentReplyContext = resolveDispatcherSilentReplyContext(finalized, params.cfg);
288292
const configuredBeforeDeliver =
289293
params.dispatcherOptions.beforeDeliver ?? buildMessageSendingBeforeDeliver(finalized);
290-
const beforeDeliver: ReplyDispatchBeforeDeliver | undefined = foregroundReplyFence
291-
? async (payload, info) => {
292-
const isSuperseded = () =>
293-
foregroundReplyFenceActive && isForegroundReplyFenceSuperseded(foregroundReplyFence);
294-
if (isSuperseded()) {
295-
return null;
296-
}
297-
const deliverPayload = configuredBeforeDeliver
298-
? await configuredBeforeDeliver(payload, info)
299-
: payload;
300-
if (!deliverPayload || isSuperseded()) {
301-
return null;
294+
const beforeDeliver: ReplyDispatchBeforeDeliver | undefined =
295+
foregroundReplyFence || configuredBeforeDeliver
296+
? async (payload, info) => {
297+
if (isForegroundReplyFenceSuperseded(foregroundReplyFence)) {
298+
return null;
299+
}
300+
const deliverPayload = configuredBeforeDeliver
301+
? await configuredBeforeDeliver(payload, info)
302+
: payload;
303+
if (!deliverPayload || isForegroundReplyFenceSuperseded(foregroundReplyFence)) {
304+
return null;
305+
}
306+
return deliverPayload;
302307
}
303-
return deliverPayload;
304-
}
305-
: configuredBeforeDeliver;
308+
: undefined;
306309
const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
307310
createReplyDispatcherWithTyping({
308311
...params.dispatcherOptions,
@@ -322,7 +325,6 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
322325
});
323326
} finally {
324327
if (foregroundReplyFence) {
325-
foregroundReplyFenceActive = false;
326328
endForegroundReplyFence(foregroundReplyFence);
327329
}
328330
markRunComplete();

0 commit comments

Comments
 (0)