Skip to content

Commit f3fe48e

Browse files
authored
Make Telegram sendMessage actions durable (#87261)
Route Telegram sendMessage action replies through durable outbound delivery so completed agent responses remain retryable when the gateway send path times out. Verified with focused Telegram/outbound tests, extension test typecheck, prepare build/check/full test gates, and green CI rerun for head 20b4568.
1 parent 5fb57b5 commit f3fe48e

9 files changed

Lines changed: 402 additions & 73 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
Docs: https://docs.openclaw.ai
44

5+
## Unreleased
6+
7+
### Fixes
8+
9+
- Telegram: route `sendMessage` action replies through durable outbound delivery so completed agent responses remain retryable when the gateway send path times out. (#87261) Thanks @mbelinky.
10+
511
## 2026.5.26
612

713
### Highlights

extensions/telegram/src/action-runtime.test.ts

Lines changed: 296 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import fs from "node:fs";
12
import os from "node:os";
23
import path from "node:path";
34
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
@@ -15,10 +16,127 @@ import {
1516

1617
const originalTelegramActionRuntime = { ...telegramActionRuntime };
1718
const reactMessageTelegram = vi.fn(async () => ({ ok: true }));
18-
const sendMessageTelegram = vi.fn(async () => ({
19-
messageId: "789",
20-
chatId: "123",
21-
}));
19+
const sendMessageTelegram = vi.fn(
20+
async (_to: string, _text: string, _opts?: Record<string, unknown>) => ({
21+
messageId: "789",
22+
chatId: "123",
23+
}),
24+
);
25+
const sendDurableMessageBatch = vi.fn(
26+
async (params: {
27+
cfg: OpenClawConfig;
28+
to: string;
29+
accountId?: string;
30+
payloads: Array<{
31+
text?: string;
32+
mediaUrl?: string;
33+
mediaUrls?: string[];
34+
audioAsVoice?: boolean;
35+
delivery?: {
36+
pin?: true | { enabled?: boolean; notify?: boolean; required?: boolean };
37+
};
38+
channelData?: { telegram?: { buttons?: unknown; quoteText?: string } };
39+
}>;
40+
replyToId?: string;
41+
threadId?: string | number;
42+
forceDocument?: boolean;
43+
silent?: boolean;
44+
gatewayClientScopes?: readonly string[];
45+
session?: {
46+
key?: string;
47+
agentId?: string;
48+
requesterAccountId?: string;
49+
};
50+
mediaAccess?: {
51+
localRoots?: readonly string[];
52+
readFile?: (filePath: string) => Promise<Buffer>;
53+
};
54+
}) => {
55+
const payload = params.payloads[0] ?? {};
56+
const mediaUrls = payload.mediaUrls?.length
57+
? payload.mediaUrls
58+
: payload.mediaUrl
59+
? [payload.mediaUrl]
60+
: [];
61+
const telegramData = payload.channelData?.telegram;
62+
const cfg = params.cfg as {
63+
channels?: {
64+
telegram?: {
65+
botToken?: string;
66+
accounts?: Record<string, { botToken?: string }>;
67+
};
68+
};
69+
};
70+
const token =
71+
(params.accountId
72+
? cfg.channels?.telegram?.accounts?.[params.accountId]?.botToken
73+
: undefined) ??
74+
cfg.channels?.telegram?.botToken ??
75+
process.env.TELEGRAM_BOT_TOKEN;
76+
const baseOptions = {
77+
cfg: params.cfg,
78+
token,
79+
accountId: params.accountId,
80+
gatewayClientScopes: params.gatewayClientScopes,
81+
replyToMessageId:
82+
params.replyToId == null ? undefined : Number.parseInt(params.replyToId, 10),
83+
messageThreadId:
84+
params.threadId == null ? undefined : Number.parseInt(String(params.threadId), 10),
85+
quoteText: telegramData?.quoteText,
86+
asVoice: payload.audioAsVoice,
87+
silent: params.silent,
88+
forceDocument: params.forceDocument,
89+
mediaLocalRoots: params.mediaAccess?.localRoots,
90+
mediaReadFile: params.mediaAccess?.readFile,
91+
};
92+
const calls = mediaUrls.length > 0 ? mediaUrls : [undefined];
93+
let last = { messageId: "789", chatId: "123" };
94+
for (const [index, mediaUrl] of calls.entries()) {
95+
last = await sendMessageTelegram(params.to, index === 0 ? (payload.text ?? "") : "", {
96+
...baseOptions,
97+
...(mediaUrl ? { mediaUrl } : {}),
98+
...(index === 0 && telegramData?.buttons ? { buttons: telegramData.buttons } : {}),
99+
});
100+
}
101+
const pin =
102+
payload.delivery?.pin === true
103+
? { enabled: true }
104+
: payload.delivery?.pin && payload.delivery.pin.enabled
105+
? payload.delivery.pin
106+
: undefined;
107+
if (pin && last.messageId) {
108+
try {
109+
await pinMessageTelegram(params.to, last.messageId, {
110+
cfg: params.cfg,
111+
accountId: params.accountId,
112+
notify: pin.notify,
113+
verbose: false,
114+
gatewayClientScopes: params.gatewayClientScopes,
115+
});
116+
} catch (err) {
117+
if (pin.required) {
118+
throw err;
119+
}
120+
}
121+
}
122+
return {
123+
status: "sent",
124+
results: [{ channel: "telegram", messageId: last.messageId, chatId: last.chatId }],
125+
receipt: {
126+
primaryPlatformMessageId: last.messageId,
127+
platformMessageIds: [last.messageId],
128+
parts: [
129+
{
130+
platformMessageId: last.messageId,
131+
kind: mediaUrls.length > 0 ? "media" : "text",
132+
index: 0,
133+
},
134+
],
135+
sentAt: Date.now(),
136+
},
137+
} as const;
138+
},
139+
);
22140
const sendPollTelegram = vi.fn(async () => ({
23141
messageId: "790",
24142
chatId: "123",
@@ -40,11 +158,13 @@ const editForumTopicTelegram = vi.fn(async () => ({
40158
messageThreadId: 42,
41159
name: "Renamed",
42160
}));
43-
const pinMessageTelegram = vi.fn(async () => ({
44-
ok: true,
45-
messageId: "789",
46-
chatId: "123",
47-
}));
161+
const pinMessageTelegram = vi.fn(
162+
async (_to: string, _messageId: string, _opts?: Record<string, unknown>) => ({
163+
ok: true,
164+
messageId: "789",
165+
chatId: "123",
166+
}),
167+
);
48168
const createForumTopicTelegram = vi.fn(async () => ({
49169
topicId: 99,
50170
name: "Topic",
@@ -109,6 +229,20 @@ function resultDetails(result: Awaited<ReturnType<typeof handleTelegramAction>>)
109229
return requireRecord(result.details, "Telegram action details");
110230
}
111231

232+
function readDurableQueueEntries(stateDir: string): Record<string, unknown>[] {
233+
const queueDir = path.join(stateDir, "delivery-queue");
234+
if (!fs.existsSync(queueDir)) {
235+
return [];
236+
}
237+
return fs
238+
.readdirSync(queueDir)
239+
.filter((name) => name.endsWith(".json"))
240+
.map((name) => JSON.parse(fs.readFileSync(path.join(queueDir, name), "utf-8"))) as Record<
241+
string,
242+
unknown
243+
>[];
244+
}
245+
112246
describe("handleTelegramAction", () => {
113247
const defaultReactionAction = {
114248
action: "react",
@@ -175,11 +309,12 @@ describe("handleTelegramAction", () => {
175309
}
176310

177311
beforeEach(() => {
178-
envSnapshot = captureEnv(["TELEGRAM_BOT_TOKEN"]);
312+
envSnapshot = captureEnv(["OPENCLAW_STATE_DIR", "TELEGRAM_BOT_TOKEN"]);
179313
resetTopicNameCacheForTest();
180314
installTopicNameStoreForTest();
181315
Object.assign(telegramActionRuntime, originalTelegramActionRuntime, {
182316
reactMessageTelegram,
317+
sendDurableMessageBatch,
183318
sendMessageTelegram,
184319
sendPollTelegram,
185320
sendStickerTelegram,
@@ -190,6 +325,7 @@ describe("handleTelegramAction", () => {
190325
createForumTopicTelegram,
191326
});
192327
reactMessageTelegram.mockClear();
328+
sendDurableMessageBatch.mockClear();
193329
sendMessageTelegram.mockClear();
194330
sendPollTelegram.mockClear();
195331
sendStickerTelegram.mockClear();
@@ -417,14 +553,26 @@ describe("handleTelegramAction", () => {
417553
content: "Hello, Telegram!",
418554
},
419555
telegramConfig(),
420-
{ gatewayClientScopes: ["operator.write"] },
556+
{
557+
gatewayClientScopes: ["operator.write"],
558+
sessionKey: "agent:main:telegram:direct:123",
559+
},
421560
);
422561
const call = mockCall(sendMessageTelegram, 0, "text message");
423562
expect(call[0]).toBe("@testchannel");
424563
expect(call[1]).toBe("Hello, Telegram!");
425564
const options = requireRecord(call[2], "text message options");
426565
expect(options.token).toBe("tok");
427566
expect(options.mediaUrl).toBeUndefined();
567+
const durableCall = mockCall(sendDurableMessageBatch, 0, "durable text message");
568+
expect(requireRecord(durableCall[0], "durable text message params")).toMatchObject({
569+
channel: "telegram",
570+
to: "@testchannel",
571+
durability: "required",
572+
gatewayClientScopes: ["operator.write"],
573+
session: { key: "agent:main:telegram:direct:123", agentId: "main" },
574+
payloads: [{ text: "Hello, Telegram!" }],
575+
});
428576
expect(result.content).toStrictEqual([
429577
{
430578
type: "text",
@@ -438,6 +586,135 @@ describe("handleTelegramAction", () => {
438586
});
439587
});
440588

589+
it("persists sendMessage action deliveries before Telegram platform send", async () => {
590+
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-action-durable-"));
591+
const { createOutboundTestPlugin, createTestRegistry, setActivePluginRegistry } =
592+
await import("openclaw/plugin-sdk/plugin-test-runtime");
593+
const sendText = vi
594+
.fn()
595+
.mockImplementationOnce(async () => {
596+
const entries = readDurableQueueEntries(stateDir);
597+
expect(entries).toHaveLength(1);
598+
expect(entries[0]).toMatchObject({
599+
channel: "telegram",
600+
to: "12345",
601+
payloads: [
602+
{
603+
text: "times out after queue write",
604+
delivery: { pin: { enabled: true, required: true } },
605+
},
606+
],
607+
session: { key: "agent:main:telegram:direct:12345", agentId: "main" },
608+
gatewayClientScopes: ["operator.write"],
609+
retryCount: 0,
610+
});
611+
throw new Error("telegram timeout");
612+
})
613+
.mockImplementationOnce(async () => {
614+
const entries = readDurableQueueEntries(stateDir);
615+
const liveEntry = entries.find((entry) =>
616+
JSON.stringify(entry.payloads).includes("delivers after queue write"),
617+
);
618+
expect(liveEntry).toMatchObject({
619+
channel: "telegram",
620+
to: "12345",
621+
payloads: [{ text: "delivers after queue write" }],
622+
retryCount: 0,
623+
});
624+
return { channel: "telegram", messageId: "tg-ok" };
625+
});
626+
627+
process.env.OPENCLAW_STATE_DIR = stateDir;
628+
telegramActionRuntime.sendDurableMessageBatch =
629+
originalTelegramActionRuntime.sendDurableMessageBatch;
630+
setActivePluginRegistry(
631+
createTestRegistry([
632+
{
633+
pluginId: "telegram",
634+
source: "test",
635+
plugin: createOutboundTestPlugin({
636+
id: "telegram",
637+
outbound: {
638+
deliveryMode: "direct",
639+
deliveryCapabilities: {
640+
durableFinal: {
641+
text: true,
642+
media: true,
643+
payload: true,
644+
silent: true,
645+
replyTo: true,
646+
thread: true,
647+
messageSendingHooks: true,
648+
batch: true,
649+
},
650+
},
651+
sendText,
652+
},
653+
}),
654+
},
655+
]),
656+
);
657+
658+
try {
659+
await expect(
660+
handleTelegramAction(
661+
{
662+
action: "sendMessage",
663+
to: "12345",
664+
content: "times out after queue write",
665+
delivery: { pin: { enabled: true, required: true } },
666+
},
667+
telegramConfig(),
668+
{
669+
gatewayClientScopes: ["operator.write"],
670+
sessionKey: "agent:main:telegram:direct:12345",
671+
},
672+
),
673+
).rejects.toThrow("telegram timeout");
674+
675+
const retryableEntries = readDurableQueueEntries(stateDir);
676+
expect(retryableEntries).toHaveLength(1);
677+
expect(retryableEntries[0]).toMatchObject({
678+
payloads: [
679+
{
680+
text: "times out after queue write",
681+
delivery: { pin: { enabled: true, required: true } },
682+
},
683+
],
684+
retryCount: 1,
685+
});
686+
expect(String(retryableEntries[0]?.lastError)).toContain("telegram timeout");
687+
688+
const result = await handleTelegramAction(
689+
{
690+
action: "sendMessage",
691+
to: "12345",
692+
content: "delivers after queue write",
693+
},
694+
telegramConfig(),
695+
{ sessionKey: "agent:main:telegram:direct:12345" },
696+
);
697+
698+
expect(result.details).toMatchObject({
699+
ok: true,
700+
messageId: "tg-ok",
701+
});
702+
expect(readDurableQueueEntries(stateDir)).toHaveLength(1);
703+
expect(readDurableQueueEntries(stateDir)[0]).toMatchObject({
704+
payloads: [
705+
{
706+
text: "times out after queue write",
707+
delivery: { pin: { enabled: true, required: true } },
708+
},
709+
],
710+
retryCount: 1,
711+
});
712+
} finally {
713+
setActivePluginRegistry(createTestRegistry([]));
714+
fs.rmSync(stateDir, { recursive: true, force: true });
715+
}
716+
});
717+
441718
it("normalizes legacy group targets for sendMessage actions", async () => {
442719
await handleTelegramAction(
443720
{
@@ -1092,6 +1369,10 @@ describe("handleTelegramAction", () => {
10921369
expect(options.accountId).toBeUndefined();
10931370
expect(options.verbose).toBe(false);
10941371
expect(options.gatewayClientScopes).toEqual(["operator.write"]);
1372+
const durableCall = mockCall(sendDurableMessageBatch, 0, "durable delivery pin");
1373+
expect(requireRecord(durableCall[0], "durable delivery pin params")).toMatchObject({
1374+
payloads: [{ delivery: { pin: { enabled: true } } }],
1375+
});
10951376
});
10961377

10971378
it("passes delivery pin notify requests for action sends", async () => {
@@ -1109,6 +1390,10 @@ describe("handleTelegramAction", () => {
11091390
expect(call[0]).toBe("123456");
11101391
expect(call[1]).toBe("789");
11111392
expect(requireRecord(call[2], "delivery pin notify options").notify).toBe(true);
1393+
const durableCall = mockCall(sendDurableMessageBatch, 0, "durable delivery pin notify");
1394+
expect(requireRecord(durableCall[0], "durable delivery pin notify params")).toMatchObject({
1395+
payloads: [{ delivery: { pin: { enabled: true, notify: true } } }],
1396+
});
11121397
});
11131398

11141399
it("fails required action-send pins when pinning fails", async () => {

0 commit comments

Comments
 (0)