Skip to content

Commit d04517c

Browse files
Kaspreclaude
andcommitted
fix(delivery): disambiguate hook cancellations from delivery failures (#57766)
Enrich deliverOutboundPayloads return type from OutboundDeliveryResult[] to DeliveryOutcome — carries cancelledCount and allCancelledByHook so callers can distinguish intentional hook suppression from delivery failure. Wrap mid-stream non-bestEffort throws in DeliveryError with sentBeforeError to prevent blind retries that duplicate already-delivered messages. Guard retryTransientDirectCronDelivery against DeliveryError so partial sends are never retried in the cron direct-delivery path. DeliveryError is detected via a local isDeliveryError(err) helper that checks err.name + Array.isArray(err.sentBeforeError), with DeliveryError imported as a type only. This keeps the heavy outbound delivery module off delivery-dispatch.ts's cold-startup import graph (preserving the existing loadDeliveryOutboundRuntime boundary) and stays defensive against a third-party adapter throwing a same-named error. Updates all 5 callers that use the return value; 9 fire-and-forget callers need no changes. Test mocks updated to return DeliveryOutcome shape. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7c8857b commit d04517c

11 files changed

Lines changed: 624 additions & 60 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai
1818
- Plugins/doctor: avoid full-array sorting while selecting ClawHub search/archive results and bounded dreaming doctor entries. Thanks @shakkernerd.
1919
- Agents/compaction: keep contributor diagnostics to a bounded top-three selection without sorting the full history. Thanks @shakkernerd.
2020
- Sessions/UI: avoid full-array sorting while selecting ACPX leases, Google Meet calendar events, and latest chat sessions. Thanks @shakkernerd.
21+
- Outbound delivery: enrich `deliverOutboundPayloads` to return a `DeliveryOutcome` (an `OutboundDeliveryResult[]` augmented with non-enumerable `cancelledCount` and `allCancelledByHook` metadata, fully back-compatible with existing array-shape consumers) and throw `DeliveryError` with `sentBeforeError` when a non-bestEffort send fails after some payloads were already delivered, so callers can distinguish hook-cancelled sends from delivery failures. Cron direct-delivery, gateway restart-sentinel, and the WAL queue cleanup path all short-circuit on `DeliveryError` and ack the queue entry instead of failing it, so a future drain/restart cannot replay the whole batch and duplicate the already-sent prefix. Fixes #57766.
2122
- Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus.
2223
- Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip.
2324
- Telegram: re-probe the primary fetch transport after repeated sticky fallback success so transient IPv4 or pinned-IP fallback promotion can recover without a gateway restart. Fixes #77088. (#77157) Thanks @MkDev11.

extensions/slack/src/outbound-delivery.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,5 +134,6 @@ describe("slack outbound shared hook wiring", () => {
134134
expect(handler).toHaveBeenCalledTimes(1);
135135
expect(sendMessageSlackMock).not.toHaveBeenCalled();
136136
expect(result).toEqual([]);
137+
expect(result).toMatchObject({ cancelledCount: 1, allCancelledByHook: true });
137138
});
138139
});

src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,25 @@ vi.mock("./delivery-subagent-registry.runtime.js", () => ({
3939
countActiveDescendantRuns: countActiveDescendantRunsMock,
4040
}));
4141

42-
vi.mock("../../infra/outbound/deliver.js", () => ({
43-
deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]),
44-
}));
42+
// Narrow mock per AGENTS.md line 134 — avoid broad importOriginal() in hot tests.
43+
// The one test that constructs a DeliveryError uses this local stub; production
44+
// code matches via isDeliveryError() on name + shape, not class identity.
45+
vi.mock("../../infra/outbound/deliver.js", () => {
46+
class DeliveryError extends Error {
47+
readonly cause?: unknown;
48+
readonly sentBeforeError: ReadonlyArray<unknown>;
49+
constructor(cause: unknown, sentBeforeError: ReadonlyArray<unknown>) {
50+
super();
51+
this.name = "DeliveryError";
52+
this.cause = cause;
53+
this.sentBeforeError = sentBeforeError;
54+
}
55+
}
56+
return {
57+
deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]),
58+
DeliveryError,
59+
};
60+
});
4561

