Skip to content

Commit 1d301f7

Browse files
committed
refactor: extract telegram polling session
1 parent 2e79d82 commit 1d301f7

2 files changed

Lines changed: 307 additions & 279 deletions

File tree

src/telegram/monitor.ts

Lines changed: 24 additions & 279 deletions
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,15 @@
1-
import { type RunOptions, run } from "@grammyjs/runner";
1+
import type { RunOptions } from "@grammyjs/runner";
22
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
33
import type { OpenClawConfig } from "../config/config.js";
44
import { loadConfig } from "../config/config.js";
55
import { waitForAbortSignal } from "../infra/abort-signal.js";
6-
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
76
import { formatErrorMessage } from "../infra/errors.js";
8-
import { formatDurationPrecise } from "../infra/format-time/format-duration.ts";
97
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js";
108
import type { RuntimeEnv } from "../runtime.js";
119
import { resolveTelegramAccount } from "./accounts.js";
1210
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
13-
import { withTelegramApiErrorLogging } from "./api-logging.js";
14-
import { createTelegramBot } from "./bot.js";
1511
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
12+
import { TelegramPollingSession } from "./polling-session.js";
1613
import { makeProxyFetch } from "./proxy.js";
1714
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
1815
import { startTelegramWebhook } from "./webhook.js";
@@ -55,21 +52,6 @@ export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions<unk
5552
};
5653
}
5754

