Skip to content

Commit 25aa72e

Browse files
committed
fix(telegram): stop noninterrupting reply fences
1 parent 1ba9f5d commit 25aa72e

3 files changed

Lines changed: 122 additions & 4 deletions

File tree

extensions/telegram/src/bot-message-dispatch.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2124,7 +2124,6 @@ describe("dispatchTelegramMessage draft streaming", () => {
21242124
secondStarted = resolve;
21252125
});
21262126
let firstAbortSignal: AbortSignal | undefined;
2127-
let sideAbortSignal: AbortSignal | undefined;
21282127
dispatchReplyWithBufferedBlockDispatcher
21292128
.mockImplementationOnce(async ({ replyOptions }) => {
21302129
firstAbortSignal = replyOptions?.abortSignal;
@@ -2283,6 +2282,70 @@ describe("dispatchTelegramMessage draft streaming", () => {
22832282
await Promise.all([firstPromise, sidePromise]);
22842283
});
22852284

2285+
it("lets authorized /stop abort active non-interrupting side dispatch", async () => {
2286+
const historyKey = "telegram:group:-100123";
2287+
const groupHistories = new Map([[historyKey, []]]);
2288+
let sideStarted: (() => void) | undefined;
2289+
const sideStartGate = new Promise<void>((resolve) => {
2290+
sideStarted = resolve;
2291+
});
2292+
let releaseSide: (() => void) | undefined;
2293+
const sideGate = new Promise<void>((resolve) => {
2294+
releaseSide = resolve;
2295+
});
2296+
let sideAbortSignal: AbortSignal | undefined;
2297+
dispatchReplyWithBufferedBlockDispatcher.mockImplementationOnce(async ({ replyOptions }) => {
2298+
sideAbortSignal = replyOptions?.abortSignal;
2299+
sideStarted?.();
2300+
await sideGate;
2301+
return {
2302+
queuedFinal: false,
2303+
counts: { block: 0, final: 0, tool: 0 },
2304+
};
2305+
});
2306+
deliverReplies.mockResolvedValue({ delivered: true });
2307+
2308+
const createGroupContext = (messageId: number, body: string) =>
2309+
createContext({
2310+
ctxPayload: {
2311+
SessionKey: "agent:main:telegram:group:-100123",
2312+
ChatType: "group",
2313+
MessageSid: String(messageId),
2314+
RawBody: body,
2315+
BodyForAgent: body,
2316+
CommandBody: body,
2317+
CommandAuthorized: true,
2318+
} as unknown as TelegramMessageContext["ctxPayload"],
2319+
msg: {
2320+
chat: { id: -100123, type: "supergroup" },
2321+
message_id: messageId,
2322+
text: body,
2323+
} as unknown as TelegramMessageContext["msg"],
2324+
chatId: -100123,
2325+
isGroup: true,
2326+
historyKey,
2327+
historyLimit: 10,
2328+
groupHistories,
2329+
threadSpec: { id: undefined, scope: "none" },
2330+
});
2331+
2332+
const sidePromise = dispatchWithContext({
2333+
context: createGroupContext(100, "/btw what changed?"),
2334+
streamMode: "off",
2335+
});
2336+
await sideStartGate;
2337+
expect(sideAbortSignal?.aborted).toBe(false);
2338+
2339+
await dispatchWithContext({
2340+
context: createGroupContext(101, "/stop"),
2341+
streamMode: "off",
2342+
});
2343+
2344+
expect(sideAbortSignal?.aborted).toBe(true);
2345+
releaseSide?.();
2346+
await sidePromise;
2347+
});
2348+
22862349
it("keeps queued room events abortable after their source dispatch returns", async () => {
22872350
const historyKey = "telegram:group:-100123";
22882351
const groupHistories = new Map([[historyKey, []]]);

extensions/telegram/src/telegram-reply-fence.test.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { describe, expect, it } from "vitest";
2-
import { shouldSupersedeTelegramReplyFence } from "./telegram-reply-fence.js";
2+
import {
3+
beginTelegramReplyFence,
4+
buildTelegramNonInterruptingReplyFenceKey,
5+
resetTelegramReplyFenceForTests,
6+
shouldSupersedeTelegramReplyFence,
7+
supersedeTelegramReplyFence,
8+
} from "./telegram-reply-fence.js";
39

410
describe("shouldSupersedeTelegramReplyFence", () => {
511
it("keeps non-interrupting side and status commands from superseding active runs", () => {
@@ -44,3 +50,30 @@ describe("shouldSupersedeTelegramReplyFence", () => {
4450
).toBe(true);
4551
});
4652
});
53+
54+
describe("telegram reply fence supersede", () => {
55+
it("cascades base supersedes to non-interrupting child fences", () => {
56+
resetTelegramReplyFenceForTests();
57+
const activeKey = "agent:main:telegram:group:-100123";
58+
const sideController = new AbortController();
59+
const mainController = new AbortController();
60+
beginTelegramReplyFence({
61+
key: activeKey,
62+
supersede: true,
63+
abortController: mainController,
64+
});
65+
beginTelegramReplyFence({
66+
key: buildTelegramNonInterruptingReplyFenceKey({
67+
activeKey,
68+
laneKey: "default\0telegram:-100123:btw:100",
69+
}),
70+
supersede: false,
71+
abortController: sideController,
72+
});
73+
74+
expect(supersedeTelegramReplyFence(activeKey)).toBe(true);
75+
expect(mainController.signal.aborted).toBe(true);
76+
expect(sideController.signal.aborted).toBe(true);
77+
resetTelegramReplyFenceForTests();
78+
});
79+
});

extensions/telegram/src/telegram-reply-fence.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@ export function buildTelegramNonInterruptingReplyFenceKey(params: {
3131
activeKey: string;
3232
laneKey: string;
3333
}): string {
34-
return `${params.activeKey}\0non-interrupting\0${params.laneKey}`;
34+
return `${buildTelegramNonInterruptingReplyFenceKeyPrefix(params.activeKey)}${params.laneKey}`;
35+
}
36+
37+
function buildTelegramNonInterruptingReplyFenceKeyPrefix(activeKey: string): string {
38+
return `${activeKey}\0non-interrupting\0`;
3539
}
3640

3741
function normalizeTelegramFenceKey(value: unknown): string | undefined {
@@ -98,6 +102,7 @@ export function beginTelegramReplyFence(params: {
98102
if (params.supersede) {
99103
state.generation += 1;
100104
abortTelegramReplyFenceControllers(state);
105+
supersedeTelegramNonInterruptingReplyFenceChildren(params.key);
101106
}
102107
if (params.abortController) {
103108
(state.abortControllers ??= new Set()).add(params.abortController);
@@ -114,7 +119,7 @@ export function beginTelegramReplyFence(params: {
114119
return state.generation;
115120
}
116121

117-
export function supersedeTelegramReplyFence(key: string): boolean {
122+
function supersedeTelegramReplyFenceState(key: string): boolean {
118123
const state = telegramReplyFenceByKey.get(key);
119124
if (!state) {
120125
return false;
@@ -125,6 +130,23 @@ export function supersedeTelegramReplyFence(key: string): boolean {
125130
return true;
126131
}
127132

133+
function supersedeTelegramNonInterruptingReplyFenceChildren(key: string): boolean {
134+
let superseded = false;
135+
const childPrefix = buildTelegramNonInterruptingReplyFenceKeyPrefix(key);
136+
for (const childKey of [...telegramReplyFenceByKey.keys()]) {
137+
if (childKey.startsWith(childPrefix)) {
138+
superseded = supersedeTelegramReplyFenceState(childKey) || superseded;
139+
}
140+
}
141+
return superseded;
142+
}
143+
144+
export function supersedeTelegramReplyFence(key: string): boolean {
145+
let superseded = supersedeTelegramReplyFenceState(key);
146+
superseded = supersedeTelegramNonInterruptingReplyFenceChildren(key) || superseded;
147+
return superseded;
148+
}
149+
128150
export function supersedeTelegramReplyFenceLane(laneKey: string): boolean {
129151
const keys = [...(telegramReplyFenceKeysByLane.get(laneKey) ?? [])];
130152
let superseded = false;

0 commit comments

Comments
 (0)