Skip to content

Commit 3cef9a6

Browse files
committed
fix: use in-process subagent announce handoff
1 parent f1381b5 commit 3cef9a6

10 files changed

Lines changed: 280 additions & 55 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
1414
- Require explicit browser device pairing [AI]. (#81289) Thanks @pgondhi987.
1515
- Require Control UI pairing before proxy-scoped access [AI]. (#81288) Thanks @pgondhi987.
1616
- Installer: honor `--version` for git installs and install from the checked-in lockfile, preventing recent dependency pins from tripping pnpm's minimum-release-age gate during tag installs.
17+
- Agents: deliver same-process subagent completion handoffs through the in-process agent dispatcher instead of opening a Gateway RPC loopback.
1718
- Harden trusted-proxy source validation [AI]. (#81290) Thanks @pgondhi987.
1819
- Agents: add permissive item schemas to array tool parameters before provider submission, preventing OpenAI-compatible schema validation from rejecting plugin tools that omit `items`. Fixes #81175. (#81217) Thanks @JARVIS-Glasses.
1920
- Agents: escalate LLM idle watchdog timeouts through profile rotation and configured model fallback instead of leaving agent turns stuck after a silent model stream. Fixes #76877. (#80449) Thanks @jimdawdy-hub.

src/agents/subagent-announce-delivery.runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export {
55
resolveStorePath,
66
} from "../config/sessions.js";
77
export { callGateway } from "../gateway/call.js";
8+
export { dispatchGatewayMethodInProcess } from "../gateway/server-plugins.js";
89
export { resolveQueueSettings } from "../auto-reply/reply/queue.js";
910
export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-effort-delivery.js";
1011
export { sendMessage } from "../infra/outbound/message.js";

src/agents/subagent-announce-delivery.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
} from "./subagent-announce-delivery.js";
1616
import {
1717
callGateway as runtimeCallGateway,
18+
dispatchGatewayMethodInProcess as runtimeDispatchGatewayMethodInProcess,
1819
sendMessage as runtimeSendMessage,
1920
} from "./subagent-announce-delivery.runtime.js";
2021
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
@@ -35,6 +36,10 @@ function createGatewayMock(response: Record<string, unknown> = {}) {
3536
return vi.fn(async () => response) as unknown as typeof runtimeCallGateway;
3637
}
3738

39+
function createInProcessGatewayMock(response: Record<string, unknown> = {}) {
40+
return vi.fn(async () => response) as unknown as typeof runtimeDispatchGatewayMethodInProcess;
41+
}
42+
3843
function createSendMessageMock() {
3944
return vi.fn(async () => ({
4045
channel: "slack",
@@ -109,6 +114,16 @@ function expectGatewayAgentParams(
109114
return expectRecordFields(request.params, expected);
110115
}
111116

117+
function expectInProcessAgentParams(
118+
dispatchGatewayMethodInProcess: typeof runtimeDispatchGatewayMethodInProcess,
119+
expected: Record<string, unknown>,
120+
) {
121+
const method = mockCallArg(dispatchGatewayMethodInProcess, 0, 0);
122+
expect(method).toBe("agent");
123+
const params = mockCallArg(dispatchGatewayMethodInProcess, 0, 1);
124+
return expectRecordFields(params, expected);
125+
}
126+
112127
async function deliverSlackThreadAnnouncement(params: {
113128
callGateway: typeof runtimeCallGateway;
114129
isActive: boolean;
@@ -625,6 +640,57 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
625640
});
626641
});
627642

643+
it("uses in-process agent dispatch for dormant completion requesters", async () => {
644+
const callGateway = createGatewayMock();
645+
const dispatchGatewayMethodInProcess = createInProcessGatewayMock({
646+
result: {
647+
payloads: [{ text: "requester voice completion" }],
648+
},
649+
});
650+
__testing.setDepsForTest({
651+
callGateway,
652+
dispatchGatewayMethodInProcess,
653+
getRequesterSessionActivity: () => ({
654+
sessionId: "requester-session-local",
655+
isActive: false,
656+
}),
657+
getRuntimeConfig: () => ({}) as never,
658+
});
659+
660+
const result = await deliverSubagentAnnouncement({
661+
requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
662+
targetRequesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
663+
triggerMessage: "child done",
664+
steerMessage: "child done",
665+
requesterOrigin: slackThreadOrigin,
666+
requesterSessionOrigin: slackThreadOrigin,
667+
completionDirectOrigin: slackThreadOrigin,
668+
directOrigin: slackThreadOrigin,
669+
requesterIsSubagent: false,
670+
expectsCompletionMessage: true,
671+
bestEffortDeliver: true,
672+
directIdempotencyKey: "announce-local-dispatch",
673+
});
674+
675+
expectRecordFields(result, {
676+
delivered: true,
677+
path: "direct",
678+
});
679+
expect(callGateway).not.toHaveBeenCalled();
680+
expectInProcessAgentParams(dispatchGatewayMethodInProcess, {
681+
deliver: true,
682+
channel: "slack",
683+
accountId: "acct-1",
684+
to: "channel:C123",
685+
threadId: "171.222",
686+
bestEffortDeliver: true,
687+
});
688+
expect(mockCallArg(dispatchGatewayMethodInProcess, 0, 2)).toMatchObject({
689+
expectFinal: true,
690+
timeoutMs: 120_000,
691+
});
692+
});
693+
628694
it("keeps announce-agent delivery primary for dormant completion events with child output", async () => {
629695
const callGateway = createGatewayMock({
630696
result: {

src/agents/subagent-announce-delivery.ts

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js
3131
import {
3232
callGateway,
3333
createBoundDeliveryRouter,
34+
dispatchGatewayMethodInProcess,
3435
getGlobalHookRunner,
3536
isEmbeddedPiRunActive,
3637
getRuntimeConfig,
@@ -58,7 +59,7 @@ const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
5859
const AGENT_MEDIATED_COMPLETION_TOOLS = new Set(["music_generate", "video_generate"]);
5960

6061
type SubagentAnnounceDeliveryDeps = {
61-
callGateway: typeof callGateway;
62+
dispatchGatewayMethodInProcess: typeof dispatchGatewayMethodInProcess;
6263
getRuntimeConfig: typeof getRuntimeConfig;
6364
getRequesterSessionActivity: (requesterSessionKey: string) => {
6465
sessionId?: string;
@@ -72,7 +73,7 @@ type SubagentAnnounceDeliveryDeps = {
7273
};
7374

7475
const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
75-
callGateway,
76+
dispatchGatewayMethodInProcess,
7677
getRuntimeConfig,
7778
getRequesterSessionActivity: (requesterSessionKey: string) => {
7879
const sessionId =
@@ -101,6 +102,21 @@ async function resolveQueueEmbeddedPiMessageOutcome(
101102
);
102103
}
103104

105+
async function runAnnounceAgentCall(params: {
106+
agentParams: Record<string, unknown>;
107+
expectFinal?: boolean;
108+
timeoutMs?: number;
109+
}): Promise<unknown> {
110+
return await subagentAnnounceDeliveryDeps.dispatchGatewayMethodInProcess(
111+
"agent",
112+
params.agentParams,
113+
{
114+
expectFinal: params.expectFinal,
115+
timeoutMs: params.timeoutMs,
116+
},
117+
);
118+
}
119+
104120
function formatQueueWakeFailureError(
105121
fallback: string,
106122
outcome: EmbeddedPiQueueMessageOutcome,
@@ -638,6 +654,36 @@ async function sendSubagentAnnounceDirectly(params: {
638654
path: "none",
639655
};
640656
}
657+
const directAgentParams: Record<string, unknown> = {
658+
sessionKey: canonicalRequesterSessionKey,
659+
message: params.triggerMessage,
660+
deliver: shouldDeliverAgentFinal,
661+
bestEffortDeliver: params.bestEffortDeliver,
662+
internalEvents: params.internalEvents,
663+
channel: shouldDeliverAgentFinal ? deliveryTarget.channel : sessionOnlyOriginChannel,
664+
accountId: shouldDeliverAgentFinal
665+
? deliveryTarget.accountId
666+
: sessionOnlyOriginChannel
667+
? sessionOnlyOrigin?.accountId
668+
: undefined,
669+
to: shouldDeliverAgentFinal
670+
? deliveryTarget.to
671+
: sessionOnlyOriginChannel
672+
? sessionOnlyOrigin?.to
673+
: undefined,
674+
threadId: shouldDeliverAgentFinal
675+
? deliveryTarget.threadId
676+
: sessionOnlyOriginChannel
677+
? sessionOnlyOrigin?.threadId
678+
: undefined,
679+
inputProvenance: {
680+
kind: "inter_session",
681+
sourceSessionKey: params.sourceSessionKey,
682+
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
683+
sourceTool: params.sourceTool ?? "subagent_announce",
684+
},
685+
idempotencyKey: params.directIdempotencyKey,
686+
};
641687
let directAnnounceResponse: unknown;
642688
try {
643689
directAnnounceResponse = await runAnnounceDeliveryWithRetry({
@@ -646,38 +692,8 @@ async function sendSubagentAnnounceDirectly(params: {
646692
: "direct announce agent call",
647693
signal: params.signal,
648694
run: async () =>
649-
await subagentAnnounceDeliveryDeps.callGateway({
650-
method: "agent",
651-
params: {
652-
sessionKey: canonicalRequesterSessionKey,
653-
message: params.triggerMessage,
654-
deliver: shouldDeliverAgentFinal,
655-
bestEffortDeliver: params.bestEffortDeliver,
656-
internalEvents: params.internalEvents,
657-
channel: shouldDeliverAgentFinal ? deliveryTarget.channel : sessionOnlyOriginChannel,
658-
accountId: shouldDeliverAgentFinal
659-
? deliveryTarget.accountId
660-
: sessionOnlyOriginChannel
661-
? sessionOnlyOrigin?.accountId
662-
: undefined,
663-
to: shouldDeliverAgentFinal
664-
? deliveryTarget.to
665-
: sessionOnlyOriginChannel
666-
? sessionOnlyOrigin?.to
667-
: undefined,
668-
threadId: shouldDeliverAgentFinal
669-
? deliveryTarget.threadId
670-
: sessionOnlyOriginChannel
671-
? sessionOnlyOrigin?.threadId
672-
: undefined,
673-
inputProvenance: {
674-
kind: "inter_session",
675-
sourceSessionKey: params.sourceSessionKey,
676-
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
677-
sourceTool: params.sourceTool ?? "subagent_announce",
678-
},
679-
idempotencyKey: params.directIdempotencyKey,
680-
},
695+
await runAnnounceAgentCall({
696+
agentParams: directAgentParams,
681697
expectFinal: true,
682698
timeoutMs: announceTimeoutMs,
683699
}),
@@ -797,11 +813,30 @@ export async function deliverSubagentAnnouncement(params: {
797813
}
798814

799815
export const __testing = {
800-
setDepsForTest(overrides?: Partial<SubagentAnnounceDeliveryDeps>) {
816+
setDepsForTest(
817+
overrides?: Partial<SubagentAnnounceDeliveryDeps> & {
818+
callGateway?: typeof callGateway;
819+
},
820+
) {
821+
const callGatewayOverride = overrides?.callGateway;
822+
const dispatchGatewayMethodInProcessOverride =
823+
overrides?.dispatchGatewayMethodInProcess ??
824+
(callGatewayOverride
825+
? ((async (method, agentParams, options) =>
826+
await callGatewayOverride({
827+
method,
828+
params: agentParams,
829+
expectFinal: options?.expectFinal,
830+
timeoutMs: options?.timeoutMs,
831+
})) satisfies typeof dispatchGatewayMethodInProcess)
832+
: undefined);
801833
subagentAnnounceDeliveryDeps = overrides
802834
? {
803835
...defaultSubagentAnnounceDeliveryDeps,
804836
...overrides,
837+
...(dispatchGatewayMethodInProcessOverride
838+
? { dispatchGatewayMethodInProcess: dispatchGatewayMethodInProcessOverride }
839+
: {}),
805840
}
806841
: defaultSubagentAnnounceDeliveryDeps;
807842
},

src/agents/subagent-announce.runtime.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export {
55
resolveStorePath,
66
} from "../config/sessions.js";
77
export { callGateway } from "../gateway/call.js";
8+
export { dispatchGatewayMethodInProcess } from "../gateway/server-plugins.js";
89
export { isEmbeddedPiRunActive, waitForEmbeddedPiRunEnd } from "./pi-embedded-runner/runs.js";

src/agents/subagent-announce.test-support.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { OpenClawConfig } from "../config/types.openclaw.js";
22
import type { callGateway } from "../gateway/call.js";
3+
import type { dispatchGatewayMethodInProcess } from "../gateway/server-plugins.js";
34
import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js";
45
import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js";
56

@@ -54,6 +55,17 @@ export function createSubagentAnnounceDeliveryRuntimeMock(options: DeliveryRunti
5455
return {
5556
callGateway: (async <T = Record<string, unknown>>(request: Parameters<typeof callGateway>[0]) =>
5657
(await options.callGateway(request)) as T) as typeof callGateway,
58+
dispatchGatewayMethodInProcess: (async <T = Record<string, unknown>>(
59+
method: string,
60+
params: Record<string, unknown>,
61+
callOptions?: { expectFinal?: boolean; timeoutMs?: number },
62+
) =>
63+
(await options.callGateway({
64+
method,
65+
params,
66+
expectFinal: callOptions?.expectFinal,
67+
timeoutMs: callOptions?.timeoutMs,
68+
})) as T) as typeof dispatchGatewayMethodInProcess,
5769
getRuntimeConfig: options.getRuntimeConfig,
5870
loadSessionStore: options.loadSessionStore,
5971
resolveAgentIdFromSessionKey: options.resolveAgentIdFromSessionKey,

src/agents/subagent-announce.test.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ const { subagentRegistryRuntimeMock } = vi.hoisted(() => ({
4646

4747
vi.mock("./subagent-announce.runtime.js", () => ({
4848
callGateway: (request: unknown) => callGatewayMock(request),
49+
dispatchGatewayMethodInProcess: (
50+
method: string,
51+
params: Record<string, unknown>,
52+
options?: { timeoutMs?: number },
53+
) => callGatewayMock({ method, params, timeoutMs: options?.timeoutMs }),
4954
isEmbeddedPiRunActive: (sessionId: string) => isEmbeddedPiRunActiveMock(sessionId),
5055
getRuntimeConfig: () => mockConfig,
5156
loadSessionStore: (storePath: string) => loadSessionStoreMock(storePath),

src/agents/subagent-announce.timeout.test.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,20 @@ vi.mock("./subagent-announce-delivery.js", () => ({
175175
}));
176176
vi.mock("./subagent-announce.runtime.js", () => ({
177177
callGateway: createGatewayCallModuleMock().callGateway,
178+
dispatchGatewayMethodInProcess: async (
179+
method: string,
180+
params: Record<string, unknown>,
181+
options?: { expectFinal?: boolean; timeoutMs?: number },
182+
) => {
183+
const request = {
184+
method,
185+
params,
186+
expectFinal: options?.expectFinal,
187+
timeoutMs: options?.timeoutMs,
188+
};
189+
gatewayCalls.push(request);
190+
return await callGatewayImpl(request);
191+
},
178192
getRuntimeConfig: () => configOverride,
179193
loadSessionStore: vi.fn(() => sessionStore),
180194
resolveAgentIdFromSessionKey: () => "main",

0 commit comments

Comments
 (0)