Skip to content

Commit 6c045c5

Browse files
authored
fix(imessage): surface inbound startup diagnostics (#91785)
Merged via squash.

Prepared head SHA: 597684c

Proof:
- Focused tests, lint/type/diff checks, and autoreview passed before merge.
- ClawSweeper re-review marked proof and patch quality platinum after lobster live monitor proof.
- Maintainer accepted the diagnostics-only default-log privacy/noise tradeoff.

Lobster proof id: openclaw-lobster-live-monitor-proof-ada22165-6306-46b6-8ed0-6c94fcab6bbc

Reviewed-by: @omarshahine
1 parent bfccbc3 commit 6c045c5

2 files changed

Lines changed: 186 additions & 7 deletions

File tree

extensions/imessage/src/monitor.watch-subscribe-retry.test.ts

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from "vites
44
import type { createIMessageRpcClient, IMessageRpcClient } from "./client.js";
55
import { monitorIMessageProvider } from "./monitor.js";
66
import type { attachIMessageMonitorAbortHandler } from "./monitor/abort-handler.js";
7+
import {
8+
describeIMessageInboundDropDiagnostic,
9+
shouldThrottleIMessageInboundDropDiagnostic,
10+
} from "./monitor/monitor-provider.js";
711

812
const waitForTransportReadyMock = vi.hoisted(() =>
913
vi.fn<typeof waitForTransportReady>(async () => {}),
@@ -110,9 +114,16 @@ describe("monitorIMessageProvider watch.subscribe startup retry", () => {
110114
{ timeoutMs: 10_000 },
111115
);
112116
expect(runtime.log).toHaveBeenCalledTimes(1);
113-
expect(String(runtime.log.mock.calls[0]?.[0])).toContain(
114-
"imessage: watch.subscribe startup failed (attempt 1/3): Error: imsg rpc timeout (watch.subscribe); retrying",
115-
);
117+
const retryLog = String(runtime.log.mock.calls[0]?.[0]);
118+
expect(retryLog).toContain("imessage: watch.subscribe startup failed attempt=1/3");
119+
expect(retryLog).toContain("account=default");
120+
expect(retryLog).toContain("cliPath=imsg");
121+
expect(retryLog).toContain("dbPath=default");
122+
expect(retryLog).toContain("timeoutMs=10000");
123+
expect(retryLog).toContain("since_rowid=none");
124+
expect(retryLog).toContain("attachments=false");
125+
expect(retryLog).toContain("retry_in_ms=1000");
126+
expect(retryLog).toContain("Error: imsg rpc timeout (watch.subscribe)");
116127
expect(
117128
runtime.error.mock.calls.some(([message]) =>
118129
String(message).includes("imessage: monitor failed"),
@@ -142,8 +153,69 @@ describe("monitorIMessageProvider watch.subscribe startup retry", () => {
142153
expect((monitorError as Error).message).toContain("imsg rpc timeout (watch.subscribe)");
143154
expect(createIMessageRpcClientMock).toHaveBeenCalledTimes(3);
144155
expect(runtime.error).toHaveBeenCalledTimes(1);
145-
expect(String(runtime.error.mock.calls[0]?.[0])).toContain(
146-
"imessage: monitor failed: Error: imsg rpc timeout (watch.subscribe)",
156+
const failureLog = String(runtime.error.mock.calls[0]?.[0]);
157+
expect(failureLog).toContain(
158+
"imessage: monitor failed: imessage: watch.subscribe startup failed attempt=3/3",
159+
);
160+
expect(failureLog).toContain("account=default");
161+
expect(failureLog).toContain("timeoutMs=10000");
162+
expect(failureLog).toContain("Error: imsg rpc timeout (watch.subscribe)");
163+
});
164+
});
165+
166+
// Unit coverage for the inbound-drop diagnostic formatter and the throttle predicate.
describe("describeIMessageInboundDropDiagnostic", () => {
  it("describes echo-style drops without message content or sender handles", () => {
    const echoMessage = {
      id: 42,
      chat_id: 123,
      guid: "p:0/secret-guid",
      is_group: false,
      created_at: "2026-06-09T10:00:00.000Z",
    };

    const line = describeIMessageInboundDropDiagnostic({
      accountId: "default",
      reason: "echo",
      message: echoMessage,
    });

    expect(line).toBe(
      'imessage: dropped inbound message account=default reason="echo" chat_id=123 group=false message_id=42 guid=present created_at=2026-06-09T10:00:00.000Z',
    );
    // The guid value itself must never be echoed, only its presence.
    expect(line).not.toContain("secret-guid");
    expect(line).not.toContain("+1555");
  });

  it("describes from-me drops and marks them for throttling", () => {
    const outboundMessage = {
      id: 43,
      chat_id: 456,
      guid: "p:0/outbound-guid",
      is_group: true,
      created_at: "2026-06-09T10:01:00.000Z",
    };

    const line = describeIMessageInboundDropDiagnostic({
      accountId: "default",
      reason: "from me",
      message: outboundMessage,
    });

    expect(line).toBe(
      'imessage: dropped inbound message account=default reason="from me" chat_id=456 group=true message_id=43 guid=present created_at=2026-06-09T10:01:00.000Z',
    );
    expect(line).not.toContain("outbound-guid");

    // Only "from me" is reported as throttled; "echo" is not.
    const fromMeThrottled = shouldThrottleIMessageInboundDropDiagnostic("from me");
    const echoThrottled = shouldThrottleIMessageInboundDropDiagnostic("echo");
    expect(fromMeThrottled).toBe(true);
    expect(echoThrottled).toBe(false);
  });

  it("keeps normal policy drops quiet", () => {
    const policyDroppedMessage = {
      id: 42,
      chat_id: 123,
      is_group: true,
    };

    const line = describeIMessageInboundDropDiagnostic({
      accountId: "default",
      reason: "no mention",
      message: policyDroppedMessage,
    });

    expect(line).toBeNull();
  });
});

extensions/imessage/src/monitor/monitor-provider.ts

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,68 @@ function isRetriableWatchSubscribeStartupError(error: unknown): boolean {
257257
);
258258
}
259259

260+
/**
 * Drop reasons for which `describeIMessageInboundDropDiagnostic` produces a
 * diagnostic line; any other reason yields `null` there.
 */
const IMESSAGE_DIAGNOSTIC_DROP_REASONS: ReadonlySet<string> = new Set([
  "agent echo in self-chat",
  "echo",
  "from me",
  "reflected assistant content",
  "self-chat echo",
]);

/** Drop reasons that `shouldThrottleIMessageInboundDropDiagnostic` reports as throttled. */
const IMESSAGE_THROTTLED_DIAGNOSTIC_DROP_REASONS: ReadonlySet<string> = new Set(["from me"]);
268+
269+
/**
 * Reports whether diagnostics for the given drop reason should be throttled.
 *
 * @param reason - Drop reason string to look up.
 * @returns `true` when `reason` is a member of
 *   `IMESSAGE_THROTTLED_DIAGNOSTIC_DROP_REASONS`, otherwise `false`.
 */
export function shouldThrottleIMessageInboundDropDiagnostic(reason: string): boolean {
  const isThrottledReason = IMESSAGE_THROTTLED_DIAGNOSTIC_DROP_REASONS.has(reason);
  return isThrottledReason;
}
272+
273+
/**
 * Builds a single-line `key=value` diagnostic for a dropped inbound message.
 *
 * Only the guid's presence is reported ("present"/"missing"), never its value.
 *
 * @param params.accountId - Account identifier echoed as `account=`.
 * @param params.reason - Drop reason; emitted JSON-quoted as `reason=`.
 * @param params.message - Message fields used for `chat_id`, `group`,
 *   `message_id`, `guid` and `created_at`.
 * @returns The diagnostic line, or `null` when `reason` is not in
 *   `IMESSAGE_DIAGNOSTIC_DROP_REASONS`.
 */
export function describeIMessageInboundDropDiagnostic(params: {
  accountId: string;
  reason: string;
  message: Pick<IMessagePayload, "chat_id" | "created_at" | "guid" | "id" | "is_group">;
}): string | null {
  const { accountId, reason, message } = params;
  if (!IMESSAGE_DIAGNOSTIC_DROP_REASONS.has(reason)) {
    return null;
  }

  // Ids that are neither number nor string are reported as "unknown".
  const rawId = message.id;
  let messageId = "unknown";
  if (typeof rawId === "number" || typeof rawId === "string") {
    messageId = String(rawId);
  }

  const fields = [
    `account=${accountId}`,
    `reason=${JSON.stringify(reason)}`,
    `chat_id=${message.chat_id ?? "unknown"}`,
    `group=${message.is_group === true}`,
    `message_id=${messageId}`,
    `guid=${message.guid ? "present" : "missing"}`,
    `created_at=${message.created_at ?? "unknown"}`,
  ];
  return `imessage: dropped inbound message ${fields.join(" ")}`;
}
294+
295+
/**
 * Renders a thrown value for the startup-failure log line.
 *
 * `Error` instances and primitives keep their `String(...)` form. Plain
 * objects, which `String(...)` would collapse to "[object Object]" (or throw
 * on, when they have no usable `toString`), are rendered as JSON instead so
 * the failure detail is not lost.
 */
function formatIMessageWatchSubscribeStartupError(error: unknown): string {
  if (typeof error !== "object" || error === null || error instanceof Error) {
    return String(error);
  }
  let text = "[object Object]";
  try {
    text = String(error);
  } catch {
    // No usable toString (e.g. null-prototype object); fall through to JSON.
  }
  if (text !== "[object Object]") {
    // The object supplies its own string form; keep it.
    return text;
  }
  try {
    return JSON.stringify(error) ?? text;
  } catch {
    // Not serializable (e.g. circular reference); keep the generic form.
    return text;
  }
}

/**
 * Builds the single-line `key=value` description of a failed
 * `watch.subscribe` startup attempt.
 *
 * `dbPath` and `remoteHost` are reported only as configured/default/none, not
 * by value. `include_reactions=true` is always emitted as a literal.
 *
 * @param params.accountId - Account identifier echoed as `account=`.
 * @param params.attempt - Attempt number, rendered as `attempt=<attempt>/<maxAttempts>`.
 * @param params.maxAttempts - Total attempts allowed.
 * @param params.cliPath - CLI path echoed verbatim as `cliPath=`.
 * @param params.dbPath - Optional; reported as "configured" when truthy, else "default".
 * @param params.remoteHost - Optional; reported as "configured" when truthy, else "none".
 * @param params.includeAttachments - Echoed as `attachments=`.
 * @param params.probeTimeoutMs - Echoed as `timeoutMs=`.
 * @param params.watchSinceRowid - Echoed as `since_rowid=`, or "none" when null.
 * @param params.error - The thrown value; appended after ": ".
 * @param params.retryDelayMs - Optional; when defined, adds ` retry_in_ms=<value>`.
 * @returns The formatted description.
 */
function describeIMessageWatchSubscribeStartupFailure(params: {
  accountId: string;
  attempt: number;
  maxAttempts: number;
  cliPath: string;
  dbPath?: string;
  remoteHost?: string;
  includeAttachments: boolean;
  probeTimeoutMs: number;
  watchSinceRowid: number | null;
  error: unknown;
  retryDelayMs?: number;
}): string {
  const retry = params.retryDelayMs !== undefined ? ` retry_in_ms=${params.retryDelayMs}` : "";
  return (
    `imessage: watch.subscribe startup failed attempt=${params.attempt}/${params.maxAttempts} ` +
    `account=${params.accountId} cliPath=${params.cliPath} ` +
    `dbPath=${params.dbPath ? "configured" : "default"} remoteHost=${
      params.remoteHost ? "configured" : "none"
    } ` +
    `timeoutMs=${params.probeTimeoutMs} since_rowid=${params.watchSinceRowid ?? "none"} ` +
    `attachments=${params.includeAttachments} include_reactions=true${retry}: ${formatIMessageWatchSubscribeStartupError(
      params.error,
    )}`
  );
}
321+
260322
async function waitForWatchSubscribeRetryDelay(params: {
261323
ms: number;
262324
abortSignal?: AbortSignal;
@@ -363,6 +425,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
363425
// replay aggressively without the old catchup cursor/retry bookkeeping.
364426
const inboundReplayGuard = createIMessageInboundReplayGuard();
365427
let staleBacklogSuppressed = 0;
428+
const loggedThrottledDropDiagnostics = new Set<string>();
366429

367430
// Downtime recovery. We pass the persisted recovery cursor (the last
368431
// dispatched rowid) to watch.subscribe as since_rowid so imsg replays the rows
@@ -849,6 +912,23 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
849912
if (isLoopDrop) {
850913
loopRateLimiter.record(rateLimitKey);
851914
}
915+
const diagnostic = describeIMessageInboundDropDiagnostic({
916+
accountId: accountInfo.accountId,
917+
reason: decision.reason,
918+
message,
919+
});
920+
if (diagnostic) {
921+
const throttleKey = `${rateLimitKey}:${decision.reason}`;
922+
const shouldThrottleDiagnostic = shouldThrottleIMessageInboundDropDiagnostic(
923+
decision.reason,
924+
);
925+
if (!shouldThrottleDiagnostic || !loggedThrottledDropDiagnostics.has(throttleKey)) {
926+
if (shouldThrottleDiagnostic) {
927+
loggedThrottledDropDiagnostics.add(throttleKey);
928+
}
929+
runtime.log?.(warn(diagnostic));
930+
}
931+
}
852932
// Surface the silent-allowlist drop once per chat. Without this, operators
853933
// who set groupPolicy="allowlist" without populating
854934
// channels.imessage.groups see every group message vanish at default log
@@ -1422,12 +1502,39 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
14221502
const shouldRetry =
14231503
attempt < WATCH_SUBSCRIBE_MAX_ATTEMPTS && isRetriableWatchSubscribeStartupError(err);
14241504
if (!shouldRetry) {
1425-
runtime.error?.(danger(`imessage: monitor failed: ${String(err)}`));
1505+
runtime.error?.(
1506+
danger(
1507+
`imessage: monitor failed: ${describeIMessageWatchSubscribeStartupFailure({
1508+
accountId: accountInfo.accountId,
1509+
attempt,
1510+
maxAttempts: WATCH_SUBSCRIBE_MAX_ATTEMPTS,
1511+
cliPath,
1512+
dbPath,
1513+
remoteHost,
1514+
includeAttachments,
1515+
probeTimeoutMs,
1516+
watchSinceRowid,
1517+
error: err,
1518+
})}`,
1519+
),
1520+
);
14261521
throw err;
14271522
}
14281523
runtime.log?.(
14291524
warn(
1430-
`imessage: watch.subscribe startup failed (attempt ${attempt}/${WATCH_SUBSCRIBE_MAX_ATTEMPTS}): ${String(err)}; retrying`,
1525+
describeIMessageWatchSubscribeStartupFailure({
1526+
accountId: accountInfo.accountId,
1527+
attempt,
1528+
maxAttempts: WATCH_SUBSCRIBE_MAX_ATTEMPTS,
1529+
cliPath,
1530+
dbPath,
1531+
remoteHost,
1532+
includeAttachments,
1533+
probeTimeoutMs,
1534+
watchSinceRowid,
1535+
error: err,
1536+
retryDelayMs: WATCH_SUBSCRIBE_RETRY_DELAY_MS,
1537+
}),
14311538
),
14321539
);
14331540
// Tear down the failed client before waiting so a slow subscribe attempt

0 commit comments

Comments
 (0)