Skip to content

Commit c49d909

Browse files
committed
fix(slack): persist inbound delivery dedupe
1 parent f0b43bf commit c49d909

14 files changed

Lines changed: 375 additions & 8 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ Docs: https://docs.openclaw.ai
4444

4545
- CLI: enforce the documented Node.js 22.19 runtime floor in the source launcher.
4646
- Agents/replies: persist queued follow-up user messages and assistant error stubs only once across model-fallback retries, preventing repeated provider rejections from corrupted same-role session transcripts. Fixes #83404. (#83417) Thanks @yetval.
47+
- Slack: persist delivered inbound message IDs and fail closed when same-channel thread replies lose their thread context, preventing delayed duplicate replies and accidental channel-root posts. Fixes #83521. Thanks @shannon0430.
4748
- Gateway/config: keep config writes from failing on unrelated unresolved auth-profile SecretRefs while preserving live auth-profile runtime snapshots.
4849
- Gateway/sessions: clear stored CLI provider resume bindings on non-subagent `/reset` so the next turn starts a fresh provider-side CLI conversation instead of resuming old context. (#83448) Thanks @jasonyliu.
4950
- Discord/OpenAI: keep realtime Discord voice sessions hearing follow-up turns with OpenAI realtime and prebuffer assistant playback to avoid choppy starts. (#80505) Thanks @Solvely-Colin.

extensions/slack/src/action-runtime.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,41 @@ describe("handleSlackAction", () => {
148148
expectLastSlackSend("Second", params.cfg);
149149
}
150150

151+
it("fails closed for same-channel sends from thread-required contexts with no thread ts", async () => {
152+
const cfg = slackConfig();
153+
sendSlackMessage.mockClear();
154+
155+
await expect(
156+
handleSlackAction(
157+
{ action: "sendMessage", to: "channel:C123", content: "keep private" },
158+
cfg,
159+
{
160+
currentChannelId: "C123",
161+
replyToMode: "all",
162+
sameChannelThreadRequired: true,
163+
},
164+
),
165+
).rejects.toThrow("Slack thread context is required");
166+
expect(sendSlackMessage).not.toHaveBeenCalled();
167+
});
168+
169+
it("allows explicit top-level sends from thread-required contexts", async () => {
170+
const cfg = slackConfig();
171+
sendSlackMessage.mockClear();
172+
173+
await handleSlackAction(
174+
{ action: "sendMessage", to: "channel:C123", content: "root", topLevel: true },
175+
cfg,
176+
{
177+
currentChannelId: "C123",
178+
replyToMode: "all",
179+
sameChannelThreadRequired: true,
180+
},
181+
);
182+
183+
expectLastSlackSend("root", cfg);
184+
});
185+
151186
async function resolveReadToken(cfg: OpenClawConfig): Promise<string | undefined> {
152187
readSlackMessages.mockClear();
153188
readSlackMessages.mockResolvedValueOnce({ messages: [], hasMore: false });

extensions/slack/src/action-runtime.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ export type SlackActionContext = {
9393
replyToMode?: "off" | "first" | "all" | "batched";
9494
/** Mutable ref to track if a reply was sent for single-use reply modes. */
9595
hasRepliedRef?: { value: boolean };
96+
/** True when same-channel root posting would leak a thread-originated reply. */
97+
sameChannelThreadRequired?: boolean;
9698
/** Allowed local media directories for file uploads. */
9799
mediaLocalRoots?: readonly string[];
98100
mediaReadFile?: (filePath: string) => Promise<Buffer>;
@@ -117,15 +119,22 @@ function resolveThreadTsFromContext(
117119
if (opts?.suppressImplicitThread) {
118120
return undefined;
119121
}
120-
// No context or missing required fields
121-
if (!context?.currentThreadTs || !context?.currentChannelId) {
122+
if (!context?.currentChannelId) {
122123
return undefined;
123124
}
124125

125126
// Different channel - don't inject
126127
if (!sameSlackChannelTarget(targetChannel, context.currentChannelId)) {
127128
return undefined;
128129
}
130+
if (!context.currentThreadTs) {
131+
if (context.sameChannelThreadRequired) {
132+
throw new Error(
133+
"Slack thread context is required for same-channel replies from a threaded Slack turn. Set topLevel=true or threadId=null to post at the channel root.",
134+
);
135+
}
136+
return undefined;
137+
}
129138

130139
// Check replyToMode
131140
if (context.replyToMode === "all") {

extensions/slack/src/action-threading.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ type SlackThreadingToolContext = {
66
currentThreadTs?: string;
77
replyToMode?: "off" | "first" | "all" | "batched";
88
hasRepliedRef?: { value: boolean };
9+
sameChannelThreadRequired?: boolean;
910
};
1011

1112
function createToolContext(
@@ -79,4 +80,16 @@ describe("resolveSlackAutoThreadId", () => {
7980
}),
8081
).toBeUndefined();
8182
});
83+
84+
it("fails closed for same-channel threaded replies when the thread timestamp is missing", () => {
85+
expect(() =>
86+
resolveSlackAutoThreadId({
87+
to: "C123",
88+
toolContext: createToolContext({
89+
currentThreadTs: undefined,
90+
sameChannelThreadRequired: true,
91+
}),
92+
}),
93+
).toThrow("Slack thread context is required");
94+
});
8295
});

extensions/slack/src/action-threading.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,11 @@ export function resolveSlackAutoThreadId(params: {
99
currentThreadTs?: string;
1010
replyToMode?: "off" | "first" | "all" | "batched";
1111
hasRepliedRef?: { value: boolean };
12+
sameChannelThreadRequired?: boolean;
1213
};
1314
}): string | undefined {
1415
const context = params.toolContext;
15-
if (!context?.currentThreadTs || !context.currentChannelId) {
16-
return undefined;
17-
}
18-
if (context.replyToMode !== "all" && !isSingleUseReplyToMode(context.replyToMode ?? "off")) {
16+
if (!context?.currentChannelId) {
1917
return undefined;
2018
}
2119
const parsedTarget = parseSlackTarget(params.to, { defaultKind: "channel" });
@@ -28,6 +26,17 @@ export function resolveSlackAutoThreadId(params: {
2826
) {
2927
return undefined;
3028
}
29+
if (!context.currentThreadTs) {
30+
if (context.sameChannelThreadRequired) {
31+
throw new Error(
32+
"Slack thread context is required for same-channel replies from a threaded Slack turn. Set topLevel=true or threadId=null to post at the channel root.",
33+
);
34+
}
35+
return undefined;
36+
}
37+
if (context.replyToMode !== "all" && !isSingleUseReplyToMode(context.replyToMode ?? "off")) {
38+
return undefined;
39+
}
3140
if (isSingleUseReplyToMode(context.replyToMode ?? "off") && context.hasRepliedRef?.value) {
3241
return undefined;
3342
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { afterEach, describe, expect, it, vi } from "vitest";
2+
import { clearSlackRuntime, setSlackRuntime } from "../runtime.js";
3+
import type { SlackMessageEvent } from "../types.js";
4+
import {
5+
clearSlackInboundDeliveryStateForTest,
6+
hasSlackInboundMessageDelivery,
7+
recordSlackInboundMessageDeliveries,
8+
} from "./inbound-delivery-state.js";
9+
10+
describe("slack inbound delivery state", () => {
11+
afterEach(() => {
12+
clearSlackInboundDeliveryStateForTest();
13+
clearSlackRuntime();
14+
vi.restoreAllMocks();
15+
});
16+
17+
function message(channel: string, ts: string): SlackMessageEvent {
18+
return { type: "message", channel, ts, text: "hello" };
19+
}
20+
21+
it("records every delivered debounced source message", async () => {
22+
const register = vi.fn().mockResolvedValue(undefined);
23+
setSlackRuntime({
24+
state: {
25+
openKeyedStore: vi.fn(() => ({
26+
register,
27+
lookup: vi.fn(),
28+
consume: vi.fn(),
29+
delete: vi.fn(),
30+
entries: vi.fn(),
31+
clear: vi.fn(),
32+
})),
33+
},
34+
logging: { getChildLogger: () => ({ warn: vi.fn() }) },
35+
} as never);
36+
37+
await recordSlackInboundMessageDeliveries({
38+
accountId: "A1",
39+
messages: [message("C1", "100.001"), message("C1", "100.002")],
40+
});
41+
42+
expect(register).toHaveBeenCalledTimes(2);
43+
expect(register).toHaveBeenCalledWith("A1:C1:100.001", {
44+
deliveredAt: expect.any(Number),
45+
});
46+
expect(register).toHaveBeenCalledWith("A1:C1:100.002", {
47+
deliveredAt: expect.any(Number),
48+
});
49+
});
50+
51+
it("scopes duplicate checks by account", async () => {
52+
await recordSlackInboundMessageDeliveries({
53+
accountId: "A1",
54+
messages: [message("C1", "100.001")],
55+
});
56+
57+
await expect(
58+
hasSlackInboundMessageDelivery({
59+
accountId: "A1",
60+
channelId: "C1",
61+
ts: "100.001",
62+
}),
63+
).resolves.toBe(true);
64+
await expect(
65+
hasSlackInboundMessageDelivery({
66+
accountId: "A2",
67+
channelId: "C1",
68+
ts: "100.001",
69+
}),
70+
).resolves.toBe(false);
71+
});
72+
});
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
2+
import { getOptionalSlackRuntime } from "../runtime.js";
3+
import type { SlackMessageEvent } from "../types.js";
4+
5+
const TTL_MS = 24 * 60 * 60 * 1000;
6+
const MAX_ENTRIES = 20_000;
7+
const PERSISTENT_MAX_ENTRIES = 20_000;
8+
const PERSISTENT_NAMESPACE = "slack.inbound-deliveries";
9+
const SLACK_INBOUND_DELIVERIES_KEY = Symbol.for("openclaw.slackInboundDeliveries");
10+
11+
type SlackInboundDeliveryRecord = {
12+
deliveredAt: number;
13+
};
14+
15+
type SlackInboundDeliveryStore = {
16+
register(
17+
key: string,
18+
value: SlackInboundDeliveryRecord,
19+
opts?: { ttlMs?: number },
20+
): Promise<void>;
21+
lookup(key: string): Promise<SlackInboundDeliveryRecord | undefined>;
22+
};
23+
24+
const deliveredMessages = resolveGlobalDedupeCache(SLACK_INBOUND_DELIVERIES_KEY, {
25+
ttlMs: TTL_MS,
26+
maxSize: MAX_ENTRIES,
27+
});
28+
29+
let persistentStore: SlackInboundDeliveryStore | undefined;
30+
let persistentStoreDisabled = false;
31+
32+
function makeKey(accountId: string, channelId: string, ts: string): string {
33+
return `${accountId}:${channelId}:${ts}`;
34+
}
35+
36+
function reportPersistentInboundDeliveryError(error: unknown): void {
37+
try {
38+
getOptionalSlackRuntime()
39+
?.logging.getChildLogger({ plugin: "slack", feature: "inbound-delivery-state" })
40+
.warn("Slack persistent inbound delivery state failed", { error: String(error) });
41+
} catch {
42+
// Best effort only: persistent state must never break Slack message handling.
43+
}
44+
}
45+
46+
function disablePersistentInboundDelivery(error: unknown): void {
47+
persistentStoreDisabled = true;
48+
persistentStore = undefined;
49+
reportPersistentInboundDeliveryError(error);
50+
}
51+
52+
function getPersistentInboundDeliveryStore(): SlackInboundDeliveryStore | undefined {
53+
if (persistentStoreDisabled) {
54+
return undefined;
55+
}
56+
if (persistentStore) {
57+
return persistentStore;
58+
}
59+
const runtime = getOptionalSlackRuntime();
60+
if (!runtime) {
61+
return undefined;
62+
}
63+
try {
64+
persistentStore = runtime.state.openKeyedStore<SlackInboundDeliveryRecord>({
65+
namespace: PERSISTENT_NAMESPACE,
66+
maxEntries: PERSISTENT_MAX_ENTRIES,
67+
defaultTtlMs: TTL_MS,
68+
});
69+
return persistentStore;
70+
} catch (error) {
71+
disablePersistentInboundDelivery(error);
72+
return undefined;
73+
}
74+
}
75+
76+
async function lookupPersistentInboundDelivery(key: string): Promise<boolean> {
77+
const store = getPersistentInboundDeliveryStore();
78+
if (!store) {
79+
return false;
80+
}
81+
try {
82+
return Boolean(await store.lookup(key));
83+
} catch (error) {
84+
disablePersistentInboundDelivery(error);
85+
return false;
86+
}
87+
}
88+
89+
async function rememberPersistentInboundDelivery(key: string, deliveredAt: number): Promise<void> {
90+
const store = getPersistentInboundDeliveryStore();
91+
if (!store) {
92+
return;
93+
}
94+
try {
95+
await store.register(key, { deliveredAt });
96+
} catch (error) {
97+
disablePersistentInboundDelivery(error);
98+
}
99+
}
100+
101+
export async function hasSlackInboundMessageDelivery(params: {
102+
accountId: string;
103+
channelId: string | undefined;
104+
ts: string | undefined;
105+
}): Promise<boolean> {
106+
if (!params.accountId || !params.channelId || !params.ts) {
107+
return false;
108+
}
109+
const key = makeKey(params.accountId, params.channelId, params.ts);
110+
if (deliveredMessages.peek(key)) {
111+
return true;
112+
}
113+
const found = await lookupPersistentInboundDelivery(key);
114+
if (found) {
115+
deliveredMessages.check(key);
116+
}
117+
return found;
118+
}
119+
120+
export async function recordSlackInboundMessageDeliveries(params: {
121+
accountId: string;
122+
messages: readonly SlackMessageEvent[];
123+
}): Promise<void> {
124+
if (!params.accountId || params.messages.length === 0) {
125+
return;
126+
}
127+
const deliveredAt = Date.now();
128+
const keys = new Set<string>();
129+
for (const message of params.messages) {
130+
if (!message.channel || !message.ts) {
131+
continue;
132+
}
133+
keys.add(makeKey(params.accountId, message.channel, message.ts));
134+
}
135+
if (keys.size === 0) {
136+
return;
137+
}
138+
for (const key of keys) {
139+
deliveredMessages.check(key, deliveredAt);
140+
}
141+
await Promise.all(Array.from(keys, (key) => rememberPersistentInboundDelivery(key, deliveredAt)));
142+
}
143+
144+
export function clearSlackInboundDeliveryStateForTest(): void {
145+
deliveredMessages.clear();
146+
persistentStore = undefined;
147+
persistentStoreDisabled = false;
148+
}

0 commit comments

Comments
 (0)