Skip to content

Commit 1bf2801

Browse files
committed
fix(telegram): recover sticky fallback transport
1 parent 737e570 commit 1bf2801

3 files changed

Lines changed: 133 additions & 16 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai
1010
- Channels/streaming: make progress draft labels scroll away with other progress lines, render structured tool rows as compact emoji/title/details, show web-search queries from provider-native argument shapes, and skip empty Discord apply-patch starts until a patch summary exists. (#79146)
1111
- Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus.
1212
- Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip.
13+
- Telegram: re-probe the primary fetch transport after repeated sticky fallback success so transient IPv4 or pinned-IP fallback promotion can recover without a gateway restart. Fixes #77088. (#77157) Thanks @MkDev11.
1314
- Runtime/install: raise the supported Node 22 floor to `22.16+` so native SQLite query handling can rely on the `node:sqlite` statement metadata API while continuing to recommend Node 24. (#78921)
1415
- Discord/voice: include a bounded one-line STT transcript preview in verbose voice logs so live voice debugging shows what speakers said before the agent reply.
1516
- Codex app-server: pin the managed Codex harness and Codex CLI smoke package to `@openai/codex@0.129.0`, defer OpenClaw integration dynamic tools behind Codex tool search by default, and accept current Codex service-tier values so legacy `fast` settings survive the stable harness upgrade as `priority`.

extensions/telegram/src/fetch.test.ts

Lines changed: 84 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -787,8 +787,11 @@ describe("resolveTelegramFetch", () => {
787787
);
788788
});
789789

790-
it("retries once and then keeps sticky IPv4 dispatcher for subsequent requests", async () => {
791-
primeStickyFallbackRetry("ETIMEDOUT");
790+
it("retries once, keeps sticky IPv4, then recovers to primary dispatcher", async () => {
791+
undiciFetch.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"));
792+
for (let i = 0; i < 7; i += 1) {
793+
undiciFetch.mockResolvedValueOnce({ ok: true } as Response);
794+
}
792795

793796
const resolved = resolveTelegramFetchOrThrow(undefined, {
794797
network: {
@@ -797,20 +800,30 @@ describe("resolveTelegramFetch", () => {
797800
});
798801

799802
await resolved("https://api.telegram.org/botx/sendMessage");
800-
await resolved("https://api.telegram.org/botx/sendChatAction");
803+
for (let i = 0; i < 4; i += 1) {
804+
await resolved(`https://api.telegram.org/botx/sendChatAction?sticky=${i}`);
805+
}
806+
await resolved("https://api.telegram.org/botx/getMe");
807+
await resolved("https://api.telegram.org/botx/deleteWebhook");
801808

802-
expect(undiciFetch).toHaveBeenCalledTimes(3);
809+
expect(undiciFetch).toHaveBeenCalledTimes(8);
803810

804811
const firstDispatcher = getDispatcherFromUndiciCall(1);
805812
const secondDispatcher = getDispatcherFromUndiciCall(2);
806-
const thirdDispatcher = getDispatcherFromUndiciCall(3);
813+
const sixthDispatcher = getDispatcherFromUndiciCall(6);
814+
const seventhDispatcher = getDispatcherFromUndiciCall(7);
815+
const eighthDispatcher = getDispatcherFromUndiciCall(8);
807816

808817
expect(firstDispatcher).toBeDefined();
809818
expect(secondDispatcher).toBeDefined();
810-
expect(thirdDispatcher).toBeDefined();
819+
expect(sixthDispatcher).toBeDefined();
820+
expect(seventhDispatcher).toBeDefined();
821+
expect(eighthDispatcher).toBeDefined();
811822

812823
expect(firstDispatcher).not.toBe(secondDispatcher);
813-
expect(secondDispatcher).toBe(thirdDispatcher);
824+
expect(secondDispatcher).toBe(sixthDispatcher);
825+
expect(seventhDispatcher).toBe(firstDispatcher);
826+
expect(eighthDispatcher).toBe(firstDispatcher);
814827

815828
expectStickyAutoSelectDispatcher(firstDispatcher);
816829
expect(secondDispatcher?.options?.connect).toEqual(
@@ -822,17 +835,21 @@ describe("resolveTelegramFetch", () => {
822835
expect(loggerDebug).toHaveBeenCalledWith(
823836
expect.stringContaining("fetch fallback: enabling sticky IPv4-only dispatcher"),
824837
);
838+
expect(loggerDebug).toHaveBeenCalledWith(
839+
expect.stringContaining("fetch fallback: recovered from attempt 1 to attempt 0"),
840+
);
825841
expect(loggerWarn).not.toHaveBeenCalledWith(
826842
expect.stringContaining("fetch fallback: enabling sticky IPv4-only dispatcher"),
827843
);
828844
});
829845

830-
it("escalates from IPv4 fallback to pinned Telegram IP and keeps it sticky", async () => {
846+
it("escalates from IPv4 fallback to pinned Telegram IP and recovers to primary", async () => {
831847
undiciFetch
832848
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
833-
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"))
834-
.mockResolvedValueOnce({ ok: true } as Response)
835-
.mockResolvedValueOnce({ ok: true } as Response);
849+
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"));
850+
for (let i = 0; i < 7; i += 1) {
851+
undiciFetch.mockResolvedValueOnce({ ok: true } as Response);
852+
}
836853

837854
const resolved = resolveTelegramFetchOrThrow(undefined, {
838855
network: {
@@ -842,20 +859,72 @@ describe("resolveTelegramFetch", () => {
842859
});
843860

844861
await resolved("https://api.telegram.org/botx/sendMessage");
845-
await resolved("https://api.telegram.org/botx/sendChatAction");
862+
for (let i = 0; i < 4; i += 1) {
863+
await resolved(`https://api.telegram.org/botx/sendChatAction?sticky=${i}`);
864+
}
865+
await resolved("https://api.telegram.org/botx/getMe");
866+
await resolved("https://api.telegram.org/botx/deleteWebhook");
846867

847-
expect(undiciFetch).toHaveBeenCalledTimes(4);
868+
expect(undiciFetch).toHaveBeenCalledTimes(9);
848869

870+
const firstDispatcher = getDispatcherFromUndiciCall(1);
849871
const secondDispatcher = getDispatcherFromUndiciCall(2);
850872
const thirdDispatcher = getDispatcherFromUndiciCall(3);
851-
const fourthDispatcher = getDispatcherFromUndiciCall(4);
873+
const seventhDispatcher = getDispatcherFromUndiciCall(7);
874+
const eighthDispatcher = getDispatcherFromUndiciCall(8);
875+
const ninthDispatcher = getDispatcherFromUndiciCall(9);
852876

853877
expect(secondDispatcher).not.toBe(thirdDispatcher);
854-
expect(thirdDispatcher).toBe(fourthDispatcher);
878+
expect(thirdDispatcher).toBe(seventhDispatcher);
879+
expect(eighthDispatcher).toBe(firstDispatcher);
880+
expect(ninthDispatcher).toBe(firstDispatcher);
855881
expectPinnedFallbackIpDispatcher(3);
856882
expect(loggerWarn).toHaveBeenCalledWith(
857883
expect.stringContaining("fetch fallback: DNS-resolved IP unreachable"),
858884
);
885+
expect(loggerDebug).toHaveBeenCalledWith(
886+
expect.stringContaining("fetch fallback: recovered from attempt 2 to attempt 0"),
887+
);
888+
});
889+
890+
it("keeps sticky fallback after a failed primary recovery probe", async () => {
891+
undiciFetch
892+
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
893+
.mockResolvedValueOnce({ ok: true } as Response)
894+
.mockResolvedValueOnce({ ok: true } as Response)
895+
.mockResolvedValueOnce({ ok: true } as Response)
896+
.mockResolvedValueOnce({ ok: true } as Response)
897+
.mockResolvedValueOnce({ ok: true } as Response)
898+
.mockRejectedValueOnce(buildFetchFallbackError("ETIMEDOUT"))
899+
.mockResolvedValueOnce({ ok: true } as Response)
900+
.mockResolvedValueOnce({ ok: true } as Response);
901+
902+
const resolved = resolveTelegramFetchOrThrow(undefined, {
903+
network: {
904+
autoSelectFamily: true,
905+
},
906+
});
907+
908+
await resolved("https://api.telegram.org/botx/sendMessage");
909+
for (let i = 0; i < 4; i += 1) {
910+
await resolved(`https://api.telegram.org/botx/sendChatAction?sticky=${i}`);
911+
}
912+
await resolved("https://api.telegram.org/botx/getMe");
913+
await resolved("https://api.telegram.org/botx/deleteWebhook");
914+
915+
expect(undiciFetch).toHaveBeenCalledTimes(9);
916+
917+
const firstDispatcher = getDispatcherFromUndiciCall(1);
918+
const secondDispatcher = getDispatcherFromUndiciCall(2);
919+
920+
expect(firstDispatcher).not.toBe(secondDispatcher);
921+
expect(getDispatcherFromUndiciCall(6)).toBe(secondDispatcher);
922+
expect(getDispatcherFromUndiciCall(7)).toBe(firstDispatcher);
923+
expect(getDispatcherFromUndiciCall(8)).toBe(secondDispatcher);
924+
expect(getDispatcherFromUndiciCall(9)).toBe(secondDispatcher);
925+
expect(loggerDebug).toHaveBeenCalledWith(
926+
expect.stringContaining("fetch fallback: re-probing primary dispatcher"),
927+
);
859928
});
860929

861930
it("keeps the armed fallback sticky when all attempts fail", async () => {

extensions/telegram/src/fetch.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const TELEGRAM_DISPATCHER_KEEP_ALIVE_TIMEOUT_MS = 30_000;
4141
const TELEGRAM_DISPATCHER_KEEP_ALIVE_MAX_TIMEOUT_MS = 600_000;
4242
const TELEGRAM_DISPATCHER_CONNECTIONS_PER_ORIGIN = 10;
4343
const TELEGRAM_DISPATCHER_PIPELINING = 1;
44+
const TELEGRAM_STICKY_FALLBACK_PRIMARY_PROBE_SUCCESS_THRESHOLD = 5;
4445

4546
type TelegramAgentPoolOptions = {
4647
allowH2: false;
@@ -640,6 +641,14 @@ export function resolveTelegramTransport(
640641
});
641642

642643
let stickyAttemptIndex = 0;
644+
let stickySuccessCount = 0;
645+
let primaryProbeDue = false;
646+
647+
const resetStickyRecoveryProbe = (): void => {
648+
stickySuccessCount = 0;
649+
primaryProbeDue = false;
650+
};
651+
643652
const promoteStickyAttempt = (nextIndex: number, err: unknown, reason?: string): boolean => {
644653
if (nextIndex <= stickyAttemptIndex || nextIndex >= transportAttempts.length) {
645654
return false;
@@ -655,14 +664,48 @@ export function resolveTelegramTransport(
655664
}
656665
}
657666
stickyAttemptIndex = nextIndex;
667+
resetStickyRecoveryProbe();
658668
return true;
659669
};
660670

671+
const recordSuccessfulAttempt = (attemptIndex: number): void => {
672+
if (stickyAttemptIndex === 0) {
673+
resetStickyRecoveryProbe();
674+
return;
675+
}
676+
677+
if (attemptIndex < stickyAttemptIndex) {
678+
log.debug(
679+
`fetch fallback: recovered from attempt ${stickyAttemptIndex} to attempt ${attemptIndex}`,
680+
);
681+
stickyAttemptIndex = attemptIndex;
682+
resetStickyRecoveryProbe();
683+
return;
684+
}
685+
686+
if (attemptIndex !== stickyAttemptIndex) {
687+
return;
688+
}
689+
690+
stickySuccessCount += 1;
691+
if (stickySuccessCount >= TELEGRAM_STICKY_FALLBACK_PRIMARY_PROBE_SUCCESS_THRESHOLD) {
692+
stickySuccessCount = 0;
693+
primaryProbeDue = true;
694+
log.debug("fetch fallback: scheduling primary dispatcher recovery probe");
695+
}
696+
};
697+
661698
const resolvedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => {
662699
const callerProvidedDispatcher = Boolean(
663700
(init as RequestInitWithDispatcher | undefined)?.dispatcher,
664701
);
665-
const startIndex = Math.min(stickyAttemptIndex, transportAttempts.length - 1);
702+
const stickyStartIndex = Math.min(stickyAttemptIndex, transportAttempts.length - 1);
703+
const primaryProbe = !callerProvidedDispatcher && primaryProbeDue && stickyStartIndex > 0;
704+
const startIndex = primaryProbe ? 0 : stickyStartIndex;
705+
if (primaryProbe) {
706+
primaryProbeDue = false;
707+
log.debug("fetch fallback: re-probing primary dispatcher after sticky fallback successes");
708+
}
666709
let err: unknown;
667710

668711
try {
@@ -679,6 +722,9 @@ export function resolveTelegramTransport(
679722
flowId: randomUUID(),
680723
meta: { subsystem: "telegram-fetch" },
681724
});
725+
if (!callerProvidedDispatcher) {
726+
recordSuccessfulAttempt(startIndex);
727+
}
682728
return response;
683729
} catch (caught) {
684730
err = caught;
@@ -708,6 +754,7 @@ export function resolveTelegramTransport(
708754
flowId: randomUUID(),
709755
meta: { subsystem: "telegram-fetch", fallbackAttempt: nextIndex },
710756
});
757+
recordSuccessfulAttempt(nextIndex);
711758
return response;
712759
} catch (caught) {
713760
err = caught;

0 commit comments

Comments
 (0)