Skip to content

Commit ee15ec6

Browse files
zhangguiping-xydt张贵萍
authored andcommitted
fix(imessage): repair anchorless payloads before debounce
Move anchor recovery ahead of live debounce classification so malformed group payloads cannot be coalesced as sender DMs before routing.
1 parent 265803a commit ee15ec6

2 files changed

Lines changed: 130 additions & 8 deletions

File tree

extensions/imessage/src/monitor.watch-subscribe-retry.test.ts

Lines changed: 116 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,15 @@ function createRuntime() {
3232
}
3333

3434
type MockIMessageRpcClient = IMessageRpcClient & {
35-
request: ReturnType<typeof vi.fn<(method: string) => Promise<unknown>>>;
35+
request: ReturnType<
36+
typeof vi.fn<(method: string, params?: Record<string, unknown>) => Promise<unknown>>
37+
>;
3638
waitForClose: ReturnType<typeof vi.fn<() => Promise<void>>>;
3739
stop: ReturnType<typeof vi.fn<() => Promise<void>>>;
3840
};
3941

4042
function createRpcClient(overrides?: {
41-
request?: (method: string) => Promise<unknown>;
43+
request?: (method: string, params?: Record<string, unknown>) => Promise<unknown>;
4244
waitForClose?: () => Promise<void>;
4345
}): MockIMessageRpcClient {
4446
const client = {
@@ -119,6 +121,118 @@ describe("monitorIMessageProvider watch.subscribe startup retry", () => {
119121
).toBe(false);
120122
});
121123

124+
it("repairs anchorless live payloads before debounce classification", async () => {
125+
const runtime = createRuntime();
126+
let notify: ((msg: { method: string; params?: unknown }) => void) | undefined;
127+
let close!: () => void;
128+
const closed = new Promise<void>((resolve) => {
129+
close = resolve;
130+
});
131+
const calls: Array<{ method: string; params?: Record<string, unknown> }> = [];
132+
const client = createRpcClient({
133+
waitForClose: async () => await closed,
134+
request: async (method, params) => {
135+
calls.push({ method, params });
136+
if (method === "watch.subscribe") {
137+
return { subscription: 1 };
138+
}
139+
if (method === "chats.list") {
140+
return { chats: [{ id: 101 }, { id: 202 }] };
141+
}
142+
if (method === "messages.history") {
143+
if (params?.chat_id === 101) {
144+
return {
145+
messages: [
146+
{
147+
guid: "GUID-GROUP-A",
148+
chat_id: 101,
149+
chat_guid: "iMessage;+;group-a",
150+
chat_identifier: "group-a",
151+
is_group: true,
152+
participants: ["+15550001111", "+15550002222"],
153+
},
154+
],
155+
};
156+
}
157+
if (params?.chat_id === 202) {
158+
return {
159+
messages: [
160+
{
161+
guid: "GUID-GROUP-B",
162+
chat_id: 202,
163+
chat_guid: "iMessage;+;group-b",
164+
chat_identifier: "group-b",
165+
is_group: true,
166+
participants: ["+15550001111", "+15550003333"],
167+
},
168+
],
169+
};
170+
}
171+
return { messages: [] };
172+
}
173+
return {};
174+
},
175+
});
176+
177+
createIMessageRpcClientMock.mockImplementation(async (opts) => {
178+
notify = opts?.onNotification;
179+
return client;
180+
});
181+
182+
const monitorPromise = monitorIMessageProvider({
183+
config: {
184+
channels: {
185+
imessage: {
186+
coalesceSameSenderDms: true,
187+
groupPolicy: "open",
188+
groups: { "*": { requireMention: true } },
189+
},
190+
},
191+
messages: { groupChat: { mentionPatterns: ["@openclaw"] } },
192+
} as never,
193+
runtime: runtime as never,
194+
});
195+
196+
await vi.waitFor(() => expect(notify).toBeDefined());
197+
198+
const baseMessage = {
199+
chat_id: 0,
200+
sender: "+15550001111",
201+
is_from_me: false,
202+
attachments: null,
203+
chat_identifier: "",
204+
chat_guid: "",
205+
chat_name: "",
206+
participants: null,
207+
is_group: false,
208+
};
209+
notify?.({
210+
method: "message",
211+
params: { message: { ...baseMessage, guid: "GUID-GROUP-A", text: "first group" } },
212+
});
213+
notify?.({
214+
method: "message",
215+
params: { message: { ...baseMessage, guid: "GUID-GROUP-B", text: "second group" } },
216+
});
217+
218+
await vi.waitFor(() =>
219+
expect(
220+
calls.filter((call) => call.method === "messages.history" && call.params?.chat_id === 202),
221+
).toHaveLength(1),
222+
);
223+
224+
expect(calls.filter((call) => call.method === "chats.list")).toHaveLength(2);
225+
expect(
226+
calls.filter((call) => call.method === "messages.history" && call.params?.chat_id === 101),
227+
).toHaveLength(2);
228+
expect(
229+
calls.filter((call) => call.method === "messages.history" && call.params?.chat_id === 202),
230+
).toHaveLength(1);
231+
232+
close();
233+
await monitorPromise;
234+
});
235+
122236
it("still fails after bounded startup retries are exhausted", async () => {
123237
const runtime = createRuntime();
124238
createIMessageRpcClientMock.mockImplementation(async () =>

extensions/imessage/src/monitor/monitor-provider.ts

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,14 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
413413
}
414414
}
415415

416+
async function repairMessageConversationAnchor(message: IMessagePayload) {
417+
return await repairIMessageConversationAnchor({
418+
message,
419+
client: getActiveClient(),
420+
runtime,
421+
});
422+
}
423+
416424
async function handleMessageNow(
417425
message: IMessagePayload,
418426
options: { advanceCatchupCursor?: boolean } = {},
@@ -465,11 +473,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
465473
// (e.g. group link-preview with chat_id=0 and empty chat_guid/
466474
// chat_identifier). Fail-closed: if recovery cannot determine the real
467475
// conversation, the message is dropped rather than routed to sender DM.
468-
const repaired = await repairIMessageConversationAnchor({
469-
message,
470-
client: getActiveClient(),
471-
runtime,
472-
});
476+
const repaired = await repairMessageConversationAnchor(message);
473477
if (!repaired) {
474478
return;
475479
}
@@ -865,7 +869,11 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
865869
runtime.error?.(`imessage: dropping malformed RPC message payload (keys=${shape})`);
866870
return;
867871
}
868-
await inboundDebouncer.enqueue({ message });
872+
const repaired = await repairMessageConversationAnchor(message);
873+
if (!repaired) {
874+
return;
875+
}
876+
await inboundDebouncer.enqueue({ message: repaired });
869877
};
870878

871879
await waitForTransportReady({

0 commit comments

Comments
 (0)