4662
vi.mock("../../infra/outbound/identity.js", () => ({
4763
resolveAgentOutboundIdentity: vi.fn().mockReturnValue({}),
@@ -86,7 +102,7 @@ import { retireSessionMcpRuntime } from "../../agents/pi-bundle-mcp-tools.js";
86102
// Import after mocks
87103
import { countActiveDescendantRuns } from "../../agents/subagent-registry-read.js";
88104
import { callGateway } from "../../gateway/call.runtime.js";
89-
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
105+
import { DeliveryError, deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
90106
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
91107
import { enqueueSystemEvent } from "../../infra/system-events.js";
92108
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
@@ -578,7 +594,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
578594
vi.setSystemTime(new Date("2026-03-18T17:00:00.000Z"));
579595
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
580596
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
581-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
597+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
582598

583599
const params = makeBaseParams({ synthesizedText: "Long running report finished." });
584600
params.runStartedAt = Date.now() - (3 * 60 * 60_000 + 1);
@@ -688,7 +704,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
688704
it("text delivery fires exactly once (no double-deliver)", async () => {
689705
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
690706
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
691-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
707+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
692708

693709
const params = makeBaseParams({ synthesizedText: "Briefing ready." });
694710
const state = await dispatchCronDelivery(params);
@@ -706,7 +722,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
706722
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
707723
vi.mocked(deliverOutboundPayloads)
708724
.mockRejectedValueOnce(new Error("ECONNRESET while sending"))
709-
.mockResolvedValueOnce([{ ok: true } as never]);
725+
.mockResolvedValueOnce([{ ok: true }] as never);
710726

711727
const params = makeBaseParams({ synthesizedText: "Retry me once." });
712728
const state = await dispatchCronDelivery(params);
@@ -720,7 +736,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
720736
it("keeps direct announce delivery idempotent across replay for the same cron execution", async () => {
721737
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
722738
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
723-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
739+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
724740

725741
const params = makeBaseParams({ synthesizedText: "Replay-safe cron update." });
726742
const first = await dispatchCronDelivery(params);
@@ -735,7 +751,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
735751
it("does not collapse distinct recurring runs for the same job", async () => {
736752
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
737753
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
738-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
754+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
739755

740756
const first = makeBaseParams({
741757
runStartedAt: 1_000,
@@ -773,7 +789,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
773789
vi.mocked(deliverOutboundPayloads).mockImplementation(async (params) => {
774790
const failedPayload = Array.isArray(params.payloads) ? params.payloads[0] : undefined;
775791
params.onError?.(new Error("payload failed"), failedPayload as never);
776-
return [{ ok: true } as never];
792+
return [{ ok: true }] as never;
777793
});
778794

779795
const params = makeBaseParams({ synthesizedText: "Partial bestEffort replay." }) as Record<
@@ -790,10 +806,29 @@ describe("dispatchCronDelivery — double-announce guard", () => {
790806
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(2);
791807
});
792808

809+
it("caches partial DeliveryError sends to prevent scheduler replay duplication", async () => {
810+
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
811+
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
812+
vi.mocked(deliverOutboundPayloads).mockRejectedValue(
813+
new DeliveryError(new Error("second chunk failed"), [{ messageId: "w1" } as never]),
814+
);
815+
816+
const params = makeBaseParams({ synthesizedText: "Multi-chunk cron announce." });
817+
const first = await dispatchCronDelivery(params);
818+
const second = await dispatchCronDelivery(params);
819+
820+
// First call hits DeliveryError, caches partial delivery AND marks delivered
821+
// so cron state stays consistent with the idempotency cache. Second call sees
822+
// the cache and skips delivery entirely.
823+
expect(first.delivered).toBe(true);
824+
expect(second.delivered).toBe(true);
825+
expect(deliverOutboundPayloads).toHaveBeenCalledTimes(1);
826+
});
827+
793828
it("prunes the completed-delivery cache back to the entry cap", async () => {
794829
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
795830
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
796-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
831+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
797832

798833
for (let i = 0; i < 2003; i += 1) {
799834
const params = makeBaseParams({
@@ -878,7 +913,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
878913
it("text delivery always bypasses the write-ahead queue", async () => {
879914
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
880915
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
881-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
916+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
882917

883918
const params = makeBaseParams({ synthesizedText: "Daily digest ready." });
884919
const state = await dispatchCronDelivery(params);
@@ -900,7 +935,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
900935
it("structured/thread delivery also bypasses the write-ahead queue", async () => {
901936
vi.mocked(countActiveDescendantRuns).mockReturnValue(0);
902937
vi.mocked(isLikelyInterimCronMessage).mockReturnValue(false);
903-
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true } as never]);
938+
vi.mocked(deliverOutboundPayloads).mockResolvedValue([{ ok: true }] as never);
904939

905940
const params = makeBaseParams({ synthesizedText: "Report attached." });
906941
// Simulate structured content so useDirectDelivery path is taken (no retryTransient)
@@ -920,7 +955,7 @@ describe("dispatchCronDelivery — double-announce guard", () => {
920955
// First call throws a transient error, second call succeeds.
921956
vi.mocked(deliverOutboundPayloads)
922957
.mockRejectedValueOnce(new Error("gateway timeout"))
923-
.mockResolvedValueOnce([{ ok: true } as never]);
958+
.mockResolvedValueOnce([{ ok: true }] as never);
924959

925960
vi.stubEnv("OPENCLAW_TEST_FAST", "1");
926961
try {

src/cron/isolated-agent/delivery-dispatch.ts

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import type { OpenClawConfig } from "../../config/types.openclaw.js";
1616
import type { TtsAutoMode } from "../../config/types.tts.js";
1717
import { sleepWithAbort } from "../../infra/backoff.js";
1818
import { formatErrorMessage } from "../../infra/errors.js";
19-
import type { OutboundDeliveryResult } from "../../infra/outbound/deliver.js";
19+
import type { DeliveryError, OutboundDeliveryResult } from "../../infra/outbound/deliver.js";
2020
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
2121
import { hasReplyPayloadContent } from "../../interactive/payload.js";
2222
import { stringifyRouteThreadId } from "../../plugin-sdk/channel-route.js";
@@ -423,6 +423,19 @@ export function getCompletedDirectCronDeliveriesCountForTests(): number {
423423
return COMPLETED_DIRECT_CRON_DELIVERIES.size;
424424
}
425425

426+
// Match our DeliveryError by name + shape rather than `instanceof`. Keeps the
427+
// DeliveryError import type-only (no eager load of the heavy outbound delivery
428+
// module from this cold path) and falls through to the caller's generic error
429+
// handling if a third-party adapter happens to throw an error with the same
430+
// name but missing `sentBeforeError` (#57766).
431+
function isDeliveryError(err: unknown): err is DeliveryError {
432+
return (
433+
err instanceof Error &&
434+
err.name === "DeliveryError" &&
435+
Array.isArray((err as { sentBeforeError?: unknown }).sentBeforeError)
436+
);
437+
}
438+
426439
function summarizeDirectCronDeliveryError(error: unknown): string {
427440
if (error instanceof Error) {
428441
return error.message || "error";
@@ -469,7 +482,12 @@ async function retryTransientDirectCronDelivery<T>(params: {
469482
return await params.run();
470483
} catch (err) {
471484
const delayMs = retryDelaysMs[retryIndex];
472-
if (delayMs == null || !isTransientDirectCronDeliveryError(err) || params.signal?.aborted) {
485+
if (
486+
delayMs == null ||
487+
!isTransientDirectCronDeliveryError(err) ||
488+
params.signal?.aborted ||
489+
isDeliveryError(err)
490+
) {
473491
throw err;
474492
}
475493
const nextAttempt = retryIndex + 2;
@@ -681,20 +699,31 @@ export async function dispatchCronDelivery(
681699
// See: https://github.com/openclaw/openclaw/issues/40545
682700
skipQueue: true,
683701
});
684-
const deliveryResults = options?.retryTransient
702+
const deliveryOutcome = options?.retryTransient
685703
? await retryTransientDirectCronDelivery({
686704
jobId: params.job.id,
687705
signal: params.abortSignal,
688706
run: runDelivery,
689707
})
690708
: await runDelivery();
691709
// Only mark delivered when ALL payloads succeeded (no partial failure).
692-
delivered = deliveryResults.length > 0 && !hadPartialFailure;
710+
// Hook cancellation is intentional policy, not a failure — treat as delivered
711+
// so the job is cached and not replayed (#57766).
712+
delivered =
713+
(deliveryOutcome.length > 0 || deliveryOutcome.allCancelledByHook === true) &&
714+
!hadPartialFailure;
693715
// Intentionally leave partial success uncached: replay may duplicate the
694716
// successful subset, but caching it here would permanently drop the
695717
// failed payloads by converting the replay into delivered=true.
718+
//
719+
// Do NOT queue cron awareness when every payload was suppressed by hooks:
720+
// the reply text was not actually sent to the target channel, so
721+
// injecting it into the main session as "delivered" content would leak
722+
// blocked (e.g. DLP/policy-suppressed) output into agent awareness (#57766).
723+
const actuallySentContent = deliveryOutcome.length > 0;
696724
if (
697725
delivered &&
726+
actuallySentContent &&
698727
shouldQueueCronAwareness({
699728
job: params.job,
700729
delivery,
@@ -712,10 +741,35 @@ export async function dispatchCronDelivery(
712741
});
713742
}
714743
if (delivered) {
715-
rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, deliveryResults);
744+
rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, deliveryOutcome);
716745
}
717746
return null;
718747
} catch (err) {
748+
if (isDeliveryError(err) && err.sentBeforeError.length > 0) {
749+
// Partial send: cache as delivered to prevent scheduler replay from
750+
// duplicating the already-sent payloads. Accepts truncation over
751+
// duplication — same trade-off bestEffort makes (#57766).
752+
//
753+
// Also mark `delivered = true` so cron state/history stays consistent
754+
// with the idempotency cache — otherwise the run is persisted as
755+
// "not-delivered" while the same key is treated as already delivered
756+
// on replay.
757+
//
758+
// Awareness (queueCronAwarenessSystemEvent) is intentionally NOT called:
759+
// sentBeforeError contains OutboundDeliveryResult entries (messageId,
760+
// channel) that don't map cleanly back to payload text, so recording
761+
// `outputText`/`synthesizedText` or `payloadsForDelivery` would insert
762+
// unsent content into the agent's main-session transcript as if it had
763+
// been delivered. That false-history risk outweighs the loss of partial
764+
// awareness; the idempotency cache above still prevents user-visible
765+
// duplicates on replay.
766+
rememberCompletedDirectCronDelivery(deliveryIdempotencyKey, err.sentBeforeError);
767+
delivered = true;
768+
await logCronDeliveryError(
769+
`[cron:${params.job.id}] partial delivery: ${err.sentBeforeError.length} payload(s) sent before failure: ${err.message}`,
770+
);
771+
return null;
772+
}
719773
if (!params.deliveryBestEffort) {
720774
return params.withRunSession({
721775
status: "error",

src/gateway/server-methods/send.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,22 @@ export const sendHandlers: GatewayRequestHandlers = {
561561
: undefined,
562562
});
563563

564+
if (results.allCancelledByHook) {
565+
// Hook cancellation is intentional policy, not a transport failure — surface
566+
// as a successful no-op so gateway callers can distinguish it from UNAVAILABLE (#57766).
567+
const payload: Record<string, unknown> = {
568+
runId: idem,
569+
channel,
570+
cancelledByHook: true,
571+
};
572+
cacheGatewayDedupeSuccess({ context, dedupeKey, payload });
573+
return {
574+
ok: true,
575+
payload,
576+
meta: { channel, cancelledByHook: true },
577+
};
578+
}
579+
564580
const result = results.at(-1);
565581
if (!result) {
566582
throw new Error("No delivery result");

src/gateway/server-restart-sentinel.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,28 @@ describe("scheduleRestartSentinelWake", () => {
389389
expect(mocks.failDelivery).toHaveBeenCalledWith("queue-1", "transport still not ready");
390390
});
391391

392+
it("acks the queued restart notice on DeliveryError partial-send to prevent replay duplication", async () => {
393+
// When the chunked restart message partially lands and a later chunk
394+
// throws DeliveryError, the queue entry must be acked (not failed) so a
395+
// future drain/restart cannot replay the whole notice and duplicate the
396+
// already-delivered prefix (#57766).
397+
const partialError = Object.assign(new Error("second chunk failed"), {
398+
name: "DeliveryError",
399+
sentBeforeError: [{ channel: "whatsapp", messageId: "chunk-1" }],
400+
});
401+
mocks.deliverOutboundPayloads.mockRejectedValueOnce(partialError);
402+
403+
await scheduleRestartSentinelWake({ deps: {} as never });
404+
405+
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledTimes(1);
406+
expect(mocks.ackDelivery).toHaveBeenCalledWith("queue-1");
407+
expect(mocks.failDelivery).not.toHaveBeenCalled();
408+
expect(mocks.logWarn).toHaveBeenCalledWith(
409+
expect.stringContaining("partial-failed after sending 1 chunk"),
410+
expect.objectContaining({ sentBeforeError: 1 }),
411+
);
412+
});
413+
392414
it("still dispatches continuation after restart notice retries are exhausted", async () => {
393415
vi.useFakeTimers();
394416
mocks.deliverOutboundPayloads.mockRejectedValue(new Error("transport still not ready"));

0 commit comments

Comments
 (0)