Skip to content

Commit 7918bb1

Browse files
committed
slack: persist thread participation best-effort
1 parent d3bb5ce commit 7918bb1

6 files changed

Lines changed: 197 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
1212
- macOS app: move recent session context rows into a Context submenu while keeping usage and cost details root-level, so the menu bar companion stays compact with many active sessions. Thanks @guti.
1313
- Gateway/SDK: add SDK-facing tools.invoke RPC with shared HTTP policy, typed approval/refusal results, and SDK helper support. Refs #74705. Thanks @BunsDev and @ai-hpc.
1414
- Messages/docs: clarify that `BodyForAgent` is the primary inbound model text while `Body` is the legacy envelope fallback, and add Signal coverage so channel hardening patches target the real prompt path. Refs #66198. Thanks @defonota3box.
15+
- Slack/plugins: persist bot-participated thread markers with best-effort SDK state behind the in-memory cache, so thread auto-reply context can survive restarts while SQLite failures fall back to process-local behavior. Thanks @amknight.
1516
- Control UI/Usage: add UTC quarter-hour token buckets for the Usage Mosaic and reuse them for hour filtering, keeping the legacy session-span fallback for older summaries. (#74337) Thanks @konanok.
1617
- BlueBubbles: add opt-in `channels.bluebubbles.replyContextApiFallback` that fetches the original message from the BlueBubbles HTTP API when the in-memory reply-context cache misses (multi-instance deployments sharing one BB account, post-restart, after long-lived TTL/LRU eviction). Off by default; channel-level setting propagates to accounts that omit the flag through `mergeAccountConfig`; routed through the typed `BlueBubblesClient` so every fetch is SSRF-guarded by the same three-mode policy as every other BB client request; reply-id shape is validated and part-index prefixes (`p:0/<guid>`) are stripped before the request; concurrent webhooks for the same `replyToId` coalesce into one fetch and successful responses populate the reply cache for subsequent hits. Also promotes BlueBubbles attachment download failures from verbose to runtime error so silently-dropped inbound images are visible at default log level, and extends `sanitizeForLog` to redact `?password=…`/`?token=…` query params and `Authorization:` headers before they reach the log sink (CWE-532). (#71820) Thanks @coletebou and @zqchris.
1718
- CLI/proxy: add `openclaw proxy validate` so operators can verify effective proxy configuration, proxy reachability, and expected allow/deny destination behavior before deploying proxy-routed OpenClaw commands. (#73438) Thanks @jesse-merhi.

docs/channels/slack.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,7 @@ Current Slack message actions include `send`, `upload-file`, `download-file`, `r
596596
- `channels.slack.thread.historyScope` default is `thread`; `thread.inheritParent` default is `false`.
597597
- `channels.slack.thread.initialHistoryLimit` controls how many existing thread messages are fetched when a new thread session starts (default `20`; set `0` to disable).
598598
- `channels.slack.thread.requireExplicitMention` (default `false`): when `true`, suppress implicit thread mentions so the bot only responds to explicit `@bot` mentions inside threads, even when the bot already participated in the thread. Without this, replies in a bot-participated thread bypass `requireMention` gating.
599+
- Restart-safe thread participation: Slack uses best-effort SDK-backed persistent state behind the in-memory cache for bot-participated thread markers, so thread auto-reply context can survive a Gateway restart. If the persistent store is unavailable or fails, Slack logs the failure and keeps the previous process-local cache behavior.
599600

600601
Reply threading controls:
601602

extensions/slack/src/monitor/message-handler/dispatch.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1170,7 +1170,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
11701170
// or draft stream). Falls back to statusThreadTs for edge cases.
11711171
const participationThreadTs = usedReplyThreadTs ?? statusThreadTs;
11721172
if (anyReplyDelivered && participationThreadTs) {
1173-
recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs);
1173+
recordSlackThreadParticipation(account.accountId, message.channel, participationThreadTs, {
1174+
agentId: route.agentId,
1175+
});
11741176
}
11751177

11761178
if (!anyReplyDelivered) {

extensions/slack/src/monitor/message-handler/prepare.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
import type { ResolvedSlackAccount } from "../../accounts.js";
3434
import { reactSlackMessage } from "../../actions.js";
3535
import { formatSlackFileReference } from "../../file-reference.js";
36-
import { hasSlackThreadParticipation } from "../../sent-thread-cache.js";
36+
import { hasSlackThreadParticipationWithPersistence } from "../../sent-thread-cache.js";
3737
import type { SlackMessageEvent } from "../../types.js";
3838
import {
3939
normalizeAllowListLower,
@@ -361,7 +361,11 @@ export async function prepareSlackMessage(params: {
361361
...implicitMentionKindWhen("reply_to_bot", message.parent_user_id === ctx.botUserId),
362362
...implicitMentionKindWhen(
363363
"bot_thread_participant",
364-
hasSlackThreadParticipation(account.accountId, message.channel, message.thread_ts),
364+
await hasSlackThreadParticipationWithPersistence({
365+
accountId: account.accountId,
366+
channelId: message.channel,
367+
threadTs: message.thread_ts,
368+
}),
365369
),
366370
];
367371

extensions/slack/src/sent-thread-cache.test.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures";
22
import { afterEach, describe, expect, it, vi } from "vitest";
3+
import { clearSlackRuntime, setSlackRuntime } from "./runtime.js";
34
import {
45
clearSlackThreadParticipationCache,
56
hasSlackThreadParticipation,
7+
hasSlackThreadParticipationWithPersistence,
68
recordSlackThreadParticipation,
79
} from "./sent-thread-cache.js";
810

911
describe("slack sent-thread-cache", () => {
1012
afterEach(() => {
1113
clearSlackThreadParticipationCache();
14+
clearSlackRuntime();
1215
vi.restoreAllMocks();
1316
});
1417

@@ -88,4 +91,75 @@ describe("slack sent-thread-cache", () => {
8891
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000000")).toBe(false);
8992
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.005000")).toBe(true);
9093
});
94+
95+
it("writes and reads persistent thread participation when runtime state is available", async () => {
96+
const register = vi.fn().mockResolvedValue(undefined);
97+
const lookup = vi.fn().mockResolvedValue({ repliedAt: 123 });
98+
const openKeyedStore = vi.fn(() => ({
99+
register,
100+
lookup,
101+
consume: vi.fn(),
102+
delete: vi.fn(),
103+
entries: vi.fn(),
104+
clear: vi.fn(),
105+
}));
106+
setSlackRuntime({
107+
state: { openKeyedStore },
108+
logging: { getChildLogger: () => ({ warn: vi.fn() }) },
109+
} as never);
110+
111+
recordSlackThreadParticipation("A1", "C123", "1700000000.000002");
112+
113+
await vi.waitFor(() => expect(register).toHaveBeenCalledTimes(1));
114+
expect(register).toHaveBeenCalledWith(
115+
"A1:C123:1700000000.000002",
116+
expect.objectContaining({ repliedAt: expect.any(Number) }),
117+
);
118+
119+
clearSlackThreadParticipationCache();
120+
await expect(
121+
hasSlackThreadParticipationWithPersistence({
122+
accountId: "A1",
123+
channelId: "C123",
124+
threadTs: "1700000000.000002",
125+
}),
126+
).resolves.toBe(true);
127+
expect(openKeyedStore).toHaveBeenCalledTimes(2);
128+
expect(lookup).toHaveBeenCalledWith("A1:C123:1700000000.000002");
129+
130+
lookup.mockClear();
131+
await expect(
132+
hasSlackThreadParticipationWithPersistence({
133+
accountId: "A1",
134+
channelId: "C123",
135+
threadTs: "1700000000.000002",
136+
}),
137+
).resolves.toBe(true);
138+
expect(lookup).not.toHaveBeenCalled();
139+
});
140+
141+
it("falls back to in-memory thread participation when persistent state cannot open", async () => {
142+
const warn = vi.fn();
143+
setSlackRuntime({
144+
state: {
145+
openKeyedStore: vi.fn(() => {
146+
throw new Error("sqlite unavailable");
147+
}),
148+
},
149+
logging: { getChildLogger: () => ({ warn }) },
150+
} as never);
151+
152+
recordSlackThreadParticipation("A1", "C123", "1700000000.000003");
153+
expect(hasSlackThreadParticipation("A1", "C123", "1700000000.000003")).toBe(true);
154+
155+
clearSlackThreadParticipationCache();
156+
await expect(
157+
hasSlackThreadParticipationWithPersistence({
158+
accountId: "A1",
159+
channelId: "C123",
160+
threadTs: "1700000000.000003",
161+
}),
162+
).resolves.toBe(false);
163+
expect(warn).toHaveBeenCalled();
164+
});
91165
});

extensions/slack/src/sent-thread-cache.ts

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
2+
import { getOptionalSlackRuntime } from "./runtime.js";
23

34
/**
45
* In-memory cache of Slack threads the bot has participated in.
@@ -8,6 +9,22 @@ import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
89

910
const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
1011
const MAX_ENTRIES = 5000;
12+
const PERSISTENT_MAX_ENTRIES = 1000;
13+
const PERSISTENT_NAMESPACE = "slack.thread-participation";
14+
15+
type SlackThreadParticipationRecord = {
16+
agentId?: string;
17+
repliedAt: number;
18+
};
19+
20+
type SlackThreadParticipationStore = {
21+
register(
22+
key: string,
23+
value: SlackThreadParticipationRecord,
24+
opts?: { ttlMs?: number },
25+
): Promise<void>;
26+
lookup(key: string): Promise<SlackThreadParticipationRecord | undefined>;
27+
};
1128

1229
/**
1330
* Keep Slack thread participation shared across bundled chunks so thread
@@ -19,19 +36,92 @@ const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_
1936
maxSize: MAX_ENTRIES,
2037
});
2138

39+
let persistentStore: SlackThreadParticipationStore | undefined;
40+
let persistentStoreDisabled = false;
41+
2242
function makeKey(accountId: string, channelId: string, threadTs: string): string {
2343
return `${accountId}:${channelId}:${threadTs}`;
2444
}
2545

46+
function reportPersistentThreadParticipationError(error: unknown): void {
47+
try {
48+
getOptionalSlackRuntime()
49+
?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" })
50+
.warn("Slack persistent thread participation state failed", { error: String(error) });
51+
} catch {
52+
// Best effort only: persistent state must never break Slack message handling.
53+
}
54+
}
55+
56+
function disablePersistentThreadParticipation(error: unknown): void {
57+
persistentStoreDisabled = true;
58+
persistentStore = undefined;
59+
reportPersistentThreadParticipationError(error);
60+
}
61+
62+
function getPersistentThreadParticipationStore(): SlackThreadParticipationStore | undefined {
63+
if (persistentStoreDisabled) {
64+
return undefined;
65+
}
66+
if (persistentStore) {
67+
return persistentStore;
68+
}
69+
const runtime = getOptionalSlackRuntime();
70+
if (!runtime) {
71+
return undefined;
72+
}
73+
try {
74+
persistentStore = runtime.state.openKeyedStore<SlackThreadParticipationRecord>({
75+
namespace: PERSISTENT_NAMESPACE,
76+
maxEntries: PERSISTENT_MAX_ENTRIES,
77+
defaultTtlMs: TTL_MS,
78+
});
79+
return persistentStore;
80+
} catch (error) {
81+
disablePersistentThreadParticipation(error);
82+
return undefined;
83+
}
84+
}
85+
86+
function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void {
87+
const store = getPersistentThreadParticipationStore();
88+
if (!store) {
89+
return;
90+
}
91+
void store
92+
.register(params.key, {
93+
// Stored for future per-agent thread routing; current reads only need presence.
94+
...(params.agentId ? { agentId: params.agentId } : {}),
95+
repliedAt: Date.now(),
96+
})
97+
.catch(disablePersistentThreadParticipation);
98+
}
99+
100+
async function lookupPersistentThreadParticipation(key: string): Promise<boolean> {
101+
const store = getPersistentThreadParticipationStore();
102+
if (!store) {
103+
return false;
104+
}
105+
try {
106+
return Boolean(await store.lookup(key));
107+
} catch (error) {
108+
disablePersistentThreadParticipation(error);
109+
return false;
110+
}
111+
}
112+
26113
export function recordSlackThreadParticipation(
27114
accountId: string,
28115
channelId: string,
29116
threadTs: string,
117+
opts?: { agentId?: string },
30118
): void {
31119
if (!accountId || !channelId || !threadTs) {
32120
return;
33121
}
34-
threadParticipation.check(makeKey(accountId, channelId, threadTs));
122+
const key = makeKey(accountId, channelId, threadTs);
123+
threadParticipation.check(key);
124+
rememberPersistentThreadParticipation({ key, agentId: opts?.agentId });
35125
}
36126

37127
export function hasSlackThreadParticipation(
@@ -45,6 +135,27 @@ export function hasSlackThreadParticipation(
45135
return threadParticipation.peek(makeKey(accountId, channelId, threadTs));
46136
}
47137

138+
export async function hasSlackThreadParticipationWithPersistence(params: {
139+
accountId: string;
140+
channelId: string;
141+
threadTs: string;
142+
}): Promise<boolean> {
143+
if (!params.accountId || !params.channelId || !params.threadTs) {
144+
return false;
145+
}
146+
const key = makeKey(params.accountId, params.channelId, params.threadTs);
147+
if (threadParticipation.peek(key)) {
148+
return true;
149+
}
150+
const found = await lookupPersistentThreadParticipation(key);
151+
if (found) {
152+
threadParticipation.check(key);
153+
}
154+
return found;
155+
}
156+
48157
export function clearSlackThreadParticipationCache(): void {
49158
threadParticipation.clear();
159+
persistentStore = undefined;
160+
persistentStoreDisabled = false;
50161
}

0 commit comments

Comments
 (0)