58-
const TELEGRAM_POLL_RESTART_POLICY = {
59-
initialMs: 2000,
60-
maxMs: 30_000,
61-
factor: 1.8,
62-
jitter: 0.25,
63-
};
64-
65-
// Polling stall detection: if no getUpdates call is seen for this long,
66-
// assume the runner is stuck and force-restart it.
67-
// Default fetch timeout is 30s, so 3x gives ample margin for slow responses.
68-
const POLL_STALL_THRESHOLD_MS = 90_000;
69-
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
70-
71-
type TelegramBot = ReturnType<typeof createTelegramBot>;
72-
7355
function normalizePersistedUpdateId(value: number | null): number | null {
7456
if (value === null) {
7557
return null;
@@ -80,28 +62,6 @@ function normalizePersistedUpdateId(value: number | null): number | null {
8062
return value;
8163
}
8264

83-
const isGetUpdatesConflict = (err: unknown) => {
84-
if (!err || typeof err !== "object") {
85-
return false;
86-
}
87-
const typed = err as {
88-
error_code?: number;
89-
errorCode?: number;
90-
description?: string;
91-
method?: string;
92-
message?: string;
93-
};
94-
const errorCode = typed.error_code ?? typed.errorCode;
95-
if (errorCode !== 409) {
96-
return false;
97-
}
98-
const haystack = [typed.method, typed.description, typed.message]
99-
.filter((value): value is string => typeof value === "string")
100-
.join(" ")
101-
.toLowerCase();
102-
return haystack.includes("getupdates");
103-
};
104-
10565
/** Check if error is a Grammy HttpError (used to scope unhandled rejection handling) */
10666
const isGrammyHttpError = (err: unknown): boolean => {
10767
if (!err || typeof err !== "object") {
@@ -112,31 +72,26 @@ const isGrammyHttpError = (err: unknown): boolean => {
11272

11373
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
11474
const log = opts.runtime?.error ?? console.error;
115-
let activeRunner: ReturnType<typeof run> | undefined;
116-
let activeFetchAbort: AbortController | undefined;
117-
let forceRestarted = false;
75+
let pollingSession: TelegramPollingSession | undefined;
11876

119-
// Register handler for Grammy HttpError unhandled rejections.
120-
// This catches network errors that escape the polling loop's try-catch
121-
// (e.g., from setMyCommands during bot setup).
122-
// We gate on isGrammyHttpError to avoid suppressing non-Telegram errors.
12377
const unregisterHandler = registerUnhandledRejectionHandler((err) => {
12478
const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" });
12579
if (isGrammyHttpError(err) && isNetworkError) {
12680
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
127-
return true; // handled - don't crash
81+
return true;
12882
}
129-
// Network failures can surface outside the runner task promise and leave
130-
// polling stuck; force-stop the active runner so the loop can recover.
83+
84+
const activeRunner = pollingSession?.activeRunner;
13185
if (isNetworkError && activeRunner && activeRunner.isRunning()) {
132-
forceRestarted = true;
133-
activeFetchAbort?.abort();
86+
pollingSession?.markForceRestarted();
87+
pollingSession?.abortActiveFetch();
13488
void activeRunner.stop().catch(() => {});
13589
log(
13690
`[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`,
13791
);
138-
return true; // handled
92+
return true;
13993
}
94+
14095
return false;
14196
});
14297

@@ -166,6 +121,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
166121
`[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`,
167122
);
168123
}
124+
169125
const persistUpdateId = async (updateId: number) => {
170126
const normalizedUpdateId = normalizePersistedUpdateId(updateId);
171127
if (normalizedUpdateId === null) {
@@ -208,230 +164,19 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
208164
return;
209165
}
210166

211-
// Use grammyjs/runner for concurrent update processing
212-
let restartAttempts = 0;
213-
let webhookCleared = false;
214-
const runnerOptions = createTelegramRunnerOptions(cfg);
215-
const waitBeforeRestart = async (buildLine: (delay: string) => string): Promise<boolean> => {
216-
restartAttempts += 1;
217-
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
218-
const delay = formatDurationPrecise(delayMs);
219-
log(buildLine(delay));
220-
try {
221-
await sleepWithAbort(delayMs, opts.abortSignal);
222-
} catch (sleepErr) {
223-
if (opts.abortSignal?.aborted) {
224-
return false;
225-
}
226-
throw sleepErr;
227-
}
228-
return true;
229-
};
230-
231-
const waitBeforeRetryOnRecoverableSetupError = async (
232-
err: unknown,
233-
logPrefix: string,
234-
): Promise<boolean> => {
235-
if (opts.abortSignal?.aborted) {
236-
return false;
237-
}
238-
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
239-
throw err;
240-
}
241-
return waitBeforeRestart(
242-
(delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`,
243-
);
244-
};
245-
246-
const createPollingBot = async (
247-
fetchAbortController: AbortController,
248-
): Promise<TelegramBot | undefined> => {
249-
try {
250-
return createTelegramBot({
251-
token,
252-
runtime: opts.runtime,
253-
proxyFetch,
254-
config: cfg,
255-
accountId: account.accountId,
256-
fetchAbortSignal: fetchAbortController.signal,
257-
updateOffset: {
258-
lastUpdateId,
259-
onUpdateId: persistUpdateId,
260-
},
261-
});
262-
} catch (err) {
263-
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
264-
err,
265-
"Telegram setup network error",
266-
);
267-
if (!shouldRetry) {
268-
return undefined;
269-
}
270-
return undefined;
271-
}
272-
};
273-
274-
const ensureWebhookCleanup = async (bot: TelegramBot): Promise<"ready" | "retry" | "exit"> => {
275-
if (webhookCleared) {
276-
return "ready";
277-
}
278-
try {
279-
await withTelegramApiErrorLogging({
280-
operation: "deleteWebhook",
281-
runtime: opts.runtime,
282-
fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
283-
});
284-
webhookCleared = true;
285-
return "ready";
286-
} catch (err) {
287-
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
288-
err,
289-
"Telegram webhook cleanup failed",
290-
);
291-
return shouldRetry ? "retry" : "exit";
292-
}
293-
};
294-
295-
const confirmPersistedOffset = async (bot: TelegramBot): Promise<void> => {
296-
if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) {
297-
return;
298-
}
299-
try {
300-
await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 });
301-
} catch {
302-
// Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate.
303-
}
304-
};
305-
306-
const runPollingCycle = async (
307-
bot: TelegramBot,
308-
fetchAbortController: AbortController,
309-
): Promise<"continue" | "exit"> => {
310-
// Confirm the persisted offset with Telegram so the runner (which starts
311-
// at offset 0) does not re-fetch already-processed updates on restart.
312-
await confirmPersistedOffset(bot);
313-
314-
// Track getUpdates calls to detect polling stalls.
315-
let lastGetUpdatesAt = Date.now();
316-
bot.api.config.use((prev, method, payload, signal) => {
317-
if (method === "getUpdates") {
318-
lastGetUpdatesAt = Date.now();
319-
}
320-
return prev(method, payload, signal);
321-
});
322-
323-
const runner = run(bot, runnerOptions);
324-
activeRunner = runner;
325-
let stopPromise: Promise<void> | undefined;
326-
let stalledRestart = false;
327-
const stopRunner = () => {
328-
fetchAbortController.abort();
329-
stopPromise ??= Promise.resolve(runner.stop())
330-
.then(() => undefined)
331-
.catch(() => {
332-
// Runner may already be stopped by abort/retry paths.
333-
});
334-
return stopPromise;
335-
};
336-
const stopBot = () => {
337-
return Promise.resolve(bot.stop())
338-
.then(() => undefined)
339-
.catch(() => {
340-
// Bot may already be stopped by runner stop/abort paths.
341-
});
342-
};
343-
const stopOnAbort = () => {
344-
if (opts.abortSignal?.aborted) {
345-
void stopRunner();
346-
}
347-
};
348-
349-
// Watchdog: detect when getUpdates calls have stalled and force-restart.
350-
const watchdog = setInterval(() => {
351-
if (opts.abortSignal?.aborted) {
352-
return;
353-
}
354-
const elapsed = Date.now() - lastGetUpdatesAt;
355-
if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
356-
stalledRestart = true;
357-
log(
358-
`[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`,
359-
);
360-
void stopRunner();
361-
}
362-
}, POLL_WATCHDOG_INTERVAL_MS);
363-
364-
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
365-
try {
366-
// runner.task() returns a promise that resolves when the runner stops
367-
await runner.task();
368-
if (opts.abortSignal?.aborted) {
369-
return "exit";
370-
}
371-
const reason = stalledRestart
372-
? "polling stall detected"
373-
: forceRestarted
374-
? "unhandled network error"
375-
: "runner stopped (maxRetryTime exceeded or graceful stop)";
376-
forceRestarted = false;
377-
const shouldRestart = await waitBeforeRestart(
378-
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
379-
);
380-
return shouldRestart ? "continue" : "exit";
381-
} catch (err) {
382-
forceRestarted = false;
383-
if (opts.abortSignal?.aborted) {
384-
throw err;
385-
}
386-
const isConflict = isGetUpdatesConflict(err);
387-
if (isConflict) {
388-
webhookCleared = false;
389-
}
390-
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
391-
if (!isConflict && !isRecoverable) {
392-
throw err;
393-
}
394-
const reason = isConflict ? "getUpdates conflict" : "network error";
395-
const errMsg = formatErrorMessage(err);
396-
const shouldRestart = await waitBeforeRestart(
397-
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
398-
);
399-
return shouldRestart ? "continue" : "exit";
400-
} finally {
401-
clearInterval(watchdog);
402-
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
403-
await stopRunner();
404-
await stopBot();
405-
if (activeFetchAbort === fetchAbortController) {
406-
activeFetchAbort = undefined;
407-
}
408-
}
409-
};
410-
411-
while (!opts.abortSignal?.aborted) {
412-
const fetchAbortController = new AbortController();
413-
activeFetchAbort = fetchAbortController;
414-
const bot = await createPollingBot(fetchAbortController);
415-
if (!bot) {
416-
if (activeFetchAbort === fetchAbortController) {
417-
activeFetchAbort = undefined;
418-
}
419-
continue;
420-
}
421-
422-
const cleanupState = await ensureWebhookCleanup(bot);
423-
if (cleanupState === "retry") {
424-
continue;
425-
}
426-
if (cleanupState === "exit") {
427-
return;
428-
}
429-
430-
const state = await runPollingCycle(bot, fetchAbortController);
431-
if (state === "exit") {
432-
return;
433-
}
434-
}
167+
pollingSession = new TelegramPollingSession({
168+
token,
169+
config: cfg,
170+
accountId: account.accountId,
171+
runtime: opts.runtime,
172+
proxyFetch,
173+
abortSignal: opts.abortSignal,
174+
runnerOptions: createTelegramRunnerOptions(cfg),
175+
getLastUpdateId: () => lastUpdateId,
176+
persistUpdateId,
177+
log,
178+
});
179+
await pollingSession.runUntilAbort();
435180
} finally {
436181
unregisterHandler();
437182
}

0 commit comments

Comments
 (0)