Skip to content

Commit 3b5b3c2

Browse files
committed
fix(feishu): route /btw through out-of-band lanes
1 parent 9fd08f9 commit 3b5b3c2

5 files changed

Lines changed: 171 additions & 22 deletions

File tree

extensions/feishu/src/monitor.account.ts

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import { botNames, botOpenIds } from "./monitor.state.js";
2929
import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js";
3030
import { getFeishuRuntime } from "./runtime.js";
3131
import { getMessageFeishu } from "./send.js";
32+
import { getFeishuSequentialKey } from "./sequential-key.js";
33+
import { createSequentialQueue } from "./sequential-queue.js";
3234
import { createFeishuThreadBindingManager } from "./thread-bindings.js";
3335
import type { FeishuChatType, ResolvedFeishuAccount } from "./types.js";
3436

@@ -290,25 +292,6 @@ function parseFeishuCardActionEventPayload(value: unknown): FeishuCardActionEven
290292
};
291293
}
292294

293-
/**
294-
* Per-chat serial queue that ensures messages from the same chat are processed
295-
* in arrival order while allowing different chats to run concurrently.
296-
*/
297-
function createChatQueue() {
298-
const queues = new Map<string, Promise<void>>();
299-
return (chatId: string, task: () => Promise<void>): Promise<void> => {
300-
const prev = queues.get(chatId) ?? Promise.resolve();
301-
const next = prev.then(task, task);
302-
queues.set(chatId, next);
303-
void next.finally(() => {
304-
if (queues.get(chatId) === next) {
305-
queues.delete(chatId);
306-
}
307-
});
308-
return next;
309-
};
310-
}
311-
312295
function mergeFeishuDebounceMentions(
313296
entries: FeishuMessageEvent[],
314297
): FeishuMessageEvent["message"]["mentions"] | undefined {
@@ -395,7 +378,9 @@ function registerEventHandlers(
395378
});
396379
const log = runtime?.log ?? console.log;
397380
const error = runtime?.error ?? console.error;
398-
const enqueue = createChatQueue();
381+
// Keep normal Feishu traffic FIFO per chat while allowing explicit out-of-band
382+
// commands like /btw and /stop to bypass the busy main-chat lane.
383+
const enqueue = createSequentialQueue();
399384
const runFeishuHandler = async (params: { task: () => Promise<void>; errorMessage: string }) => {
400385
if (fireAndForget) {
401386
void params.task().catch((err) => {
@@ -410,7 +395,12 @@ function registerEventHandlers(
410395
}
411396
};
412397
const dispatchFeishuMessage = async (event: FeishuMessageEvent) => {
413-
const chatId = event.message.chat_id?.trim() || "unknown";
398+
const sequentialKey = getFeishuSequentialKey({
399+
accountId,
400+
event,
401+
botOpenId: botOpenIds.get(accountId),
402+
botName: botNames.get(accountId),
403+
});
414404
const task = () =>
415405
handleFeishuMessage({
416406
cfg,
@@ -422,7 +412,7 @@ function registerEventHandlers(
422412
accountId,
423413
processingClaimHeld: true,
424414
});
425-
await enqueue(chatId, task);
415+
await enqueue(sequentialKey, task);
426416
};
427417
const resolveSenderDebounceId = (event: FeishuMessageEvent): string | undefined => {
428418
const senderId =
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { describe, expect, it } from "vitest";
2+
import type { FeishuMessageEvent } from "./bot.js";
3+
import { getFeishuSequentialKey } from "./sequential-key.js";
4+
5+
function createTextEvent(params: {
6+
text: string;
7+
messageId?: string;
8+
chatId?: string;
9+
}): FeishuMessageEvent {
10+
return {
11+
sender: {
12+
sender_id: {
13+
open_id: "ou_sender_1",
14+
user_id: "ou_user_1",
15+
},
16+
sender_type: "user",
17+
},
18+
message: {
19+
message_id: params.messageId ?? "om_message_1",
20+
chat_id: params.chatId ?? "oc_dm_chat",
21+
chat_type: "p2p",
22+
message_type: "text",
23+
content: JSON.stringify({ text: params.text }),
24+
},
25+
} as FeishuMessageEvent;
26+
}
27+
28+
describe("getFeishuSequentialKey", () => {
29+
it.each([
30+
[createTextEvent({ text: "hello" }), "feishu:default:oc_dm_chat"],
31+
[createTextEvent({ text: "/status" }), "feishu:default:oc_dm_chat"],
32+
[createTextEvent({ text: "/stop" }), "feishu:default:oc_dm_chat:control"],
33+
[createTextEvent({ text: "/btw what changed?" }), "feishu:default:oc_dm_chat:btw:om_message_1"],
34+
])("resolves sequential key %#", (event, expected) => {
35+
expect(
36+
getFeishuSequentialKey({
37+
accountId: "default",
38+
event,
39+
}),
40+
).toBe(expected);
41+
});
42+
43+
it("falls back to a stable btw lane when the message id is unavailable", () => {
44+
const event = createTextEvent({ text: "/btw what changed?" });
45+
delete (event.message as { message_id?: string }).message_id;
46+
47+
expect(
48+
getFeishuSequentialKey({
49+
accountId: "default",
50+
event,
51+
}),
52+
).toBe("feishu:default:oc_dm_chat:btw");
53+
});
54+
});
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import { isAbortRequestText, isBtwRequestText } from "openclaw/plugin-sdk/reply-runtime";
2+
import { parseFeishuMessageEvent, type FeishuMessageEvent } from "./bot.js";
3+
4+
export function getFeishuSequentialKey(params: {
5+
accountId: string;
6+
event: FeishuMessageEvent;
7+
botOpenId?: string;
8+
botName?: string;
9+
}): string {
10+
const { accountId, event, botOpenId, botName } = params;
11+
const chatId = event.message.chat_id?.trim() || "unknown";
12+
const baseKey = `feishu:${accountId}:${chatId}`;
13+
const parsed = parseFeishuMessageEvent(event, botOpenId, botName);
14+
const text = parsed.content.trim();
15+
16+
if (isAbortRequestText(text)) {
17+
return `${baseKey}:control`;
18+
}
19+
20+
if (isBtwRequestText(text)) {
21+
const messageId = event.message.message_id?.trim();
22+
return messageId ? `${baseKey}:btw:${messageId}` : `${baseKey}:btw`;
23+
}
24+
25+
return baseKey;
26+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { describe, expect, it } from "vitest";
2+
import { createSequentialQueue } from "./sequential-queue.js";
3+
4+
function createDeferred() {
5+
let resolve!: () => void;
6+
const promise = new Promise<void>((res) => {
7+
resolve = res;
8+
});
9+
return { promise, resolve };
10+
}
11+
12+
describe("createSequentialQueue", () => {
13+
it("serializes tasks for the same key", async () => {
14+
const enqueue = createSequentialQueue();
15+
const gate = createDeferred();
16+
const order: string[] = [];
17+
18+
const first = enqueue("feishu:default:chat-1", async () => {
19+
order.push("first:start");
20+
await gate.promise;
21+
order.push("first:end");
22+
});
23+
const second = enqueue("feishu:default:chat-1", async () => {
24+
order.push("second:start");
25+
order.push("second:end");
26+
});
27+
28+
await Promise.resolve();
29+
expect(order).toEqual(["first:start"]);
30+
31+
gate.resolve();
32+
await Promise.all([first, second]);
33+
34+
expect(order).toEqual(["first:start", "first:end", "second:start", "second:end"]);
35+
});
36+
37+
it("allows different keys to run concurrently", async () => {
38+
const enqueue = createSequentialQueue();
39+
const gateA = createDeferred();
40+
const gateB = createDeferred();
41+
const order: string[] = [];
42+
43+
const first = enqueue("feishu:default:chat-1", async () => {
44+
order.push("chat-1:start");
45+
await gateA.promise;
46+
order.push("chat-1:end");
47+
});
48+
const second = enqueue("feishu:default:chat-1:btw:om_2", async () => {
49+
order.push("btw:start");
50+
await gateB.promise;
51+
order.push("btw:end");
52+
});
53+
54+
await Promise.resolve();
55+
expect(order).toEqual(["chat-1:start", "btw:start"]);
56+
57+
gateA.resolve();
58+
gateB.resolve();
59+
await Promise.all([first, second]);
60+
61+
expect(order).toContain("chat-1:end");
62+
expect(order).toContain("btw:end");
63+
});
64+
});
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
export function createSequentialQueue() {
2+
const queues = new Map<string, Promise<void>>();
3+
4+
return (key: string, task: () => Promise<void>): Promise<void> => {
5+
const previous = queues.get(key) ?? Promise.resolve();
6+
const next = previous.then(task, task);
7+
queues.set(key, next);
8+
void next.finally(() => {
9+
if (queues.get(key) === next) {
10+
queues.delete(key);
11+
}
12+
});
13+
return next;
14+
};
15+
}

0 commit comments

Comments
 (0)