Skip to content

Commit 0e262d2

Browse files
authored
fix(discord): fence tool warning fallback delivery (#87465)
* fix(discord): fence recovered tool warning fallback * fix(discord): keep warning fallback after failed final * fix(reply): keep settled cleanup unconditional
1 parent 748510b commit 0e262d2

5 files changed

Lines changed: 207 additions & 24 deletions

File tree

extensions/discord/src/monitor/message-handler.process.test.ts

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,10 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({
235235
info: { kind: "block" | "final" },
236236
) => Promise<ReplyPayload | null> | ReplyPayload | null;
237237
deliver: (payload: unknown, info: { kind: "block" | "final" }) => Promise<void> | void;
238+
onError?: (err: unknown, info: { kind: "block" | "final" }) => void;
238239
transformReplyPayload?: (payload: ReplyPayload) => ReplyPayload | null;
240+
onSettled?: () => Promise<unknown> | unknown;
241+
onFreshSettledDelivery?: () => Promise<unknown> | unknown;
239242
};
240243
ctx?: unknown;
241244
replyOptions?: DispatchInboundParams["replyOptions"];
@@ -257,21 +260,32 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({
257260
await params.dispatcherOptions.deliver(deliverPayload, info);
258261
};
259262
const queueDelivery = (payload: ReplyPayload, info: { kind: "block" | "final" }) => {
260-
const delivery = Promise.resolve(deliver(payload, info)).catch(() => undefined);
263+
const delivery = Promise.resolve(deliver(payload, info)).catch((err: unknown) => {
264+
params.dispatcherOptions.onError?.(err, info);
265+
});
261266
pendingDeliveries.push(delivery);
262267
return true;
263268
};
264-
return await dispatchInboundMessage({
265-
ctx: params.ctx,
266-
replyOptions: params.replyOptions,
267-
dispatcher: {
268-
sendBlockReply: vi.fn((payload: ReplyPayload) => queueDelivery(payload, { kind: "block" })),
269-
sendFinalReply: vi.fn((payload: ReplyPayload) => queueDelivery(payload, { kind: "final" })),
270-
waitForIdle: vi.fn(async () => {
271-
await Promise.all(pendingDeliveries);
272-
}),
273-
},
274-
});
269+
try {
270+
return await dispatchInboundMessage({
271+
ctx: params.ctx,
272+
replyOptions: params.replyOptions,
273+
dispatcher: {
274+
sendBlockReply: vi.fn((payload: ReplyPayload) =>
275+
queueDelivery(payload, { kind: "block" }),
276+
),
277+
sendFinalReply: vi.fn((payload: ReplyPayload) =>
278+
queueDelivery(payload, { kind: "final" }),
279+
),
280+
waitForIdle: vi.fn(async () => {
281+
await Promise.all(pendingDeliveries);
282+
}),
283+
},
284+
});
285+
} finally {
286+
await params.dispatcherOptions.onSettled?.();
287+
await params.dispatcherOptions.onFreshSettledDelivery?.();
288+
}
275289
},
276290
dispatchInboundMessage: (params: DispatchInboundParams) => dispatchInboundMessage(params),
277291
settleReplyDispatcher: async (params: {
@@ -2252,6 +2266,7 @@ describe("processDiscordMessage draft streaming", () => {
22522266
const draftStream = createMockDraftStreamForTest();
22532267
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
22542268
await params?.dispatcher.sendFinalReply({ text: "delivery survived" });
2269+
await params?.dispatcher.waitForIdle();
22552270
await params?.dispatcher.sendFinalReply(createNonTerminalToolWarningPayload());
22562271
return { queuedFinal: true, counts: { final: 2, tool: 0, block: 0 } };
22572272
});
@@ -2273,6 +2288,7 @@ describe("processDiscordMessage draft streaming", () => {
22732288
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
22742289
await params?.dispatcher.sendFinalReply(createNonTerminalToolWarningPayload());
22752290
await params?.dispatcher.sendFinalReply({ text: "delivery recovered" });
2291+
await params?.dispatcher.waitForIdle();
22762292
return { queuedFinal: true, counts: { final: 2, tool: 0, block: 0 } };
22772293
});
22782294

@@ -2314,6 +2330,39 @@ describe("processDiscordMessage draft streaming", () => {
23142330
});
23152331
});
23162332

2333+
it("delivers tool warning finals when the recovered reply fails to send", async () => {
2334+
deliverDiscordReply.mockRejectedValueOnce(new Error("send failed"));
2335+
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
2336+
await params?.dispatcher.sendFinalReply({ text: "delivery failed" });
2337+
await params?.dispatcher.waitForIdle();
2338+
await params?.dispatcher.sendFinalReply(createNonTerminalToolWarningPayload());
2339+
return {
2340+
queuedFinal: true,
2341+
counts: { final: 2, tool: 0, block: 0 },
2342+
failedCounts: { final: 1 },
2343+
};
2344+
});
2345+
2346+
const ctx = await createAutomaticSourceDeliveryContext({
2347+
discordConfig: { streamMode: "off" },
2348+
});
2349+
2350+
await runProcessDiscordMessage(ctx);
2351+
2352+
expect(deliverDiscordReply).toHaveBeenCalledTimes(2);
2353+
expect(firstMockArg(deliverDiscordReply, "deliverDiscordReply")).toMatchObject({
2354+
replies: [{ text: "delivery failed" }],
2355+
});
2356+
expect(deliverDiscordReply.mock.calls[1]?.[0]).toMatchObject({
2357+
replies: [
2358+
{
2359+
text: "⚠️ 🛠️ `run openclaw definitely-not-a-real-subcommand (agent)` failed",
2360+
isError: true,
2361+
},
2362+
],
2363+
});
2364+
});
2365+
23172366
it("keeps mutating tool warning finals after successful-looking replies", async () => {
23182367
const draftStream = createMockDraftStreamForTest();
23192368
dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => {
@@ -2500,6 +2549,7 @@ describe("processDiscordMessage draft streaming", () => {
25002549
await params?.replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
25012550
await params?.replyOptions?.onItemEvent?.({ progressText: "exec done" });
25022551
await params?.dispatcher.sendFinalReply({ text: "delivery survived" });
2552+
await params?.dispatcher.waitForIdle();
25032553
await params?.dispatcher.sendFinalReply(createNonTerminalToolWarningPayload());
25042554
return { queuedFinal: true, counts: { final: 2, tool: 0, block: 0 } };
25052555
});

extensions/discord/src/monitor/message-handler.process.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,11 +546,13 @@ export async function processDiscordMessage(
546546
observer?.onFinalReplyStart?.();
547547
};
548548
let userFacingFinalDelivered = false;
549+
let userFacingFinalDeliveryFailed = false;
549550
let pendingToolWarningFinal:
550551
| { payload: ReplyPayload; info: { kind: ReplyDispatchKind } }
551552
| undefined;
552553
const markUserFacingFinalDelivered = () => {
553554
userFacingFinalDelivered = true;
555+
userFacingFinalDeliveryFailed = false;
554556
pendingToolWarningFinal = undefined;
555557
draftPreview.markFinalReplyDelivered();
556558
observer?.onFinalReplyDelivered?.();
@@ -630,7 +632,10 @@ export async function processDiscordMessage(
630632
!options?.allowFallbackOnlyToolWarning &&
631633
isFallbackOnlyToolWarningFinal(payload)
632634
) {
633-
if (!userFacingFinalDelivered) {
635+
if (
636+
!userFacingFinalDelivered &&
637+
(!finalReplyStartNotified || userFacingFinalDeliveryFailed)
638+
) {
634639
pendingToolWarningFinal = { payload, info };
635640
}
636641
return { visibleReplySent: false };
@@ -839,6 +844,9 @@ export async function processDiscordMessage(
839844
return { visibleReplySent: true };
840845
};
841846
const onDiscordDeliveryError = (err: unknown, info: { kind: string }) => {
847+
if (info.kind === "final" && finalReplyStartNotified && !userFacingFinalDelivered) {
848+
userFacingFinalDeliveryFailed = true;
849+
}
842850
runtime.error(
843851
danger(
844852
formatDiscordReplyDeliveryFailure({
@@ -865,17 +873,18 @@ export async function processDiscordMessage(
865873
let dispatchAborted = false;
866874
const deliverPendingToolWarningFinalIfNeeded = async () => {
867875
if (!pendingToolWarningFinal || userFacingFinalDelivered || isProcessAborted(abortSignal)) {
868-
return;
876+
return undefined;
869877
}
870878
const pending = pendingToolWarningFinal;
871879
pendingToolWarningFinal = undefined;
872880
try {
873-
await deliverDiscordPayload(pending.payload, pending.info, {
881+
return await deliverDiscordPayload(pending.payload, pending.info, {
874882
allowFallbackOnlyToolWarning: true,
875883
});
876884
} catch (err) {
877885
dispatchError = true;
878886
onDiscordDeliveryError(err, pending.info);
887+
return { visibleReplySent: false };
879888
}
880889
};
881890
try {
@@ -898,6 +907,7 @@ export async function processDiscordMessage(
898907
humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
899908
beforeDeliver: beforeDiscordPayloadDelivery,
900909
onReplyStart: onDiscordReplyStart,
910+
onFreshSettledDelivery: deliverPendingToolWarningFinalIfNeeded,
901911
},
902912
delivery: {
903913
deliver: deliverDiscordPayload,
@@ -1067,7 +1077,6 @@ export async function processDiscordMessage(
10671077
dispatchAborted = true;
10681078
return;
10691079
}
1070-
await deliverPendingToolWarningFinalIfNeeded();
10711080
} catch (err) {
10721081
if (isProcessAborted(abortSignal)) {
10731082
dispatchAborted = true;

src/auto-reply/dispatch.freshness.test.ts

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ function dispatchWithDeliveries(
6363
dispatcherOptions: {
6464
beforeDeliver?: ReplyDispatchBeforeDeliver;
6565
deliver?: (payload: ReplyPayload, info: { kind: Delivery["kind"] }) => Promise<unknown>;
66-
onSettled?: () => unknown;
66+
onSettled?: () => Promise<unknown> | unknown;
67+
onFreshSettledDelivery?: () => Promise<unknown> | unknown;
6768
} = {},
6869
) {
6970
return dispatchInboundMessageWithBufferedDispatcher({
@@ -476,6 +477,113 @@ describe("foreground reply freshness", () => {
476477
expect(deliveries).toEqual([]);
477478
});
478479

480+
it("still runs stale generic settled hooks after a newer visible reply", async () => {
481+
const deliveries: Delivery[] = [];
482+
const beforeDeliverStarted = createDeferred<void>();
483+
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
484+
const beforeDeliver = vi.fn(() => {
485+
beforeDeliverStarted.resolve();
486+
return releaseBeforeDeliver.promise;
487+
});
488+
const olderSettled = vi.fn();
489+
490+
hoisted.dispatchReplyFromConfigMock.mockImplementation(
491+
async (params: DispatchReplyFromConfigParams) => {
492+
if (params.ctx.MessageSid === "old-message") {
493+
params.dispatcher.sendFinalReply({ text: "old final" });
494+
return queuedFinalResult();
495+
}
496+
if (params.ctx.MessageSid === "new-message") {
497+
params.dispatcher.sendFinalReply({ text: "new final" });
498+
return queuedFinalResult();
499+
}
500+
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
501+
},
502+
);
503+
504+
const olderDispatch = dispatchWithDeliveries(
505+
buildForegroundCtx({ MessageSid: "old-message" }),
506+
deliveries,
507+
{ beforeDeliver, onSettled: olderSettled },
508+
);
509+
await beforeDeliverStarted.promise;
510+
511+
const newerResult = await dispatchWithDeliveries(
512+
buildForegroundCtx({ MessageSid: "new-message" }),
513+
deliveries,
514+
);
515+
516+
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
517+
const olderResult = await olderDispatch;
518+
519+
expect(beforeDeliver).toHaveBeenCalledTimes(1);
520+
expect(olderSettled).toHaveBeenCalledTimes(1);
521+
expect(newerResult).toEqual({
522+
queuedFinal: true,
523+
counts: { tool: 0, block: 0, final: 1 },
524+
});
525+
expect(olderResult).toEqual({
526+
queuedFinal: false,
527+
counts: { tool: 0, block: 0, final: 0 },
528+
});
529+
expect(deliveries).toEqual([{ kind: "final", text: "new final" }]);
530+
});
531+
532+
it("suppresses an older fresh settled delivery after a newer visible reply", async () => {
533+
const deliveries: Delivery[] = [];
534+
const beforeDeliverStarted = createDeferred<void>();
535+
const releaseBeforeDeliver = createDeferred<ReplyPayload | null>();
536+
const beforeDeliver = vi.fn(() => {
537+
beforeDeliverStarted.resolve();
538+
return releaseBeforeDeliver.promise;
539+
});
540+
const olderFreshDelivery = vi.fn(() => {
541+
deliveries.push({ kind: "final", text: "old settled fallback" });
542+
return { visibleReplySent: true };
543+
});
544+
545+
hoisted.dispatchReplyFromConfigMock.mockImplementation(
546+
async (params: DispatchReplyFromConfigParams) => {
547+
if (params.ctx.MessageSid === "old-message") {
548+
params.dispatcher.sendFinalReply({ text: "old final" });
549+
return queuedFinalResult();
550+
}
551+
if (params.ctx.MessageSid === "new-message") {
552+
params.dispatcher.sendFinalReply({ text: "new final" });
553+
return queuedFinalResult();
554+
}
555+
throw new Error(`unexpected test message ${params.ctx.MessageSid ?? "<missing>"}`);
556+
},
557+
);
558+
559+
const olderDispatch = dispatchWithDeliveries(
560+
buildForegroundCtx({ MessageSid: "old-message" }),
561+
deliveries,
562+
{ beforeDeliver, onFreshSettledDelivery: olderFreshDelivery },
563+
);
564+
await beforeDeliverStarted.promise;
565+
566+
const newerResult = await dispatchWithDeliveries(
567+
buildForegroundCtx({ MessageSid: "new-message" }),
568+
deliveries,
569+
);
570+
571+
releaseBeforeDeliver.resolve({ text: "old rewritten final" });
572+
const olderResult = await olderDispatch;
573+
574+
expect(beforeDeliver).toHaveBeenCalledTimes(1);
575+
expect(olderFreshDelivery).not.toHaveBeenCalled();
576+
expect(newerResult).toEqual({
577+
queuedFinal: true,
578+
counts: { tool: 0, block: 0, final: 1 },
579+
});
580+
expect(olderResult).toEqual({
581+
queuedFinal: false,
582+
counts: { tool: 0, block: 0, final: 0 },
583+
});
584+
expect(deliveries).toEqual([{ kind: "final", text: "new final" }]);
585+
});
586+
479587
it("runs the settled delivery hook when dispatch fails after queueing a reply", async () => {
480588
const deliveries: Delivery[] = [];
481589
let settled = false;

src/auto-reply/dispatch.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,15 +213,18 @@ function isVisiblePartialDeliveryError(error: unknown): boolean {
213213
);
214214
}
215215

216-
async function runForegroundReplyFenceSettledDelivery(
216+
async function runForegroundReplyFenceFreshSettledDelivery(
217217
snapshot: ForegroundReplyFenceSnapshot | undefined,
218-
onSettled: (() => unknown) | undefined,
218+
onFreshSettledDelivery: (() => unknown) | undefined,
219219
): Promise<void> {
220-
if (!onSettled) {
220+
if (!onFreshSettledDelivery) {
221+
return;
222+
}
223+
if (await shouldCancelForegroundReplyDelivery(snapshot)) {
221224
return;
222225
}
223226
try {
224-
const deliveryResult = await onSettled();
227+
const deliveryResult = await onFreshSettledDelivery();
225228
if (isExplicitlyVisibleDelivery(deliveryResult)) {
226229
markForegroundReplyFenceVisibleDeliveryGeneration(snapshot);
227230
}
@@ -488,9 +491,13 @@ export async function dispatchInboundMessageWithBufferedDispatcher(params: {
488491
});
489492
} finally {
490493
try {
491-
await runForegroundReplyFenceSettledDelivery(
494+
const settledResult = await params.dispatcherOptions.onSettled?.();
495+
if (isExplicitlyVisibleDelivery(settledResult)) {
496+
markForegroundReplyFenceVisibleDeliveryGeneration(foregroundReplyFence);
497+
}
498+
await runForegroundReplyFenceFreshSettledDelivery(
492499
foregroundReplyFence,
493-
params.dispatcherOptions.onSettled,
500+
params.dispatcherOptions.onFreshSettledDelivery,
494501
);
495502
} finally {
496503
if (foregroundReplyFence) {

src/auto-reply/reply/reply-dispatcher.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ export type ReplyDispatcherWithTypingOptions = Omit<ReplyDispatcherOptions, "onI
8282
onReplyStart?: () => Promise<void> | void;
8383
onIdle?: () => void;
8484
onSettled?: () => unknown;
85+
onFreshSettledDelivery?: () => unknown;
8586
/** Called when the typing controller is cleaned up (e.g., on NO_REPLY). */
8687
onCleanup?: () => void;
8788
};
@@ -282,7 +283,15 @@ export async function waitForReplyDispatcherIdle(
282283
export function createReplyDispatcherWithTyping(
283284
options: ReplyDispatcherWithTypingOptions,
284285
): ReplyDispatcherWithTypingResult {
285-
const { typingCallbacks, onReplyStart, onIdle, onCleanup, ...dispatcherOptions } = options;
286+
const {
287+
typingCallbacks,
288+
onReplyStart,
289+
onIdle,
290+
onSettled: _onSettled,
291+
onFreshSettledDelivery: _onFreshSettledDelivery,
292+
onCleanup,
293+
...dispatcherOptions
294+
} = options;
286295
const resolvedOnReplyStart = onReplyStart ?? typingCallbacks?.onReplyStart;
287296
const resolvedOnIdle = onIdle ?? typingCallbacks?.onIdle;
288297
const resolvedOnCleanup = onCleanup ?? typingCallbacks?.onCleanup;

0 commit comments

Comments
 (0)