Skip to content

Commit 56cc150

Browse files
obviyussteipete
authored andcommitted
feat(telegram): route ambient chatter as room events
1 parent 503c3d1 commit 56cc150

7 files changed

Lines changed: 152 additions & 9 deletions

extensions/telegram/src/bot-message-context.body.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export type TelegramInboundBodyResult = {
6969
effectiveWasMentioned: boolean;
7070
canDetectMention: boolean;
7171
shouldBypassMention: boolean;
72+
hasControlCommand: boolean;
7273
audioTranscribedMediaIndex?: number;
7374
stickerCacheHit: boolean;
7475
locationData?: NormalizedLocation;
@@ -341,7 +342,7 @@ export async function resolveTelegramInboundBody(params: {
341342
canDetectMention,
342343
wasMentioned,
343344
hasAnyMention,
344-
implicitMentionKinds: isGroup && Boolean(requireMention) ? implicitMentionKinds : [],
345+
implicitMentionKinds: isGroup ? implicitMentionKinds : [],
345346
},
346347
policy: {
347348
isGroup,
@@ -419,6 +420,7 @@ export async function resolveTelegramInboundBody(params: {
419420
effectiveWasMentioned,
420421
canDetectMention,
421422
shouldBypassMention: mentionDecision.shouldBypassMention,
423+
hasControlCommand: hasControlCommandInMessage,
422424
...(audioTranscribedMediaIndex !== undefined && audioTranscribedMediaIndex >= 0
423425
? { audioTranscribedMediaIndex }
424426
: {}),

extensions/telegram/src/bot-message-context.require-mention.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,60 @@ describe("buildTelegramMessageContext requireMention precedence", () => {
6161
}
6262
});
6363

64+
it("marks always-on ambient group messages as room events", async () => {
65+
const ctx = await buildTelegramMessageContextForTest({
66+
message: buildForumMessage(),
67+
resolveGroupActivation: () => false,
68+
resolveGroupRequireMention: () => false,
69+
resolveTelegramGroupConfig: () => ({
70+
groupConfig: { requireMention: false },
71+
topicConfig: undefined,
72+
}),
73+
});
74+
75+
expect(ctx?.ctxPayload.InboundTurnKind).toBe("room_event");
76+
});
77+
78+
it("keeps room events as context for the next direct group request", async () => {
79+
const groupHistories = new Map();
80+
await buildTelegramMessageContextForTest({
81+
message: { ...buildForumMessage(99), text: "side chatter" },
82+
historyLimit: 10,
83+
groupHistories,
84+
resolveGroupActivation: () => false,
85+
resolveGroupRequireMention: () => false,
86+
resolveTelegramGroupConfig: () => ({
87+
groupConfig: { requireMention: false },
88+
topicConfig: undefined,
89+
}),
90+
});
91+
92+
const ctx = await buildTelegramMessageContextForTest({
93+
message: {
94+
...buildForumMessage(99),
95+
message_id: 2,
96+
text: "replying directly",
97+
reply_to_message: {
98+
message_id: 10,
99+
chat: { id: -1001234567890, type: "supergroup", title: "Forum", is_forum: true },
100+
from: { id: 7, first_name: "Bot", username: "bot", is_bot: true },
101+
text: "previous bot message",
102+
},
103+
},
104+
historyLimit: 10,
105+
groupHistories,
106+
resolveGroupActivation: () => false,
107+
resolveGroupRequireMention: () => false,
108+
resolveTelegramGroupConfig: () => ({
109+
groupConfig: { requireMention: false },
110+
topicConfig: undefined,
111+
}),
112+
});
113+
114+
expect(ctx?.ctxPayload.InboundTurnKind).toBe("user_request");
115+
expect(ctx?.ctxPayload.Body).toContain("side chatter");
116+
});
117+
64118
it("lets explicit topic requireMention=false override mention activation", async () => {
65119
const resolveGroupActivation = vi.fn(() => true);
66120

extensions/telegram/src/bot-message-context.session.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ export async function buildTelegramInboundContextPayload(params: {
175175
topicConfig?: TelegramTopicConfig;
176176
stickerCacheHit: boolean;
177177
effectiveWasMentioned: boolean;
178+
hasControlCommand: boolean;
178179
audioTranscribedMediaIndex?: number;
179180
commandAuthorized: boolean;
180181
locationData?: NormalizedLocation;
@@ -223,6 +224,7 @@ export async function buildTelegramInboundContextPayload(params: {
223224
topicConfig,
224225
stickerCacheHit,
225226
effectiveWasMentioned,
227+
hasControlCommand,
226228
audioTranscribedMediaIndex,
227229
commandAuthorized,
228230
locationData,
@@ -368,9 +370,9 @@ export async function buildTelegramInboundContextPayload(params: {
368370
previousTimestamp,
369371
envelope: envelopeOptions,
370372
});
373+
const channelHistory = createChannelHistoryWindow({ historyMap: groupHistories });
371374
let combinedBody = body;
372375
if (isGroup && historyKey && historyLimit > 0) {
373-
const channelHistory = createChannelHistoryWindow({ historyMap: groupHistories });
374376
combinedBody = channelHistory.buildPendingContext({
375377
historyKey,
376378
limit: historyLimit,
@@ -397,7 +399,7 @@ export async function buildTelegramInboundContextPayload(params: {
397399
});
398400
const inboundHistory =
399401
isGroup && historyKey && historyLimit > 0
400-
? createChannelHistoryWindow({ historyMap: groupHistories }).buildInboundHistory({
402+
? channelHistory.buildInboundHistory({
401403
historyKey,
402404
limit: historyLimit,
403405
})
@@ -411,6 +413,10 @@ export async function buildTelegramInboundContextPayload(params: {
411413
const telegramTo = `telegram:${chatId}`;
412414
const locationContext = locationData ? toLocationContext(locationData) : undefined;
413415
const commandSource = options?.commandSource;
416+
const inboundTurnKind =
417+
isGroup && !effectiveWasMentioned && !hasControlCommand && commandSource !== "native"
418+
? "room_event"
419+
: "user_request";
414420
const ctxPayload = sessionRuntime.buildChannelTurnContext({
415421
channel: "telegram",
416422
accountId: route.accountId,
@@ -447,6 +453,7 @@ export async function buildTelegramInboundContextPayload(params: {
447453
messageThreadId: threadSpec.id,
448454
},
449455
message: {
456+
inboundTurnKind,
450457
body: combinedBody,
451458
rawBody,
452459
bodyForAgent: bodyText,
@@ -538,6 +545,18 @@ export async function buildTelegramInboundContextPayload(params: {
538545
TopicName: isForum && topicName ? topicName : undefined,
539546
},
540547
} satisfies BuildChannelTurnContextParams);
548+
if (inboundTurnKind === "room_event" && historyKey) {
549+
channelHistory.record({
550+
historyKey,
551+
limit: historyLimit,
552+
entry: {
553+
sender: buildSenderLabel(msg, senderId || chatId),
554+
body: rawBody,
555+
timestamp: msg.date ? msg.date * 1000 : undefined,
556+
messageId: typeof msg.message_id === "number" ? String(msg.message_id) : undefined,
557+
},
558+
});
559+
}
541560

542561
const pinnedMainDmOwner = !isGroup
543562
? sessionRuntime.resolvePinnedMainDmOwnerFromAllowlist({

extensions/telegram/src/bot-message-context.test-harness.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type BuildTelegramMessageContextForTestParams = {
1515
options?: BuildTelegramMessageContextParams["options"];
1616
cfg?: Record<string, unknown>;
1717
accountId?: string;
18+
historyLimit?: number;
19+
groupHistories?: Map<string, import("openclaw/plugin-sdk/reply-history").HistoryEntry[]>;
1820
ackReactionScope?: BuildTelegramMessageContextParams["ackReactionScope"];
1921
botApi?: Record<string, unknown>;
2022
runtime?: BuildTelegramMessageContextParams["runtime"];
@@ -77,8 +79,8 @@ export async function buildTelegramMessageContextForTest(
7779
},
7880
sessionRuntime,
7981
account: { accountId: params.accountId ?? "default" } as never,
80-
historyLimit: 0,
81-
groupHistories: new Map(),
82+
historyLimit: params.historyLimit ?? 0,
83+
groupHistories: params.groupHistories ?? new Map(),
8284
dmPolicy: "open",
8385
allowFrom: ["*"],
8486
groupAllowFrom: [],

extensions/telegram/src/bot-message-context.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ export const buildTelegramMessageContext = async ({
603603
topicConfig,
604604
stickerCacheHit: bodyResult.stickerCacheHit,
605605
effectiveWasMentioned: bodyResult.effectiveWasMentioned,
606+
hasControlCommand: bodyResult.hasControlCommand,
606607
...(bodyResult.audioTranscribedMediaIndex !== undefined
607608
? { audioTranscribedMediaIndex: bodyResult.audioTranscribedMediaIndex }
608609
: {}),

extensions/telegram/src/bot-message-dispatch.test.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,6 +1498,59 @@ describe("dispatchTelegramMessage draft streaming", () => {
14981498
expect(sendMessageTelegram).not.toHaveBeenCalled();
14991499
});
15001500

1501+
it("runs ambient room events as tool-only invisible turns", async () => {
1502+
const historyKey = "telegram:group:-100123";
1503+
const groupHistories = new Map([
1504+
[historyKey, [{ sender: "Alice", body: "side chatter", timestamp: 1 }]],
1505+
]);
1506+
dispatchReplyWithBufferedBlockDispatcher.mockResolvedValue({
1507+
queuedFinal: false,
1508+
counts: { block: 0, final: 0, tool: 0 },
1509+
sourceReplyDeliveryMode: "message_tool_only",
1510+
});
1511+
1512+
await dispatchWithContext({
1513+
context: createContext({
1514+
ctxPayload: {
1515+
InboundTurnKind: "room_event",
1516+
SessionKey: "agent:main:telegram:group:-100123",
1517+
ChatType: "group",
1518+
MessageSid: "99",
1519+
RawBody: "ambient",
1520+
BodyForAgent: "ambient",
1521+
CommandBody: "ambient",
1522+
} as unknown as TelegramMessageContext["ctxPayload"],
1523+
msg: {
1524+
chat: { id: -100123, type: "supergroup" },
1525+
message_id: 99,
1526+
} as unknown as TelegramMessageContext["msg"],
1527+
chatId: -100123,
1528+
isGroup: true,
1529+
historyKey,
1530+
historyLimit: 10,
1531+
groupHistories,
1532+
threadSpec: { id: undefined, scope: "none" },
1533+
}),
1534+
streamMode: "partial",
1535+
});
1536+
1537+
const dispatchParams = mockCallArg(dispatchReplyWithBufferedBlockDispatcher) as {
1538+
replyOptions?: {
1539+
sourceReplyDeliveryMode?: string;
1540+
suppressTyping?: boolean;
1541+
allowProgressCallbacksWhenSourceDeliverySuppressed?: boolean;
1542+
};
1543+
};
1544+
expect(dispatchParams.replyOptions?.sourceReplyDeliveryMode).toBe("message_tool_only");
1545+
expect(dispatchParams.replyOptions?.suppressTyping).toBe(true);
1546+
expect(dispatchParams.replyOptions?.allowProgressCallbacksWhenSourceDeliverySuppressed).toBe(
1547+
false,
1548+
);
1549+
expect(createTelegramDraftStream).not.toHaveBeenCalled();
1550+
expect(deliverReplies).not.toHaveBeenCalled();
1551+
expect(groupHistories.get(historyKey)).toHaveLength(1);
1552+
});
1553+
15011554
it("shows compacting reaction during auto-compaction and resumes thinking", async () => {
15021555
const statusReactionController = {
15031556
setThinking: vi.fn(async () => {}),

extensions/telegram/src/bot-message-dispatch.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ export const dispatchTelegramMessage = async ({
480480
const accountBlockStreamingEnabled =
481481
resolveChannelStreamingBlockEnabled(telegramCfg) ??
482482
cfg.agents?.defaults?.blockStreamingDefault === "on";
483+
const isRoomEvent = ctxPayload.InboundTurnKind === "room_event";
483484
const resolvedReasoningLevel = resolveTelegramReasoningLevel({
484485
cfg,
485486
sessionKey: ctxPayload.SessionKey,
@@ -488,7 +489,7 @@ export const dispatchTelegramMessage = async ({
488489
});
489490
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
490491
const streamReasoningDraft = resolvedReasoningLevel === "stream";
491-
const streamDeliveryEnabled = streamMode !== "off";
492+
const streamDeliveryEnabled = !isRoomEvent && streamMode !== "off";
492493
const rawReplyQuoteText =
493494
ctxPayload.ReplyToIsQuote && typeof ctxPayload.ReplyToQuoteText === "string"
494495
? ctxPayload.ReplyToQuoteText
@@ -1162,7 +1163,7 @@ export const dispatchTelegramMessage = async ({
11621163
}
11631164
}
11641165

1165-
if (statusReactionController) {
1166+
if (statusReactionController && !isRoomEvent) {
11661167
void statusReactionController.setThinking();
11671168
}
11681169

@@ -1434,6 +1435,8 @@ export const dispatchTelegramMessage = async ({
14341435
replyOptions: {
14351436
skillFilter,
14361437
disableBlockStreaming,
1438+
sourceReplyDeliveryMode: isRoomEvent ? "message_tool_only" : undefined,
1439+
suppressTyping: isRoomEvent,
14371440
onPartialReply:
14381441
answerLane.stream || reasoningLane.stream
14391442
? (payload) =>
@@ -1473,7 +1476,8 @@ export const dispatchTelegramMessage = async ({
14731476
: undefined,
14741477
suppressDefaultToolProgressMessages:
14751478
!streamDeliveryEnabled || Boolean(answerLane.stream),
1476-
allowProgressCallbacksWhenSourceDeliverySuppressed: Boolean(answerLane.stream),
1479+
allowProgressCallbacksWhenSourceDeliverySuppressed:
1480+
!isRoomEvent && Boolean(answerLane.stream),
14771481
onToolStart: async (payload) => {
14781482
const toolName = payload.name?.trim();
14791483
const progressPromise = pushStreamToolProgress(
@@ -1722,7 +1726,13 @@ export const dispatchTelegramMessage = async ({
17221726
);
17231727
}
17241728

1729+
const shouldClearGroupHistory =
1730+
!isRoomEvent || deliverySummary.delivered || sentFallback || queuedFinal;
1731+
17251732
if (!hasFinalResponse) {
1733+
if (!shouldClearGroupHistory) {
1734+
return;
1735+
}
17261736
clearGroupHistory();
17271737
return;
17281738
}
@@ -1795,5 +1805,7 @@ export const dispatchTelegramMessage = async ({
17951805
},
17961806
});
17971807
}
1798-
clearGroupHistory();
1808+
if (shouldClearGroupHistory) {
1809+
clearGroupHistory();
1810+
}
17991811
};

0 commit comments

Comments
 (0)