Skip to content

Commit 2a994fe

Browse files
authored
Merge f515e5d into 32d9caf
2 parents 32d9caf + f515e5d commit 2a994fe

8 files changed

Lines changed: 394 additions & 81 deletions

File tree

extensions/telegram/src/bot/helpers.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ export type TelegramThreadSpec = {
8888
scope: "dm" | "forum" | "none";
8989
};
9090

91+
export function shouldAllowTelegramThreadlessFallback(
92+
thread?: TelegramThreadSpec | null,
93+
options?: { allowDmThreadFallback?: boolean },
94+
): boolean {
95+
if (thread?.scope === "dm") {
96+
return options?.allowDmThreadFallback === true;
97+
}
98+
return thread?.id == null || Math.trunc(thread.id) === TELEGRAM_GENERAL_TOPIC_ID;
99+
}
100+
91101
export function shouldUseTelegramDmThreadSession(params: {
92102
dmThreadId?: number;
93103
botHasTopicsEnabled?: boolean;

extensions/telegram/src/draft-stream.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,28 @@ describe("createTelegramDraftStream", () => {
211211
expect(api.sendMessage).toHaveBeenCalledTimes(1);
212212
});
213213

214+
it("does not retry forum materialize sends without thread when thread is not found", async () => {
215+
const api = createMockDraftApi();
216+
api.sendMessage.mockRejectedValueOnce(new Error("400: Bad Request: message thread not found"));
217+
const warn = vi.fn();
218+
const stream = createDraftStream(api, {
219+
thread: { id: 42, scope: "forum" },
220+
warn,
221+
});
222+
223+
stream.update("Hello");
224+
await stream.flush();
225+
const materializedId = await stream.materialize?.();
226+
227+
expect(materializedId).toBeUndefined();
228+
expect(api.sendMessage).toHaveBeenCalledTimes(1);
229+
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 });
230+
expect(warn).not.toHaveBeenCalledWith(
231+
"telegram stream preview materialize send failed with message_thread_id, retrying without thread",
232+
);
233+
expect(warn).toHaveBeenCalledWith(expect.stringContaining("telegram stream preview failed:"));
234+
});
235+
214236
it("returns existing preview id when materializing message transport", async () => {
215237
const api = createMockDraftApi();
216238
const stream = createDraftStream(api, {

extensions/telegram/src/draft-stream.ts

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,29 @@ import {
44
takeMessageIdAfterStop,
55
} from "openclaw/plugin-sdk/channel-outbound";
66
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
7-
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
7+
import {
8+
buildTelegramThreadParams,
9+
shouldAllowTelegramThreadlessFallback,
10+
type TelegramThreadSpec,
11+
} from "./bot/helpers.js";
812
import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js";
913
import { normalizeTelegramReplyToMessageId } from "./outbound-params.js";
1014

1115
const TELEGRAM_STREAM_MAX_CHARS = 4096;
1216
const DEFAULT_THROTTLE_MS = 1000;
17+
const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i;
18+
19+
type TelegramSendMessageParams = Parameters<Bot["api"]["sendMessage"]>[2];
20+
21+
function hasNumericMessageThreadId(
22+
params: TelegramSendMessageParams | undefined,
23+
): params is TelegramSendMessageParams & { message_thread_id: number } {
24+
return (
25+
typeof params === "object" &&
26+
params !== null &&
27+
typeof (params as { message_thread_id?: unknown }).message_thread_id === "number"
28+
);
29+
}
1330

1431
export type TelegramDraftStream = {
1532
update: (text: string) => void;
@@ -122,17 +139,42 @@ export function createTelegramDraftStream(params: {
122139
renderedParseMode: "HTML" | undefined;
123140
sendGeneration: number;
124141
};
125-
const sendRenderedMessage = async (sendArgs: {
142+
const sendRenderedMessageWithThreadFallback = async (sendArgs: {
126143
renderedText: string;
127144
renderedParseMode: "HTML" | undefined;
145+
fallbackWarnMessage: string;
128146
}) => {
129147
const sendParams = sendArgs.renderedParseMode
130148
? {
131149
...replyParams,
132150
parse_mode: sendArgs.renderedParseMode,
133151
}
134152
: replyParams;
135-
return await params.api.sendMessage(chatId, sendArgs.renderedText, sendParams);
153+
const usedThreadParams = hasNumericMessageThreadId(sendParams);
154+
try {
155+
return {
156+
sent: await params.api.sendMessage(chatId, sendArgs.renderedText, sendParams),
157+
usedThreadParams,
158+
};
159+
} catch (err) {
160+
if (!usedThreadParams || !THREAD_NOT_FOUND_RE.test(String(err))) {
161+
throw err;
162+
}
163+
if (!shouldAllowTelegramThreadlessFallback(params.thread)) {
164+
throw err;
165+
}
166+
const threadlessParams: TelegramSendMessageParams = { ...sendParams };
167+
delete threadlessParams.message_thread_id;
168+
params.warn?.(sendArgs.fallbackWarnMessage);
169+
return {
170+
sent: await params.api.sendMessage(
171+
chatId,
172+
sendArgs.renderedText,
173+
Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined,
174+
),
175+
usedThreadParams: false,
176+
};
177+
}
136178
};
137179
const sendMessageTransportPreview = async ({
138180
renderedText,
@@ -151,12 +193,14 @@ export function createTelegramDraftStream(params: {
151193
return true;
152194
}
153195
messageSendAttempted = true;
154-
let sent: Awaited<ReturnType<typeof sendRenderedMessage>>;
196+
let sent: Awaited<ReturnType<typeof sendRenderedMessageWithThreadFallback>>["sent"];
155197
try {
156-
sent = await sendRenderedMessage({
198+
({ sent } = await sendRenderedMessageWithThreadFallback({
157199
renderedText,
158200
renderedParseMode,
159-
});
201+
fallbackWarnMessage:
202+
"telegram stream preview send failed with message_thread_id, retrying without thread",
203+
}));
160204
} catch (err) {
161205
if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) {
162206
messageSendAttempted = false;

0 commit comments

Comments
 (0)