1- import { type RunOptions , run } from "@grammyjs/runner" ;
1+ import type { RunOptions } from "@grammyjs/runner" ;
22import { resolveAgentMaxConcurrent } from "../config/agent-limits.js" ;
33import type { OpenClawConfig } from "../config/config.js" ;
44import { loadConfig } from "../config/config.js" ;
55import { waitForAbortSignal } from "../infra/abort-signal.js" ;
6- import { computeBackoff , sleepWithAbort } from "../infra/backoff.js" ;
76import { formatErrorMessage } from "../infra/errors.js" ;
8- import { formatDurationPrecise } from "../infra/format-time/format-duration.ts" ;
97import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js" ;
108import type { RuntimeEnv } from "../runtime.js" ;
119import { resolveTelegramAccount } from "./accounts.js" ;
1210import { resolveTelegramAllowedUpdates } from "./allowed-updates.js" ;
13- import { withTelegramApiErrorLogging } from "./api-logging.js" ;
14- import { createTelegramBot } from "./bot.js" ;
1511import { isRecoverableTelegramNetworkError } from "./network-errors.js" ;
12+ import { TelegramPollingSession } from "./polling-session.js" ;
1613import { makeProxyFetch } from "./proxy.js" ;
1714import { readTelegramUpdateOffset , writeTelegramUpdateOffset } from "./update-offset-store.js" ;
1815import { startTelegramWebhook } from "./webhook.js" ;
@@ -55,21 +52,6 @@ export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions<unk
5552 } ;
5653}
5754
58- const TELEGRAM_POLL_RESTART_POLICY = {
59- initialMs : 2000 ,
60- maxMs : 30_000 ,
61- factor : 1.8 ,
62- jitter : 0.25 ,
63- } ;
64-
65- // Polling stall detection: if no getUpdates call is seen for this long,
66- // assume the runner is stuck and force-restart it.
67- // Default fetch timeout is 30s, so 3x gives ample margin for slow responses.
68- const POLL_STALL_THRESHOLD_MS = 90_000 ;
69- const POLL_WATCHDOG_INTERVAL_MS = 30_000 ;
70-
71- type TelegramBot = ReturnType < typeof createTelegramBot > ;
72-
7355function normalizePersistedUpdateId ( value : number | null ) : number | null {
7456 if ( value === null ) {
7557 return null ;
@@ -80,28 +62,6 @@ function normalizePersistedUpdateId(value: number | null): number | null {
8062 return value ;
8163}
8264
83- const isGetUpdatesConflict = ( err : unknown ) => {
84- if ( ! err || typeof err !== "object" ) {
85- return false ;
86- }
87- const typed = err as {
88- error_code ?: number ;
89- errorCode ?: number ;
90- description ?: string ;
91- method ?: string ;
92- message ?: string ;
93- } ;
94- const errorCode = typed . error_code ?? typed . errorCode ;
95- if ( errorCode !== 409 ) {
96- return false ;
97- }
98- const haystack = [ typed . method , typed . description , typed . message ]
99- . filter ( ( value ) : value is string => typeof value === "string" )
100- . join ( " " )
101- . toLowerCase ( ) ;
102- return haystack . includes ( "getupdates" ) ;
103- } ;
104-
10565/** Check if error is a Grammy HttpError (used to scope unhandled rejection handling) */
10666const isGrammyHttpError = ( err : unknown ) : boolean => {
10767 if ( ! err || typeof err !== "object" ) {
@@ -112,31 +72,26 @@ const isGrammyHttpError = (err: unknown): boolean => {
11272
11373export async function monitorTelegramProvider ( opts : MonitorTelegramOpts = { } ) {
11474 const log = opts . runtime ?. error ?? console . error ;
115- let activeRunner : ReturnType < typeof run > | undefined ;
116- let activeFetchAbort : AbortController | undefined ;
117- let forceRestarted = false ;
75+ let pollingSession : TelegramPollingSession | undefined ;
11876
119- // Register handler for Grammy HttpError unhandled rejections.
120- // This catches network errors that escape the polling loop's try-catch
121- // (e.g., from setMyCommands during bot setup).
122- // We gate on isGrammyHttpError to avoid suppressing non-Telegram errors.
12377 const unregisterHandler = registerUnhandledRejectionHandler ( ( err ) => {
12478 const isNetworkError = isRecoverableTelegramNetworkError ( err , { context : "polling" } ) ;
12579 if ( isGrammyHttpError ( err ) && isNetworkError ) {
12680 log ( `[telegram] Suppressed network error: ${ formatErrorMessage ( err ) } ` ) ;
127- return true ; // handled - don't crash
81+ return true ;
12882 }
129- // Network failures can surface outside the runner task promise and leave
130- // polling stuck; force-stop the active runner so the loop can recover.
83+
84+ const activeRunner = pollingSession ?. activeRunner ;
13185 if ( isNetworkError && activeRunner && activeRunner . isRunning ( ) ) {
132- forceRestarted = true ;
133- activeFetchAbort ?. abort ( ) ;
86+ pollingSession ?. markForceRestarted ( ) ;
87+ pollingSession ?. abortActiveFetch ( ) ;
13488 void activeRunner . stop ( ) . catch ( ( ) => { } ) ;
13589 log (
13690 `[telegram] Restarting polling after unhandled network error: ${ formatErrorMessage ( err ) } ` ,
13791 ) ;
138- return true ; // handled
92+ return true ;
13993 }
94+
14095 return false ;
14196 } ) ;
14297
@@ -166,6 +121,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
166121 `[telegram] Ignoring invalid persisted update offset (${ String ( persistedOffsetRaw ) } ); starting without offset confirmation.` ,
167122 ) ;
168123 }
124+
169125 const persistUpdateId = async ( updateId : number ) => {
170126 const normalizedUpdateId = normalizePersistedUpdateId ( updateId ) ;
171127 if ( normalizedUpdateId === null ) {
@@ -208,230 +164,19 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
208164 return ;
209165 }
210166
211- // Use grammyjs/runner for concurrent update processing
212- let restartAttempts = 0 ;
213- let webhookCleared = false ;
214- const runnerOptions = createTelegramRunnerOptions ( cfg ) ;
215- const waitBeforeRestart = async ( buildLine : ( delay : string ) => string ) : Promise < boolean > => {
216- restartAttempts += 1 ;
217- const delayMs = computeBackoff ( TELEGRAM_POLL_RESTART_POLICY , restartAttempts ) ;
218- const delay = formatDurationPrecise ( delayMs ) ;
219- log ( buildLine ( delay ) ) ;
220- try {
221- await sleepWithAbort ( delayMs , opts . abortSignal ) ;
222- } catch ( sleepErr ) {
223- if ( opts . abortSignal ?. aborted ) {
224- return false ;
225- }
226- throw sleepErr ;
227- }
228- return true ;
229- } ;
230-
231- const waitBeforeRetryOnRecoverableSetupError = async (
232- err : unknown ,
233- logPrefix : string ,
234- ) : Promise < boolean > => {
235- if ( opts . abortSignal ?. aborted ) {
236- return false ;
237- }
238- if ( ! isRecoverableTelegramNetworkError ( err , { context : "unknown" } ) ) {
239- throw err ;
240- }
241- return waitBeforeRestart (
242- ( delay ) => `${ logPrefix } : ${ formatErrorMessage ( err ) } ; retrying in ${ delay } .` ,
243- ) ;
244- } ;
245-
246- const createPollingBot = async (
247- fetchAbortController : AbortController ,
248- ) : Promise < TelegramBot | undefined > => {
249- try {
250- return createTelegramBot ( {
251- token,
252- runtime : opts . runtime ,
253- proxyFetch,
254- config : cfg ,
255- accountId : account . accountId ,
256- fetchAbortSignal : fetchAbortController . signal ,
257- updateOffset : {
258- lastUpdateId,
259- onUpdateId : persistUpdateId ,
260- } ,
261- } ) ;
262- } catch ( err ) {
263- const shouldRetry = await waitBeforeRetryOnRecoverableSetupError (
264- err ,
265- "Telegram setup network error" ,
266- ) ;
267- if ( ! shouldRetry ) {
268- return undefined ;
269- }
270- return undefined ;
271- }
272- } ;
273-
274- const ensureWebhookCleanup = async ( bot : TelegramBot ) : Promise < "ready" | "retry" | "exit" > => {
275- if ( webhookCleared ) {
276- return "ready" ;
277- }
278- try {
279- await withTelegramApiErrorLogging ( {
280- operation : "deleteWebhook" ,
281- runtime : opts . runtime ,
282- fn : ( ) => bot . api . deleteWebhook ( { drop_pending_updates : false } ) ,
283- } ) ;
284- webhookCleared = true ;
285- return "ready" ;
286- } catch ( err ) {
287- const shouldRetry = await waitBeforeRetryOnRecoverableSetupError (
288- err ,
289- "Telegram webhook cleanup failed" ,
290- ) ;
291- return shouldRetry ? "retry" : "exit" ;
292- }
293- } ;
294-
295- const confirmPersistedOffset = async ( bot : TelegramBot ) : Promise < void > => {
296- if ( lastUpdateId === null || lastUpdateId >= Number . MAX_SAFE_INTEGER ) {
297- return ;
298- }
299- try {
300- await bot . api . getUpdates ( { offset : lastUpdateId + 1 , limit : 1 , timeout : 0 } ) ;
301- } catch {
302- // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate.
303- }
304- } ;
305-
306- const runPollingCycle = async (
307- bot : TelegramBot ,
308- fetchAbortController : AbortController ,
309- ) : Promise < "continue" | "exit" > => {
310- // Confirm the persisted offset with Telegram so the runner (which starts
311- // at offset 0) does not re-fetch already-processed updates on restart.
312- await confirmPersistedOffset ( bot ) ;
313-
314- // Track getUpdates calls to detect polling stalls.
315- let lastGetUpdatesAt = Date . now ( ) ;
316- bot . api . config . use ( ( prev , method , payload , signal ) => {
317- if ( method === "getUpdates" ) {
318- lastGetUpdatesAt = Date . now ( ) ;
319- }
320- return prev ( method , payload , signal ) ;
321- } ) ;
322-
323- const runner = run ( bot , runnerOptions ) ;
324- activeRunner = runner ;
325- let stopPromise : Promise < void > | undefined ;
326- let stalledRestart = false ;
327- const stopRunner = ( ) => {
328- fetchAbortController . abort ( ) ;
329- stopPromise ??= Promise . resolve ( runner . stop ( ) )
330- . then ( ( ) => undefined )
331- . catch ( ( ) => {
332- // Runner may already be stopped by abort/retry paths.
333- } ) ;
334- return stopPromise ;
335- } ;
336- const stopBot = ( ) => {
337- return Promise . resolve ( bot . stop ( ) )
338- . then ( ( ) => undefined )
339- . catch ( ( ) => {
340- // Bot may already be stopped by runner stop/abort paths.
341- } ) ;
342- } ;
343- const stopOnAbort = ( ) => {
344- if ( opts . abortSignal ?. aborted ) {
345- void stopRunner ( ) ;
346- }
347- } ;
348-
349- // Watchdog: detect when getUpdates calls have stalled and force-restart.
350- const watchdog = setInterval ( ( ) => {
351- if ( opts . abortSignal ?. aborted ) {
352- return ;
353- }
354- const elapsed = Date . now ( ) - lastGetUpdatesAt ;
355- if ( elapsed > POLL_STALL_THRESHOLD_MS && runner . isRunning ( ) ) {
356- stalledRestart = true ;
357- log (
358- `[telegram] Polling stall detected (no getUpdates for ${ formatDurationPrecise ( elapsed ) } ); forcing restart.` ,
359- ) ;
360- void stopRunner ( ) ;
361- }
362- } , POLL_WATCHDOG_INTERVAL_MS ) ;
363-
364- opts . abortSignal ?. addEventListener ( "abort" , stopOnAbort , { once : true } ) ;
365- try {
366- // runner.task() returns a promise that resolves when the runner stops
367- await runner . task ( ) ;
368- if ( opts . abortSignal ?. aborted ) {
369- return "exit" ;
370- }
371- const reason = stalledRestart
372- ? "polling stall detected"
373- : forceRestarted
374- ? "unhandled network error"
375- : "runner stopped (maxRetryTime exceeded or graceful stop)" ;
376- forceRestarted = false ;
377- const shouldRestart = await waitBeforeRestart (
378- ( delay ) => `Telegram polling runner stopped (${ reason } ); restarting in ${ delay } .` ,
379- ) ;
380- return shouldRestart ? "continue" : "exit" ;
381- } catch ( err ) {
382- forceRestarted = false ;
383- if ( opts . abortSignal ?. aborted ) {
384- throw err ;
385- }
386- const isConflict = isGetUpdatesConflict ( err ) ;
387- if ( isConflict ) {
388- webhookCleared = false ;
389- }
390- const isRecoverable = isRecoverableTelegramNetworkError ( err , { context : "polling" } ) ;
391- if ( ! isConflict && ! isRecoverable ) {
392- throw err ;
393- }
394- const reason = isConflict ? "getUpdates conflict" : "network error" ;
395- const errMsg = formatErrorMessage ( err ) ;
396- const shouldRestart = await waitBeforeRestart (
397- ( delay ) => `Telegram ${ reason } : ${ errMsg } ; retrying in ${ delay } .` ,
398- ) ;
399- return shouldRestart ? "continue" : "exit" ;
400- } finally {
401- clearInterval ( watchdog ) ;
402- opts . abortSignal ?. removeEventListener ( "abort" , stopOnAbort ) ;
403- await stopRunner ( ) ;
404- await stopBot ( ) ;
405- if ( activeFetchAbort === fetchAbortController ) {
406- activeFetchAbort = undefined ;
407- }
408- }
409- } ;
410-
411- while ( ! opts . abortSignal ?. aborted ) {
412- const fetchAbortController = new AbortController ( ) ;
413- activeFetchAbort = fetchAbortController ;
414- const bot = await createPollingBot ( fetchAbortController ) ;
415- if ( ! bot ) {
416- if ( activeFetchAbort === fetchAbortController ) {
417- activeFetchAbort = undefined ;
418- }
419- continue ;
420- }
421-
422- const cleanupState = await ensureWebhookCleanup ( bot ) ;
423- if ( cleanupState === "retry" ) {
424- continue ;
425- }
426- if ( cleanupState === "exit" ) {
427- return ;
428- }
429-
430- const state = await runPollingCycle ( bot , fetchAbortController ) ;
431- if ( state === "exit" ) {
432- return ;
433- }
434- }
167+ pollingSession = new TelegramPollingSession ( {
168+ token,
169+ config : cfg ,
170+ accountId : account . accountId ,
171+ runtime : opts . runtime ,
172+ proxyFetch,
173+ abortSignal : opts . abortSignal ,
174+ runnerOptions : createTelegramRunnerOptions ( cfg ) ,
175+ getLastUpdateId : ( ) => lastUpdateId ,
176+ persistUpdateId,
177+ log,
178+ } ) ;
179+ await pollingSession . runUntilAbort ( ) ;
435180 } finally {
436181 unregisterHandler ( ) ;
437182 }
0 commit comments