Skip to content

Commit 9264fd0

Browse files
committed
fix(bluebubbles): preserve attachment update dedupe
1 parent 95a1c91 commit 9264fd0

4 files changed

Lines changed: 141 additions & 45 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ Docs: https://docs.openclaw.ai
152152
- fix(active-memory): require admin scope for global toggles [AI]. (#78863) Thanks @pgondhi987.
153153
- Honor owner enforcement for native commands [AI]. (#78864) Thanks @pgondhi987.
154154
- Config/BlueBubbles: remove the duplicate core-owned BlueBubbles config schema while preserving plugin-owned `dmPolicy` allowFrom validation for channel and account configs. Fixes #69238. Thanks @omarshahine.
155+
- BlueBubbles: dedupe no-attachment `updated-message` replay noise without dropping late attachment-bearing updates for the same message. (#75526) Thanks @zqchris.
155156
- Tavily: resolve dedicated `tavily_search` and `tavily_extract` tool credentials from the active runtime config snapshot, so `exec` SecretRef-backed API keys do not reach the tools unresolved. (#78610) Thanks @VACInc.
156157
- Gateway/sessions: clear cached skills snapshots during `/new` and `sessions.reset` so long-lived channel sessions rebuild the visible skill list after skills change. (#78873) Thanks @Evizero.
157158
- fix(auto-reply): gate inline skill tool dispatch [AI]. (#78517) Thanks @pgondhi987.

extensions/bluebubbles/src/inbound-dedupe.test.ts

Lines changed: 84 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,21 @@ import {
66
resolveBlueBubblesInboundDedupeKey,
77
} from "./inbound-dedupe.js";
88

9-
async function claimAndFinalize(guid: string | undefined, accountId: string): Promise<string> {
10-
const claim = await claimBlueBubblesInboundMessage({ guid, accountId });
9+
type TestMessage = Parameters<typeof claimBlueBubblesInboundMessage>[0]["message"];
10+
11+
function newMessage(messageId: string | undefined): TestMessage {
12+
return { messageId, eventType: "new-message" };
13+
}
14+
15+
function updatedMessage(
16+
messageId: string | undefined,
17+
attachments: TestMessage["attachments"] = [],
18+
): TestMessage {
19+
return { messageId, eventType: "updated-message", attachments };
20+
}
21+
22+
async function claimAndFinalize(message: TestMessage, accountId: string): Promise<string> {
23+
const claim = await claimBlueBubblesInboundMessage({ message, accountId });
1124
if (claim.kind === "claimed") {
1225
await claim.finalize();
1326
}
@@ -20,42 +33,85 @@ describe("claimBlueBubblesInboundMessage", () => {
2033
});
2134

2235
it("claims a new guid and rejects committed duplicates", async () => {
23-
expect(await claimAndFinalize("g1", "acc")).toBe("claimed");
24-
expect(await claimAndFinalize("g1", "acc")).toBe("duplicate");
36+
expect(await claimAndFinalize(newMessage("g1"), "acc")).toBe("claimed");
37+
expect(await claimAndFinalize(newMessage("g1"), "acc")).toBe("duplicate");
2538
});
2639

2740
it("scopes dedupe per account", async () => {
28-
expect(await claimAndFinalize("g1", "a")).toBe("claimed");
29-
expect(await claimAndFinalize("g1", "b")).toBe("claimed");
41+
expect(await claimAndFinalize(newMessage("g1"), "a")).toBe("claimed");
42+
expect(await claimAndFinalize(newMessage("g1"), "b")).toBe("claimed");
3043
});
3144

3245
it("reports skip when guid is missing or blank", async () => {
33-
expect((await claimBlueBubblesInboundMessage({ guid: undefined, accountId: "acc" })).kind).toBe(
34-
"skip",
35-
);
36-
expect((await claimBlueBubblesInboundMessage({ guid: "", accountId: "acc" })).kind).toBe(
37-
"skip",
38-
);
39-
expect((await claimBlueBubblesInboundMessage({ guid: " ", accountId: "acc" })).kind).toBe(
40-
"skip",
41-
);
46+
expect(
47+
(await claimBlueBubblesInboundMessage({ message: newMessage(undefined), accountId: "acc" }))
48+
.kind,
49+
).toBe("skip");
50+
expect(
51+
(await claimBlueBubblesInboundMessage({ message: newMessage(""), accountId: "acc" })).kind,
52+
).toBe("skip");
53+
expect(
54+
(await claimBlueBubblesInboundMessage({ message: newMessage(" "), accountId: "acc" })).kind,
55+
).toBe("skip");
4256
});
4357

4458
it("rejects overlong guids to cap on-disk size", async () => {
4559
const huge = "x".repeat(10_000);
46-
expect((await claimBlueBubblesInboundMessage({ guid: huge, accountId: "acc" })).kind).toBe(
47-
"skip",
48-
);
60+
expect(
61+
(await claimBlueBubblesInboundMessage({ message: newMessage(huge), accountId: "acc" })).kind,
62+
).toBe("skip");
4963
});
5064

5165
it("releases the claim so a later replay can retry after a transient failure", async () => {
52-
const first = await claimBlueBubblesInboundMessage({ guid: "g1", accountId: "acc" });
66+
const first = await claimBlueBubblesInboundMessage({
67+
message: newMessage("g1"),
68+
accountId: "acc",
69+
});
5370
expect(first.kind).toBe("claimed");
5471
if (first.kind === "claimed") {
5572
first.release();
5673
}
5774
// Released claims should be re-claimable on the next delivery.
58-
expect(await claimAndFinalize("g1", "acc")).toBe("claimed");
75+
expect(await claimAndFinalize(newMessage("g1"), "acc")).toBe("claimed");
76+
});
77+
78+
it("treats no-attachment updated-message follow-ups as duplicates once the base GUID committed", async () => {
79+
// Original new-message: agent processes and replies, base GUID gets committed.
80+
expect(await claimAndFinalize(newMessage("g1"), "acc")).toBe("claimed");
81+
// Follow-up updated-message with no attachments for the same GUID: even
82+
// though `g1:updated` has never been claimed, the base commit is enough to
83+
// recognize replay noise so it cannot re-trigger a reply (especially after
84+
// losing group chat context).
85+
expect(await claimAndFinalize(updatedMessage("g1"), "acc")).toBe("duplicate");
86+
});
87+
88+
it("preserves late attachment-bearing updated-message processing after the base committed", async () => {
89+
// Attachment indexing can arrive after the initial text-only event; this
90+
// path must stay claimable even when the new-message base GUID committed.
91+
expect(await claimAndFinalize(newMessage("g1"), "acc")).toBe("claimed");
92+
expect(
93+
await claimAndFinalize(
94+
updatedMessage("g1", [{ guid: "att-1", mimeType: "image/png" }]),
95+
"acc",
96+
),
97+
).toBe("claimed");
98+
expect(
99+
await claimAndFinalize(
100+
updatedMessage("g1", [{ guid: "att-1", mimeType: "image/png" }]),
101+
"acc",
102+
),
103+
).toBe("duplicate");
104+
});
105+
106+
it("lets an updated-message-first webhook through when the base GUID was never committed", async () => {
107+
// Rare case: BlueBubbles delivers only the updated-message webhook (e.g.
108+
// attachment-only path with no preceding new-message). Without a prior
109+
// base commit, the suffixed key proceeds normally so the agent still sees
110+
// the message.
111+
expect(await claimAndFinalize(updatedMessage("g1"), "acc")).toBe("claimed");
112+
// A subsequent updated-message with the same GUID is a duplicate via the
113+
// standard `:updated` key dedupe.
114+
expect(await claimAndFinalize(updatedMessage("g1"), "acc")).toBe("duplicate");
59115
});
60116
});
61117

@@ -66,17 +122,17 @@ describe("commitBlueBubblesCoalescedMessageIds", () => {
66122

67123
it("marks every coalesced source messageId as seen so a later replay dedupes", async () => {
68124
// Primary was processed via claim+finalize by the debouncer flush.
69-
expect(await claimAndFinalize("primary", "acc")).toBe("claimed");
125+
expect(await claimAndFinalize(newMessage("primary"), "acc")).toBe("claimed");
70126
// Secondaries reach dedupe through the bulk-commit path.
71127
await commitBlueBubblesCoalescedMessageIds({
72128
messageIds: ["secondary-1", "secondary-2"],
73129
accountId: "acc",
74130
});
75131
// A MessagePoller replay of any individual source event is now a duplicate
76132
// rather than a fresh agent turn — the core bug this helper exists to fix.
77-
expect(await claimAndFinalize("primary", "acc")).toBe("duplicate");
78-
expect(await claimAndFinalize("secondary-1", "acc")).toBe("duplicate");
79-
expect(await claimAndFinalize("secondary-2", "acc")).toBe("duplicate");
133+
expect(await claimAndFinalize(newMessage("primary"), "acc")).toBe("duplicate");
134+
expect(await claimAndFinalize(newMessage("secondary-1"), "acc")).toBe("duplicate");
135+
expect(await claimAndFinalize(newMessage("secondary-2"), "acc")).toBe("duplicate");
80136
});
81137

82138
it("scopes coalesced commits per account", async () => {
@@ -85,18 +141,18 @@ describe("commitBlueBubblesCoalescedMessageIds", () => {
85141
accountId: "a",
86142
});
87143
// Same messageId under a different account is still claimable.
88-
expect(await claimAndFinalize("g1", "a")).toBe("duplicate");
89-
expect(await claimAndFinalize("g1", "b")).toBe("claimed");
144+
expect(await claimAndFinalize(newMessage("g1"), "a")).toBe("duplicate");
145+
expect(await claimAndFinalize(newMessage("g1"), "b")).toBe("claimed");
90146
});
91147

92148
it("skips empty or overlong guids without throwing", async () => {
93149
await commitBlueBubblesCoalescedMessageIds({
94150
messageIds: ["", " ", "x".repeat(10_000), "valid"],
95151
accountId: "acc",
96152
});
97-
expect(await claimAndFinalize("valid", "acc")).toBe("duplicate");
153+
expect(await claimAndFinalize(newMessage("valid"), "acc")).toBe("duplicate");
98154
// Overlong guid was skipped by sanitization, not committed.
99-
expect(await claimAndFinalize("x".repeat(10_000), "acc")).toBe("skip");
155+
expect(await claimAndFinalize(newMessage("x".repeat(10_000)), "acc")).toBe("skip");
100156
});
101157
});
102158

extensions/bluebubbles/src/inbound-dedupe.ts

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,27 @@ function sanitizeGuid(guid: string | undefined | null): string | null {
113113
return trimmed;
114114
}
115115

116+
type DedupeMessageInput = Pick<
117+
NormalizedWebhookMessage,
118+
"messageId" | "balloonBundleId" | "associatedMessageGuid" | "eventType" | "attachments"
119+
>;
120+
121+
/**
122+
* Resolve the underlying "base" dedupe key for a BlueBubbles inbound message,
123+
* without the `:updated` suffix that distinguishes follow-up webhooks. Used
124+
* both by `resolveBlueBubblesInboundDedupeKey` (which appends the suffix when
125+
* appropriate) and the `updated-message` short-circuit inside
126+
* `claimBlueBubblesInboundMessage`.
127+
*/
128+
function resolveBaseDedupeKey(message: DedupeMessageInput): string | undefined {
129+
const balloonBundleId = message.balloonBundleId?.trim();
130+
const associatedMessageGuid = message.associatedMessageGuid?.trim();
131+
if (balloonBundleId && associatedMessageGuid) {
132+
return associatedMessageGuid;
133+
}
134+
return message.messageId?.trim() || undefined;
135+
}
136+
116137
/**
117138
* Resolve the canonical dedupe key for a BlueBubbles inbound message.
118139
*
@@ -134,25 +155,19 @@ function sanitizeGuid(guid: string | undefined | null): string | null {
134155
* reply against the same parent GUID and silently drop real messages.
135156
*/
136157
export function resolveBlueBubblesInboundDedupeKey(
137-
message: Pick<
138-
NormalizedWebhookMessage,
139-
"messageId" | "balloonBundleId" | "associatedMessageGuid" | "eventType"
140-
>,
158+
message: DedupeMessageInput,
141159
): string | undefined {
142-
const balloonBundleId = message.balloonBundleId?.trim();
143-
const associatedMessageGuid = message.associatedMessageGuid?.trim();
144-
let base: string | undefined;
145-
if (balloonBundleId && associatedMessageGuid) {
146-
base = associatedMessageGuid;
147-
} else {
148-
base = message.messageId?.trim() || undefined;
149-
}
160+
const base = resolveBaseDedupeKey(message);
150161
if (!base) {
151162
return undefined;
152163
}
153164
// `updated-message` events get a distinct key so they are not rejected as
154165
// duplicates of the already-committed `new-message` for the same GUID.
155-
// This lets attachment-carrying follow-up webhooks through. (#65430, #52277)
166+
// This lets attachment-carrying follow-up webhooks through when the original
167+
// text-only event was processed without media (#65430, #52277). The
168+
// no-attachment `updated-message` short-circuit inside
169+
// `claimBlueBubblesInboundMessage` re-collapses the suffixed key onto the
170+
// base only for replay noise — see that helper for the reasoning.
156171
if (message.eventType === "updated-message") {
157172
return `${base}:updated`;
158173
}
@@ -166,7 +181,7 @@ type InboundDedupeClaim =
166181
| { kind: "skip" };
167182

168183
/**
169-
* Attempt to claim an inbound BlueBubbles message GUID.
184+
* Attempt to claim an inbound BlueBubbles message for processing.
170185
*
171186
* - `claimed`: caller should process the message, then call `finalize()` on
172187
* success (persists the GUID) or `release()` on failure (lets a later
@@ -176,16 +191,40 @@ type InboundDedupeClaim =
176191
* rather than race.
177192
* - `skip`: GUID was missing or invalid — caller should continue processing
178193
* without dedup (no finalize/release needed).
194+
*
195+
* For no-attachment `updated-message` events specifically: when the
196+
* underlying base GUID has already been committed (the `new-message` was
197+
* processed and the agent replied), the follow-up is recognized as a duplicate
198+
* even though the `:updated`-suffixed dedupe key has never been seen. This
199+
* stops replay-only follow-ups from re-triggering a reply, especially when the
200+
* follow-up payload arrives stripped of group chat context and would otherwise
201+
* route into the sender's DM session. Attachment-bearing updates still keep the
202+
* suffixed key so late media indexing remains processable.
179203
*/
180204
export async function claimBlueBubblesInboundMessage(params: {
181-
guid: string | undefined | null;
205+
message: DedupeMessageInput;
182206
accountId: string;
183207
onDiskError?: (error: unknown) => void;
184208
}): Promise<InboundDedupeClaim> {
185-
const normalized = sanitizeGuid(params.guid);
209+
const dedupeKey = resolveBlueBubblesInboundDedupeKey(params.message);
210+
const normalized = sanitizeGuid(dedupeKey);
186211
if (!normalized) {
187212
return { kind: "skip" };
188213
}
214+
const hasAttachments = (params.message.attachments?.length ?? 0) > 0;
215+
if (params.message.eventType === "updated-message" && !hasAttachments) {
216+
const baseKey = resolveBaseDedupeKey(params.message);
217+
const baseSanitized = baseKey ? sanitizeGuid(baseKey) : null;
218+
if (baseSanitized && baseSanitized !== normalized) {
219+
const baseAlreadyCommitted = await impl.hasRecent(baseSanitized, {
220+
namespace: params.accountId,
221+
onDiskError: params.onDiskError,
222+
});
223+
if (baseAlreadyCommitted) {
224+
return { kind: "duplicate" };
225+
}
226+
}
227+
}
189228
const claim = await impl.claim(normalized, {
190229
namespace: params.accountId,
191230
onDiskError: params.onDiskError,

extensions/bluebubbles/src/monitor-processing.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ export async function processMessage(
685685

686686
// Drop BlueBubbles MessagePoller replays after server restart (#19176, #12053).
687687
const claim = await claimBlueBubblesInboundMessage({
688-
guid: dedupeKey,
688+
message,
689689
accountId: account.accountId,
690690
onDiskError: (error) =>
691691
logVerbose(core, runtime, `inbound-dedupe disk error: ${sanitizeForLog(error)}`),

0 commit comments

Comments
 (0)