Skip to content

Commit dd99f43

Browse files
committed
fix(telegram): recover stalled isolated spool handlers
1 parent 71ed652 commit dd99f43

9 files changed

Lines changed: 1082 additions & 208 deletions

extensions/telegram/src/bot-message-dispatch.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2108,6 +2108,88 @@ describe("dispatchTelegramMessage draft streaming", () => {
21082108
expect(deliveredTexts).not.toContain("stale ambient answer");
21092109
});
21102110

2111+
it("lets newer user requests abort active same-session dispatch", async () => {
2112+
const historyKey = "telegram:group:-100123";
2113+
const groupHistories = new Map([[historyKey, []]]);
2114+
let firstStarted: (() => void) | undefined;
2115+
const firstStartGate = new Promise<void>((resolve) => {
2116+
firstStarted = resolve;
2117+
});
2118+
let releaseFirst: (() => void) | undefined;
2119+
const firstGate = new Promise<void>((resolve) => {
2120+
releaseFirst = resolve;
2121+
});
2122+
let secondStarted: (() => void) | undefined;
2123+
const secondStartGate = new Promise<void>((resolve) => {
2124+
secondStarted = resolve;
2125+
});
2126+
let firstAbortSignal: AbortSignal | undefined;
2127+
dispatchReplyWithBufferedBlockDispatcher
2128+
.mockImplementationOnce(async ({ replyOptions }) => {
2129+
firstAbortSignal = replyOptions?.abortSignal;
2130+
firstStarted?.();
2131+
await firstGate;
2132+
return {
2133+
queuedFinal: false,
2134+
counts: { block: 0, final: 0, tool: 0 },
2135+
};
2136+
})
2137+
.mockImplementationOnce(async ({ dispatcherOptions }) => {
2138+
secondStarted?.();
2139+
await dispatcherOptions.deliver({ text: "fresh request answer" }, { kind: "final" });
2140+
return {
2141+
queuedFinal: true,
2142+
counts: { block: 0, final: 1, tool: 0 },
2143+
};
2144+
});
2145+
deliverReplies.mockResolvedValue({ delivered: true });
2146+
2147+
const createGroupContext = (messageId: number, body: string) =>
2148+
createContext({
2149+
ctxPayload: {
2150+
SessionKey: "agent:main:telegram:group:-100123",
2151+
ChatType: "group",
2152+
MessageSid: String(messageId),
2153+
RawBody: body,
2154+
BodyForAgent: body,
2155+
CommandBody: body,
2156+
CommandAuthorized: true,
2157+
} as unknown as TelegramMessageContext["ctxPayload"],
2158+
msg: {
2159+
chat: { id: -100123, type: "supergroup" },
2160+
message_id: messageId,
2161+
} as unknown as TelegramMessageContext["msg"],
2162+
chatId: -100123,
2163+
isGroup: true,
2164+
historyKey,
2165+
historyLimit: 10,
2166+
groupHistories,
2167+
threadSpec: { id: undefined, scope: "none" },
2168+
});
2169+
2170+
const firstPromise = dispatchWithContext({
2171+
context: createGroupContext(99, "@bot first request"),
2172+
streamMode: "off",
2173+
});
2174+
await firstStartGate;
2175+
const secondPromise = dispatchWithContext({
2176+
context: createGroupContext(100, "@bot second request"),
2177+
streamMode: "off",
2178+
});
2179+
await secondStartGate;
2180+
2181+
expect(firstAbortSignal?.aborted).toBe(true);
2182+
releaseFirst?.();
2183+
await Promise.all([firstPromise, secondPromise]);
2184+
2185+
const deliveredTexts = deliverReplies.mock.calls.flatMap((call) =>
2186+
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
2187+
(reply) => reply.text,
2188+
),
2189+
);
2190+
expect(deliveredTexts).toContain("fresh request answer");
2191+
});
2192+
21112193
it("keeps queued room events abortable after their source dispatch returns", async () => {
21122194
const historyKey = "telegram:group:-100123";
21132195
const groupHistories = new Map([[historyKey, []]]);

extensions/telegram/src/bot-message-dispatch.ts

Lines changed: 42 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import {
2828
resolveChannelStreamingPreviewToolProgress,
2929
resolveTranscriptBackedChannelFinalText,
3030
} from "openclaw/plugin-sdk/channel-streaming";
31-
import { isAbortRequestText } from "openclaw/plugin-sdk/command-primitives-runtime";
3231
import type {
3332
OpenClawConfig,
3433
ReplyToMode,
@@ -107,9 +106,23 @@ import {
107106
splitTelegramReasoningText,
108107
} from "./reasoning-lane-coordinator.js";
109108
import { editMessageTelegram } from "./send.js";
109+
import { getTelegramSequentialKey } from "./sequential-key.js";
110110
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
111+
import {
112+
beginTelegramReplyFence,
113+
buildTelegramReplyFenceLaneKey,
114+
endTelegramReplyFence,
115+
getTelegramReplyFenceSizeForTests,
116+
isTelegramReplyFenceSuperseded,
117+
releaseTelegramReplyFenceAbortController,
118+
resetTelegramReplyFenceForTests,
119+
resolveTelegramReplyFenceKey,
120+
shouldSupersedeTelegramReplyFence,
121+
supersedeTelegramReplyFence,
122+
} from "./telegram-reply-fence.js";
111123

112124
export { pruneStickerMediaFromContext } from "./bot-message-dispatch.media.js";
125+
export { getTelegramReplyFenceSizeForTests, resetTelegramReplyFenceForTests };
113126

114127
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
115128
const silentReplyDispatchLogger = createSubsystemLogger("telegram/silent-reply-dispatch");
@@ -180,140 +193,6 @@ type TelegramReasoningLevel = "off" | "on" | "stream";
180193

181194
type TelegramTranscriptMirrorPayload = { text?: string; mediaUrls?: string[] };
182195

183-
type TelegramReplyFenceState = {
184-
generation: number;
185-
activeDispatches: number;
186-
abortControllers?: Set<AbortController>;
187-
};
188-
189-
type TelegramReplyFenceKey = {
190-
activeKey: string;
191-
roomEventKey: string;
192-
};
193-
194-
// Newer accepted turns and authorized aborts can arrive ahead of older same-session reply work.
195-
const telegramReplyFenceByKey = new Map<string, TelegramReplyFenceState>();
196-
197-
function normalizeTelegramFenceKey(value: unknown): string | undefined {
198-
if (typeof value !== "string") {
199-
return undefined;
200-
}
201-
const trimmed = value.trim();
202-
return trimmed.length > 0 ? trimmed : undefined;
203-
}
204-
205-
function resolveTelegramReplyFenceKey(params: {
206-
ctxPayload: { SessionKey?: string; CommandTargetSessionKey?: string; InboundEventKind?: string };
207-
chatId: number | string;
208-
threadSpec: { id?: number | string | null; scope?: string };
209-
}): TelegramReplyFenceKey {
210-
const baseKey =
211-
normalizeTelegramFenceKey(params.ctxPayload.CommandTargetSessionKey) ??
212-
normalizeTelegramFenceKey(params.ctxPayload.SessionKey) ??
213-
`telegram:${String(params.chatId)}:${params.threadSpec.scope ?? "default"}:${params.threadSpec.id ?? "root"}`;
214-
const roomEventKey = `${baseKey}:room_event`;
215-
return {
216-
activeKey: params.ctxPayload.InboundEventKind === "room_event" ? roomEventKey : baseKey,
217-
roomEventKey,
218-
};
219-
}
220-
221-
function abortTelegramReplyFenceControllers(state: TelegramReplyFenceState): void {
222-
for (const controller of state.abortControllers ?? []) {
223-
controller.abort();
224-
}
225-
state.abortControllers?.clear();
226-
}
227-
228-
function beginTelegramReplyFence(params: {
229-
key: string;
230-
supersede: boolean;
231-
abortController?: AbortController;
232-
}): number {
233-
const existing = telegramReplyFenceByKey.get(params.key);
234-
const state: TelegramReplyFenceState = existing ?? {
235-
generation: 0,
236-
activeDispatches: 0,
237-
};
238-
if (params.supersede) {
239-
state.generation += 1;
240-
abortTelegramReplyFenceControllers(state);
241-
}
242-
if (params.abortController) {
243-
(state.abortControllers ??= new Set()).add(params.abortController);
244-
}
245-
state.activeDispatches += 1;
246-
telegramReplyFenceByKey.set(params.key, state);
247-
return state.generation;
248-
}
249-
250-
function supersedeTelegramReplyFence(key: string): void {
251-
const state = telegramReplyFenceByKey.get(key);
252-
if (!state) {
253-
return;
254-
}
255-
state.generation += 1;
256-
abortTelegramReplyFenceControllers(state);
257-
if (state.activeDispatches <= 0 && (state.abortControllers?.size ?? 0) === 0) {
258-
telegramReplyFenceByKey.delete(key);
259-
} else {
260-
telegramReplyFenceByKey.set(key, state);
261-
}
262-
}
263-
264-
function isTelegramReplyFenceSuperseded(params: { key: string; generation: number }): boolean {
265-
return (telegramReplyFenceByKey.get(params.key)?.generation ?? 0) !== params.generation;
266-
}
267-
268-
function endTelegramReplyFence(key: string, abortController?: AbortController): void {
269-
const state = telegramReplyFenceByKey.get(key);
270-
if (!state) {
271-
return;
272-
}
273-
if (abortController) {
274-
state.abortControllers?.delete(abortController);
275-
}
276-
state.activeDispatches = Math.max(0, state.activeDispatches - 1);
277-
if (state.activeDispatches <= 0 && (state.abortControllers?.size ?? 0) === 0) {
278-
telegramReplyFenceByKey.delete(key);
279-
}
280-
}
281-
282-
function releaseTelegramReplyFenceAbortController(
283-
key: string,
284-
abortController?: AbortController,
285-
): void {
286-
if (!abortController) {
287-
return;
288-
}
289-
const state = telegramReplyFenceByKey.get(key);
290-
if (!state) {
291-
return;
292-
}
293-
state.abortControllers?.delete(abortController);
294-
if (state.activeDispatches <= 0 && (state.abortControllers?.size ?? 0) === 0) {
295-
telegramReplyFenceByKey.delete(key);
296-
}
297-
}
298-
299-
function shouldSupersedeTelegramReplyFence(ctxPayload: {
300-
Body?: string;
301-
RawBody?: string;
302-
CommandBody?: string;
303-
CommandAuthorized: boolean;
304-
}): boolean {
305-
const dispatchText = ctxPayload.CommandBody ?? ctxPayload.RawBody ?? ctxPayload.Body ?? "";
306-
return !isAbortRequestText(dispatchText) || ctxPayload.CommandAuthorized;
307-
}
308-
309-
export function getTelegramReplyFenceSizeForTests(): number {
310-
return telegramReplyFenceByKey.size;
311-
}
312-
313-
export function resetTelegramReplyFenceForTests(): void {
314-
telegramReplyFenceByKey.clear();
315-
}
316-
317196
function resolveTelegramReasoningLevel(params: {
318197
cfg: OpenClawConfig;
319198
sessionKey?: string;
@@ -531,9 +410,17 @@ export const dispatchTelegramMessage = async ({
531410
chatId,
532411
threadSpec,
533412
});
413+
const replyFenceLaneKey = getTelegramSequentialKey({
414+
message: msg,
415+
...(context.primaryCtx.me ? { me: context.primaryCtx.me } : {}),
416+
});
417+
const scopedReplyFenceLaneKey = buildTelegramReplyFenceLaneKey({
418+
accountId: route.accountId,
419+
sequentialKey: replyFenceLaneKey,
420+
});
534421
let replyFenceGeneration: number | undefined;
535-
const roomEventAbortController = isRoomEvent ? new AbortController() : undefined;
536-
let roomEventAbortControllerQueued = false;
422+
const replyAbortController = new AbortController();
423+
let replyAbortControllerQueued = false;
537424
let dispatchWasSuperseded = false;
538425
const isDispatchSuperseded = () =>
539426
replyFenceGeneration !== undefined &&
@@ -547,7 +434,7 @@ export const dispatchTelegramMessage = async ({
547434
}
548435
endTelegramReplyFence(
549436
replyFenceKey.activeKey,
550-
roomEventAbortControllerQueued ? undefined : roomEventAbortController,
437+
replyAbortControllerQueued ? undefined : replyAbortController,
551438
);
552439
replyFenceGeneration = undefined;
553440
};
@@ -940,7 +827,8 @@ export const dispatchTelegramMessage = async ({
940827
replyFenceGeneration = beginTelegramReplyFence({
941828
key: replyFenceKey.activeKey,
942829
supersede: supersedeReplyFence,
943-
abortController: roomEventAbortController,
830+
abortController: replyAbortController,
831+
laneKey: scopedReplyFenceLaneKey,
944832
});
945833

946834
const implicitQuoteReplyTargetId =
@@ -1567,26 +1455,25 @@ export const dispatchTelegramMessage = async ({
15671455
replyOptions: {
15681456
skillFilter,
15691457
disableBlockStreaming,
1570-
abortSignal: roomEventAbortController?.signal,
1458+
abortSignal: replyAbortController.signal,
15711459
sourceReplyDeliveryMode: isRoomEvent ? "message_tool_only" : undefined,
15721460
queuedDeliveryCorrelations: isRoomEvent
15731461
? [{ begin: beginDeliveryCorrelation }]
15741462
: undefined,
1575-
queuedFollowupLifecycle:
1576-
isRoomEvent && roomEventAbortController
1577-
? {
1578-
onEnqueued: () => {
1579-
roomEventAbortControllerQueued = true;
1580-
},
1581-
onComplete: () => {
1582-
roomEventAbortControllerQueued = false;
1583-
releaseTelegramReplyFenceAbortController(
1584-
replyFenceKey.activeKey,
1585-
roomEventAbortController,
1586-
);
1587-
},
1588-
}
1589-
: undefined,
1463+
queuedFollowupLifecycle: isRoomEvent
1464+
? {
1465+
onEnqueued: () => {
1466+
replyAbortControllerQueued = true;
1467+
},
1468+
onComplete: () => {
1469+
replyAbortControllerQueued = false;
1470+
releaseTelegramReplyFenceAbortController(
1471+
replyFenceKey.activeKey,
1472+
replyAbortController,
1473+
);
1474+
},
1475+
}
1476+
: undefined,
15901477
suppressTyping: isRoomEvent,
15911478
onPartialReply:
15921479
answerLane.stream || reasoningLane.stream

0 commit comments

Comments
 (0)