Skip to content

Commit f46bd2e

Browse files
committed
refactor(feishu): split monitor startup and transport concerns
1 parent c0bf42f commit f46bd2e

10 files changed

Lines changed: 943 additions & 761 deletions

extensions/feishu/src/async.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
const RACE_TIMEOUT = Symbol("race-timeout");
2+
const RACE_ABORT = Symbol("race-abort");
3+
4+
export type RaceWithTimeoutAndAbortResult<T> =
5+
| { status: "resolved"; value: T }
6+
| { status: "timeout" }
7+
| { status: "aborted" };
8+
9+
export async function raceWithTimeoutAndAbort<T>(
10+
promise: Promise<T>,
11+
options: {
12+
timeoutMs?: number;
13+
abortSignal?: AbortSignal;
14+
} = {},
15+
): Promise<RaceWithTimeoutAndAbortResult<T>> {
16+
if (options.abortSignal?.aborted) {
17+
return { status: "aborted" };
18+
}
19+
20+
if (options.timeoutMs === undefined && !options.abortSignal) {
21+
return { status: "resolved", value: await promise };
22+
}
23+
24+
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
25+
let abortHandler: (() => void) | undefined;
26+
const contenders: Array<Promise<T | typeof RACE_TIMEOUT | typeof RACE_ABORT>> = [promise];
27+
28+
if (options.timeoutMs !== undefined) {
29+
contenders.push(
30+
new Promise((resolve) => {
31+
timeoutHandle = setTimeout(() => resolve(RACE_TIMEOUT), options.timeoutMs);
32+
}),
33+
);
34+
}
35+
36+
if (options.abortSignal) {
37+
contenders.push(
38+
new Promise((resolve) => {
39+
abortHandler = () => resolve(RACE_ABORT);
40+
options.abortSignal?.addEventListener("abort", abortHandler, { once: true });
41+
}),
42+
);
43+
}
44+
45+
try {
46+
const result = await Promise.race(contenders);
47+
if (result === RACE_TIMEOUT) {
48+
return { status: "timeout" };
49+
}
50+
if (result === RACE_ABORT) {
51+
return { status: "aborted" };
52+
}
53+
return { status: "resolved", value: result };
54+
} finally {
55+
if (timeoutHandle) {
56+
clearTimeout(timeoutHandle);
57+
}
58+
if (abortHandler) {
59+
options.abortSignal?.removeEventListener("abort", abortHandler);
60+
}
61+
}
62+
}
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
import * as crypto from "crypto";
2+
import * as Lark from "@larksuiteoapi/node-sdk";
3+
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "openclaw/plugin-sdk";
4+
import { resolveFeishuAccount } from "./accounts.js";
5+
import { raceWithTimeoutAndAbort } from "./async.js";
6+
import { handleFeishuMessage, type FeishuMessageEvent, type FeishuBotAddedEvent } from "./bot.js";
7+
import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js";
8+
import { createEventDispatcher } from "./client.js";
9+
import { fetchBotOpenIdForMonitor } from "./monitor.startup.js";
10+
import { botOpenIds } from "./monitor.state.js";
11+
import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js";
12+
import { getMessageFeishu } from "./send.js";
13+
import type { ResolvedFeishuAccount } from "./types.js";
14+
15+
const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500;
16+
17+
export type FeishuReactionCreatedEvent = {
18+
message_id: string;
19+
chat_id?: string;
20+
chat_type?: "p2p" | "group";
21+
reaction_type?: { emoji_type?: string };
22+
operator_type?: string;
23+
user_id?: { open_id?: string };
24+
action_time?: string;
25+
};
26+
27+
type ResolveReactionSyntheticEventParams = {
28+
cfg: ClawdbotConfig;
29+
accountId: string;
30+
event: FeishuReactionCreatedEvent;
31+
botOpenId?: string;
32+
fetchMessage?: typeof getMessageFeishu;
33+
verificationTimeoutMs?: number;
34+
logger?: (message: string) => void;
35+
uuid?: () => string;
36+
};
37+
38+
export async function resolveReactionSyntheticEvent(
39+
params: ResolveReactionSyntheticEventParams,
40+
): Promise<FeishuMessageEvent | null> {
41+
const {
42+
cfg,
43+
accountId,
44+
event,
45+
botOpenId,
46+
fetchMessage = getMessageFeishu,
47+
verificationTimeoutMs = FEISHU_REACTION_VERIFY_TIMEOUT_MS,
48+
logger,
49+
uuid = () => crypto.randomUUID(),
50+
} = params;
51+
52+
const emoji = event.reaction_type?.emoji_type;
53+
const messageId = event.message_id;
54+
const senderId = event.user_id?.open_id;
55+
if (!emoji || !messageId || !senderId) {
56+
return null;
57+
}
58+
59+
const account = resolveFeishuAccount({ cfg, accountId });
60+
const reactionNotifications = account.config.reactionNotifications ?? "own";
61+
if (reactionNotifications === "off") {
62+
return null;
63+
}
64+
65+
if (event.operator_type === "app" || senderId === botOpenId) {
66+
return null;
67+
}
68+
69+
if (emoji === "Typing") {
70+
return null;
71+
}
72+
73+
if (reactionNotifications === "own" && !botOpenId) {
74+
logger?.(
75+
`feishu[${accountId}]: bot open_id unavailable, skipping reaction ${emoji} on ${messageId}`,
76+
);
77+
return null;
78+
}
79+
80+
const reactedMsg = await raceWithTimeoutAndAbort(fetchMessage({ cfg, messageId, accountId }), {
81+
timeoutMs: verificationTimeoutMs,
82+
})
83+
.then((result) => (result.status === "resolved" ? result.value : null))
84+
.catch(() => null);
85+
const isBotMessage = reactedMsg?.senderType === "app" || reactedMsg?.senderOpenId === botOpenId;
86+
if (!reactedMsg || (reactionNotifications === "own" && !isBotMessage)) {
87+
logger?.(
88+
`feishu[${accountId}]: ignoring reaction on non-bot/unverified message ${messageId} ` +
89+
`(sender: ${reactedMsg?.senderOpenId ?? "unknown"})`,
90+
);
91+
return null;
92+
}
93+
94+
const syntheticChatIdRaw = event.chat_id ?? reactedMsg.chatId;
95+
const syntheticChatId = syntheticChatIdRaw?.trim() ? syntheticChatIdRaw : `p2p:${senderId}`;
96+
const syntheticChatType: "p2p" | "group" = event.chat_type ?? "p2p";
97+
return {
98+
sender: {
99+
sender_id: { open_id: senderId },
100+
sender_type: "user",
101+
},
102+
message: {
103+
message_id: `${messageId}:reaction:${emoji}:${uuid()}`,
104+
chat_id: syntheticChatId,
105+
chat_type: syntheticChatType,
106+
message_type: "text",
107+
content: JSON.stringify({
108+
text: `[reacted with ${emoji} to message ${messageId}]`,
109+
}),
110+
},
111+
};
112+
}
113+
114+
type RegisterEventHandlersContext = {
115+
cfg: ClawdbotConfig;
116+
accountId: string;
117+
runtime?: RuntimeEnv;
118+
chatHistories: Map<string, HistoryEntry[]>;
119+
fireAndForget?: boolean;
120+
};
121+
122+
function registerEventHandlers(
123+
eventDispatcher: Lark.EventDispatcher,
124+
context: RegisterEventHandlersContext,
125+
): void {
126+
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
127+
const log = runtime?.log ?? console.log;
128+
const error = runtime?.error ?? console.error;
129+
130+
eventDispatcher.register({
131+
"im.message.receive_v1": async (data) => {
132+
try {
133+
const event = data as unknown as FeishuMessageEvent;
134+
const promise = handleFeishuMessage({
135+
cfg,
136+
event,
137+
botOpenId: botOpenIds.get(accountId),
138+
runtime,
139+
chatHistories,
140+
accountId,
141+
});
142+
if (fireAndForget) {
143+
promise.catch((err) => {
144+
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
145+
});
146+
} else {
147+
await promise;
148+
}
149+
} catch (err) {
150+
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
151+
}
152+
},
153+
"im.message.message_read_v1": async () => {
154+
// Ignore read receipts
155+
},
156+
"im.chat.member.bot.added_v1": async (data) => {
157+
try {
158+
const event = data as unknown as FeishuBotAddedEvent;
159+
log(`feishu[${accountId}]: bot added to chat ${event.chat_id}`);
160+
} catch (err) {
161+
error(`feishu[${accountId}]: error handling bot added event: ${String(err)}`);
162+
}
163+
},
164+
"im.chat.member.bot.deleted_v1": async (data) => {
165+
try {
166+
const event = data as unknown as { chat_id: string };
167+
log(`feishu[${accountId}]: bot removed from chat ${event.chat_id}`);
168+
} catch (err) {
169+
error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`);
170+
}
171+
},
172+
"im.message.reaction.created_v1": async (data) => {
173+
const processReaction = async () => {
174+
const event = data as FeishuReactionCreatedEvent;
175+
const myBotId = botOpenIds.get(accountId);
176+
const syntheticEvent = await resolveReactionSyntheticEvent({
177+
cfg,
178+
accountId,
179+
event,
180+
botOpenId: myBotId,
181+
logger: log,
182+
});
183+
if (!syntheticEvent) {
184+
return;
185+
}
186+
const promise = handleFeishuMessage({
187+
cfg,
188+
event: syntheticEvent,
189+
botOpenId: myBotId,
190+
runtime,
191+
chatHistories,
192+
accountId,
193+
});
194+
if (fireAndForget) {
195+
promise.catch((err) => {
196+
error(`feishu[${accountId}]: error handling reaction: ${String(err)}`);
197+
});
198+
return;
199+
}
200+
await promise;
201+
};
202+
203+
if (fireAndForget) {
204+
void processReaction().catch((err) => {
205+
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
206+
});
207+
return;
208+
}
209+
210+
try {
211+
await processReaction();
212+
} catch (err) {
213+
error(`feishu[${accountId}]: error handling reaction event: ${String(err)}`);
214+
}
215+
},
216+
"im.message.reaction.deleted_v1": async () => {
217+
// Ignore reaction removals
218+
},
219+
"card.action.trigger": async (data: unknown) => {
220+
try {
221+
const event = data as unknown as FeishuCardActionEvent;
222+
const promise = handleFeishuCardAction({
223+
cfg,
224+
event,
225+
botOpenId: botOpenIds.get(accountId),
226+
runtime,
227+
accountId,
228+
});
229+
if (fireAndForget) {
230+
promise.catch((err) => {
231+
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
232+
});
233+
} else {
234+
await promise;
235+
}
236+
} catch (err) {
237+
error(`feishu[${accountId}]: error handling card action: ${String(err)}`);
238+
}
239+
},
240+
});
241+
}
242+
243+
export type BotOpenIdSource = { kind: "prefetched"; botOpenId?: string } | { kind: "fetch" };
244+
245+
export type MonitorSingleAccountParams = {
246+
cfg: ClawdbotConfig;
247+
account: ResolvedFeishuAccount;
248+
runtime?: RuntimeEnv;
249+
abortSignal?: AbortSignal;
250+
botOpenIdSource?: BotOpenIdSource;
251+
};
252+
253+
export async function monitorSingleAccount(params: MonitorSingleAccountParams): Promise<void> {
254+
const { cfg, account, runtime, abortSignal } = params;
255+
const { accountId } = account;
256+
const log = runtime?.log ?? console.log;
257+
258+
const botOpenIdSource = params.botOpenIdSource ?? { kind: "fetch" };
259+
const botOpenId =
260+
botOpenIdSource.kind === "prefetched"
261+
? botOpenIdSource.botOpenId
262+
: await fetchBotOpenIdForMonitor(account, { runtime, abortSignal });
263+
botOpenIds.set(accountId, botOpenId ?? "");
264+
log(`feishu[${accountId}]: bot open_id resolved: ${botOpenId ?? "unknown"}`);
265+
266+
const connectionMode = account.config.connectionMode ?? "websocket";
267+
if (connectionMode === "webhook" && !account.verificationToken?.trim()) {
268+
throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`);
269+
}
270+
271+
const eventDispatcher = createEventDispatcher(account);
272+
const chatHistories = new Map<string, HistoryEntry[]>();
273+
274+
registerEventHandlers(eventDispatcher, {
275+
cfg,
276+
accountId,
277+
runtime,
278+
chatHistories,
279+
fireAndForget: true,
280+
});
281+
282+
if (connectionMode === "webhook") {
283+
return monitorWebhook({ account, accountId, runtime, abortSignal, eventDispatcher });
284+
}
285+
return monitorWebSocket({ account, accountId, runtime, abortSignal, eventDispatcher });
286+
}

0 commit comments

Comments
 (0)