Skip to content

Commit 4258a33

Browse files
steipeteSmithLabsLLCdocaohieu2808
committed
refactor(agents): unify subagent announce delivery pipeline
Co-authored-by: Smith Labs <SmithLabsLLC@users.noreply.github.com> Co-authored-by: Do Cao Hieu <docaohieu2808@users.noreply.github.com>
1 parent aedf62a commit 4258a33

14 files changed

Lines changed: 623 additions & 132 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
1212

1313
### Fixes
1414

15+
- Agents/Subagents delivery: refactor subagent completion announce dispatch into an explicit queue/direct/fallback state machine, recover outbound channel-plugin resolution in cold/stale plugin-registry states across announce/message/gateway send paths, finalize cleanup bookkeeping when announce flow rejects, and treat Telegram sends without `message_id` as delivery failures (instead of false-success `"unknown"` IDs). (#26867, #25961, #26803, #25069, #26741) Thanks @SmithLabsLLC and @docaohieu2808.
1516
- Slack/Session threads: prevent oversized parent-session inheritance from silently bricking new thread sessions, surface embedded context-overflow empty-result failures to users, and add configurable `session.parentForkMaxTokens` (default `100000`, `0` disables). (#26912) Thanks @markshields-tl.
1617
- Security/Signal: enforce DM/group authorization before reaction-only notification enqueue so unauthorized senders can no longer inject Signal reaction system events under `dmPolicy`/`groupPolicy`; reaction notifications now require channel access checks first. This ships in the next npm release (`2026.2.25`). Thanks @tdjackey for reporting.
1718
- Security/Discord + Slack reactions: enforce DM policy/allowlist authorization before reaction-event system enqueue in direct messages; Discord reaction handling now also honors DM/group-DM enablement and guild `groupPolicy` channel gating to keep reaction ingress aligned with normal message preflight. This ships in the next npm release (`2026.2.25`). Thanks @tdjackey for reporting.
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import {
3+
mapQueueOutcomeToDeliveryResult,
4+
runSubagentAnnounceDispatch,
5+
} from "./subagent-announce-dispatch.js";
6+
7+
describe("mapQueueOutcomeToDeliveryResult", () => {
8+
it("maps steered to delivered", () => {
9+
expect(mapQueueOutcomeToDeliveryResult("steered")).toEqual({
10+
delivered: true,
11+
path: "steered",
12+
});
13+
});
14+
15+
it("maps queued to delivered", () => {
16+
expect(mapQueueOutcomeToDeliveryResult("queued")).toEqual({
17+
delivered: true,
18+
path: "queued",
19+
});
20+
});
21+
22+
it("maps none to not-delivered", () => {
23+
expect(mapQueueOutcomeToDeliveryResult("none")).toEqual({
24+
delivered: false,
25+
path: "none",
26+
});
27+
});
28+
});
29+
30+
describe("runSubagentAnnounceDispatch", () => {
31+
it("uses queue-first ordering for non-completion mode", async () => {
32+
const queue = vi.fn(async () => "none" as const);
33+
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
34+
35+
const result = await runSubagentAnnounceDispatch({
36+
expectsCompletionMessage: false,
37+
queue,
38+
direct,
39+
});
40+
41+
expect(queue).toHaveBeenCalledTimes(1);
42+
expect(direct).toHaveBeenCalledTimes(1);
43+
expect(result.delivered).toBe(true);
44+
expect(result.path).toBe("direct");
45+
expect(result.phases).toEqual([
46+
{ phase: "queue-primary", delivered: false, path: "none", error: undefined },
47+
{ phase: "direct-primary", delivered: true, path: "direct", error: undefined },
48+
]);
49+
});
50+
51+
it("short-circuits direct send when non-completion queue delivers", async () => {
52+
const queue = vi.fn(async () => "queued" as const);
53+
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
54+
55+
const result = await runSubagentAnnounceDispatch({
56+
expectsCompletionMessage: false,
57+
queue,
58+
direct,
59+
});
60+
61+
expect(queue).toHaveBeenCalledTimes(1);
62+
expect(direct).not.toHaveBeenCalled();
63+
expect(result.path).toBe("queued");
64+
expect(result.phases).toEqual([
65+
{ phase: "queue-primary", delivered: true, path: "queued", error: undefined },
66+
]);
67+
});
68+
69+
it("uses direct-first ordering for completion mode", async () => {
70+
const queue = vi.fn(async () => "queued" as const);
71+
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
72+
73+
const result = await runSubagentAnnounceDispatch({
74+
expectsCompletionMessage: true,
75+
queue,
76+
direct,
77+
});
78+
79+
expect(direct).toHaveBeenCalledTimes(1);
80+
expect(queue).not.toHaveBeenCalled();
81+
expect(result.path).toBe("direct");
82+
expect(result.phases).toEqual([
83+
{ phase: "direct-primary", delivered: true, path: "direct", error: undefined },
84+
]);
85+
});
86+
87+
it("falls back to queue when completion direct send fails", async () => {
88+
const queue = vi.fn(async () => "steered" as const);
89+
const direct = vi.fn(async () => ({
90+
delivered: false,
91+
path: "direct" as const,
92+
error: "network",
93+
}));
94+
95+
const result = await runSubagentAnnounceDispatch({
96+
expectsCompletionMessage: true,
97+
queue,
98+
direct,
99+
});
100+
101+
expect(direct).toHaveBeenCalledTimes(1);
102+
expect(queue).toHaveBeenCalledTimes(1);
103+
expect(result.path).toBe("steered");
104+
expect(result.phases).toEqual([
105+
{ phase: "direct-primary", delivered: false, path: "direct", error: "network" },
106+
{ phase: "queue-fallback", delivered: true, path: "steered", error: undefined },
107+
]);
108+
});
109+
110+
it("returns direct failure when completion fallback queue cannot deliver", async () => {
111+
const queue = vi.fn(async () => "none" as const);
112+
const direct = vi.fn(async () => ({
113+
delivered: false,
114+
path: "direct" as const,
115+
error: "failed",
116+
}));
117+
118+
const result = await runSubagentAnnounceDispatch({
119+
expectsCompletionMessage: true,
120+
queue,
121+
direct,
122+
});
123+
124+
expect(result).toMatchObject({
125+
delivered: false,
126+
path: "direct",
127+
error: "failed",
128+
});
129+
expect(result.phases).toEqual([
130+
{ phase: "direct-primary", delivered: false, path: "direct", error: "failed" },
131+
{ phase: "queue-fallback", delivered: false, path: "none", error: undefined },
132+
]);
133+
});
134+
135+
it("returns none immediately when signal is already aborted", async () => {
136+
const queue = vi.fn(async () => "none" as const);
137+
const direct = vi.fn(async () => ({ delivered: true, path: "direct" as const }));
138+
const controller = new AbortController();
139+
controller.abort();
140+
141+
const result = await runSubagentAnnounceDispatch({
142+
expectsCompletionMessage: true,
143+
signal: controller.signal,
144+
queue,
145+
direct,
146+
});
147+
148+
expect(queue).not.toHaveBeenCalled();
149+
expect(direct).not.toHaveBeenCalled();
150+
expect(result).toEqual({
151+
delivered: false,
152+
path: "none",
153+
phases: [],
154+
});
155+
});
156+
});
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
2+
3+
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none";
4+
5+
export type SubagentAnnounceDeliveryResult = {
6+
delivered: boolean;
7+
path: SubagentDeliveryPath;
8+
error?: string;
9+
phases?: SubagentAnnounceDispatchPhaseResult[];
10+
};
11+
12+
export type SubagentAnnounceDispatchPhase = "queue-primary" | "direct-primary" | "queue-fallback";
13+
14+
export type SubagentAnnounceDispatchPhaseResult = {
15+
phase: SubagentAnnounceDispatchPhase;
16+
delivered: boolean;
17+
path: SubagentDeliveryPath;
18+
error?: string;
19+
};
20+
21+
export function mapQueueOutcomeToDeliveryResult(
22+
outcome: SubagentAnnounceQueueOutcome,
23+
): SubagentAnnounceDeliveryResult {
24+
if (outcome === "steered") {
25+
return {
26+
delivered: true,
27+
path: "steered",
28+
};
29+
}
30+
if (outcome === "queued") {
31+
return {
32+
delivered: true,
33+
path: "queued",
34+
};
35+
}
36+
return {
37+
delivered: false,
38+
path: "none",
39+
};
40+
}
41+
42+
export async function runSubagentAnnounceDispatch(params: {
43+
expectsCompletionMessage: boolean;
44+
signal?: AbortSignal;
45+
queue: () => Promise<SubagentAnnounceQueueOutcome>;
46+
direct: () => Promise<SubagentAnnounceDeliveryResult>;
47+
}): Promise<SubagentAnnounceDeliveryResult> {
48+
const phases: SubagentAnnounceDispatchPhaseResult[] = [];
49+
const appendPhase = (
50+
phase: SubagentAnnounceDispatchPhase,
51+
result: SubagentAnnounceDeliveryResult,
52+
) => {
53+
phases.push({
54+
phase,
55+
delivered: result.delivered,
56+
path: result.path,
57+
error: result.error,
58+
});
59+
};
60+
const withPhases = (result: SubagentAnnounceDeliveryResult): SubagentAnnounceDeliveryResult => ({
61+
...result,
62+
phases,
63+
});
64+
65+
if (params.signal?.aborted) {
66+
return withPhases({
67+
delivered: false,
68+
path: "none",
69+
});
70+
}
71+
72+
if (!params.expectsCompletionMessage) {
73+
const primaryQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
74+
appendPhase("queue-primary", primaryQueue);
75+
if (primaryQueue.delivered) {
76+
return withPhases(primaryQueue);
77+
}
78+
79+
const primaryDirect = await params.direct();
80+
appendPhase("direct-primary", primaryDirect);
81+
return withPhases(primaryDirect);
82+
}
83+
84+
const primaryDirect = await params.direct();
85+
appendPhase("direct-primary", primaryDirect);
86+
if (primaryDirect.delivered) {
87+
return withPhases(primaryDirect);
88+
}
89+
90+
if (params.signal?.aborted) {
91+
return withPhases({
92+
delivered: false,
93+
path: "none",
94+
});
95+
}
96+
97+
const fallbackQueue = mapQueueOutcomeToDeliveryResult(await params.queue());
98+
appendPhase("queue-fallback", fallbackQueue);
99+
if (fallbackQueue.delivered) {
100+
return withPhases(fallbackQueue);
101+
}
102+
103+
return withPhases(primaryDirect);
104+
}

0 commit comments

Comments
 (0)