Skip to content

Commit f9eb7d9

Browse files
committed
fix(agents): queue subagent completion announces
1 parent 990f931 commit f9eb7d9

9 files changed

Lines changed: 204 additions & 169 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ Docs: https://docs.openclaw.ai
3939
- Providers: honor env-proxy settings for guarded provider model fetches when no explicit dispatcher policy is configured, preserving explicit transport overrides. Fixes #70453. (#72480) Thanks @mjamiv.
4040
- Feishu: accept and honor `channels.feishu.blockStreaming` at the top level and per account, while keeping the legacy default off so Feishu cards no longer reject documented config or silently drop block replies. Fixes #75555. Thanks @vincentkoc.
4141
- Gateway/update: avoid `launchctl kickstart -k` immediately after fresh macOS update bootstraps, and unlink dangling global plugin-runtime symlinks during packaged postinstall and `doctor --fix` so upgrades no longer SIGTERM the newly booted Gateway or leave bundled plugin imports pointed at pruned `plugin-runtime-deps` trees. Completes #76261 and fixes #76466. (#76929)
42+
- Agents/subagents: route completion announces through the requester session's internal steer/queue path before direct fallback, preventing synchronous announce turns and timeout retries from piling up behind busy requester sessions.
4243
- Google Chat: normalize custom Google auth transport headers before google-auth/gaxios interceptors run, restoring webhook token verification when certificate retrieval expects Fetch `Headers`. Fixes #76742. Thanks @donbowman.
4344
- Doctor/plugins: reset stale `plugins.slots.memory` and `plugins.slots.contextEngine` references during `doctor --fix`, so cleanup of missing plugin config does not leave unrecoverable slot owners behind. Fixes #76550 and #76551. Thanks @vincentkoc.
4445
- Docs/WhatsApp: merge the duplicate top-level `web` objects in the gateway channel config example so copy-pasted WhatsApp config keeps both `web.whatsapp` and reconnect settings. Fixes #76619. Thanks @WadydX.

docs/tools/subagents.md

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ requester chat when the run finishes.
8080

8181
</Accordion>
8282
<Accordion title="Manual-spawn delivery resilience">
83-
- OpenClaw tries direct `agent` delivery first with a stable idempotency key.
84-
- If direct delivery fails, it falls back to queue routing.
83+
- OpenClaw steers completion events into an active requester run when possible.
84+
- If the requester run cannot be steered but its session exists, OpenClaw queues an internal follow-up independent of the channel queue mode.
85+
- Direct `agent` delivery is only a fallback when queue routing is unavailable, and uses a stable idempotency key.
86+
- If fallback direct `agent` delivery times out, OpenClaw falls back to direct message delivery instead of starting another agent turn, because the timed-out turn may still be running server-side.
8587
- If queue routing is still not available, the announce is retried with a short exponential backoff before final give-up.
8688
- Completion delivery keeps the resolved requester route: thread-bound or conversation-bound completion routes win when available; if the completion origin only provides a channel, OpenClaw fills the missing target/account from the requester session's resolved route (`lastChannel` / `lastTo` / `lastAccountId`) so direct delivery still works.
8789

@@ -379,11 +381,15 @@ Delivery depends on requester depth:
379381
- Nested requester subagent sessions receive an internal follow-up injection (`deliver=false`) so the orchestrator can synthesize child results in-session.
380382
- If a nested requester subagent session is gone, OpenClaw falls back to that session's requester when available.
381383

382-
For top-level requester sessions, completion-mode direct delivery first
383-
resolves any bound conversation/thread route and hook override, then fills
384-
missing channel-target fields from the requester session's stored route.
385-
That keeps completions on the right chat/topic even when the completion
386-
origin only identifies the channel.
384+
For top-level requester sessions, completion-mode delivery first resolves any
385+
bound conversation/thread route and hook override, then steers the requester run
386+
or queues an internal follow-up with that route. This queue path is used even
387+
when the channel queue mode would otherwise run direct, so child completions do
388+
not start synchronous waiter turns behind busy requester sessions. If no
389+
requester session can be queued, direct fallback still fills missing
390+
channel-target fields from the requester session's stored route. That keeps
391+
completions on the right chat/topic even when the completion origin only
392+
identifies the channel.
387393

388394
Child completion aggregation is scoped to the current requester run when
389395
building nested completion findings, preventing stale prior-run child

src/agents/subagent-announce-delivery.test.ts

Lines changed: 80 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ function createSendMessageMock() {
4646

4747
async function deliverSlackThreadAnnouncement(params: {
4848
callGateway: typeof runtimeCallGateway;
49-
isActive: boolean;
50-
sessionId: string;
49+
isActive?: boolean;
50+
sessionId?: string | null;
5151
expectsCompletionMessage: boolean;
5252
directIdempotencyKey: string;
5353
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
@@ -57,8 +57,8 @@ async function deliverSlackThreadAnnouncement(params: {
5757
__testing.setDepsForTest({
5858
callGateway: params.callGateway,
5959
getRequesterSessionActivity: () => ({
60-
sessionId: params.sessionId,
61-
isActive: params.isActive,
60+
sessionId: params.sessionId ?? undefined,
61+
isActive: params.isActive === true,
6262
}),
6363
getRuntimeConfig: () => ({}) as never,
6464
...(params.queueEmbeddedPiMessage
@@ -88,6 +88,7 @@ async function deliverDiscordDirectMessageCompletion(params: {
8888
callGateway: typeof runtimeCallGateway;
8989
sendMessage?: typeof runtimeSendMessage;
9090
internalEvents?: AgentInternalEvent[];
91+
sessionId?: string | null;
9192
}) {
9293
const origin = {
9394
channel: "discord",
@@ -97,7 +98,8 @@ async function deliverDiscordDirectMessageCompletion(params: {
9798
__testing.setDepsForTest({
9899
callGateway: params.callGateway,
99100
getRequesterSessionActivity: () => ({
100-
sessionId: "requester-session-dm",
101+
sessionId:
102+
params.sessionId === null ? undefined : (params.sessionId ?? "requester-session-dm"),
101103
isActive: false,
102104
}),
103105
getRuntimeConfig: () => ({}) as never,
@@ -126,6 +128,7 @@ async function deliverTelegramDirectMessageCompletion(params: {
126128
sendMessage?: typeof runtimeSendMessage;
127129
internalEvents?: AgentInternalEvent[];
128130
isActive?: boolean;
131+
sessionId?: string | null;
129132
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
130133
}) {
131134
const origin = {
@@ -136,7 +139,8 @@ async function deliverTelegramDirectMessageCompletion(params: {
136139
__testing.setDepsForTest({
137140
callGateway: params.callGateway,
138141
getRequesterSessionActivity: () => ({
139-
sessionId: "requester-session-telegram",
142+
sessionId:
143+
params.sessionId === null ? undefined : (params.sessionId ?? "requester-session-telegram"),
140144
isActive: params.isActive === true,
141145
}),
142146
getRuntimeConfig: () => ({}) as never,
@@ -166,7 +170,7 @@ async function deliverTelegramDirectMessageCompletion(params: {
166170
async function deliverSlackChannelAnnouncement(params: {
167171
callGateway: typeof runtimeCallGateway;
168172
isActive: boolean;
169-
sessionId: string;
173+
sessionId?: string;
170174
expectsCompletionMessage: boolean;
171175
directIdempotencyKey: string;
172176
completionDirectOrigin?: {
@@ -505,32 +509,21 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
505509
expect(callGateway).not.toHaveBeenCalled();
506510
});
507511

508-
it("keeps direct external delivery for dormant completion requesters", async () => {
512+
it("queues completion delivery for dormant requesters", async () => {
509513
const callGateway = createGatewayMock();
510-
await deliverSlackThreadAnnouncement({
514+
const result = await deliverSlackThreadAnnouncement({
511515
callGateway,
512516
sessionId: "requester-session-2",
513517
isActive: false,
514518
expectsCompletionMessage: true,
515519
directIdempotencyKey: "announce-1b",
516520
});
517521

518-
expect(callGateway).toHaveBeenCalledWith(
519-
expect.objectContaining({
520-
method: "agent",
521-
params: expect.objectContaining({
522-
deliver: true,
523-
channel: "slack",
524-
accountId: "acct-1",
525-
to: "channel:C123",
526-
threadId: "171.222",
527-
bestEffortDeliver: true,
528-
}),
529-
}),
530-
);
522+
expect(result).toEqual(expect.objectContaining({ delivered: true, path: "queued" }));
523+
expect(callGateway).not.toHaveBeenCalledWith(expect.objectContaining({ expectFinal: true }));
531524
});
532525

533-
it("keeps announce-agent delivery primary for dormant completion events with child output", async () => {
526+
it("queues dormant completion events with child output", async () => {
534527
const callGateway = createGatewayMock({
535528
result: {
536529
payloads: [{ text: "requester voice completion" }],
@@ -563,35 +556,22 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
563556
expect(result).toEqual(
564557
expect.objectContaining({
565558
delivered: true,
566-
path: "direct",
567-
}),
568-
);
569-
expect(callGateway).toHaveBeenCalledWith(
570-
expect.objectContaining({
571-
method: "agent",
572-
params: expect.objectContaining({
573-
deliver: true,
574-
channel: "slack",
575-
accountId: "acct-1",
576-
to: "channel:C123",
577-
threadId: "171.222",
578-
bestEffortDeliver: true,
579-
internalEvents: expect.any(Array),
580-
}),
559+
path: "queued",
581560
}),
582561
);
562+
expect(callGateway).not.toHaveBeenCalledWith(expect.objectContaining({ expectFinal: true }));
583563
expect(sendMessage).not.toHaveBeenCalled();
584564
});
585565

586-
it("uses a direct thread fallback when announce-agent delivery fails", async () => {
566+
it("uses a direct thread fallback when queue routing is unavailable and announce-agent delivery fails", async () => {
587567
const callGateway = vi.fn(async () => {
588568
throw new Error("UNAVAILABLE: gateway lost final output");
589569
}) as unknown as typeof runtimeCallGateway;
590570
const sendMessage = createSendMessageMock();
591571
const result = await deliverSlackThreadAnnouncement({
592572
callGateway,
593573
sendMessage,
594-
sessionId: "requester-session-4",
574+
sessionId: null,
595575
isActive: false,
596576
expectsCompletionMessage: true,
597577
directIdempotencyKey: "announce-thread-fallback-1",
@@ -632,14 +612,15 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
632612
);
633613
});
634614

635-
it("uses direct fallback for Telegram DMs when announce-agent delivery fails", async () => {
615+
it("uses direct fallback for Telegram DMs when queue routing is unavailable and announce-agent delivery fails", async () => {
636616
const callGateway = vi.fn(async () => {
637617
throw new Error("UNAVAILABLE: requester wake failed");
638618
}) as unknown as typeof runtimeCallGateway;
639619
const sendMessage = createSendMessageMock();
640620
const result = await deliverTelegramDirectMessageCompletion({
641621
callGateway,
642622
sendMessage,
623+
sessionId: null,
643624
internalEvents: [
644625
{
645626
type: "task_completion",
@@ -676,7 +657,53 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
676657
);
677658
});
678659

679-
it("uses direct fallback when an active Telegram requester cannot be woken", async () => {
660+
it("uses direct fallback without retrying when queue routing is unavailable and announce-agent delivery times out", async () => {
661+
const callGateway = vi.fn(async () => {
662+
throw new Error("gateway timeout after 120000ms");
663+
}) as unknown as typeof runtimeCallGateway;
664+
const sendMessage = createSendMessageMock();
665+
const result = await deliverTelegramDirectMessageCompletion({
666+
callGateway,
667+
sendMessage,
668+
sessionId: null,
669+
internalEvents: [
670+
{
671+
type: "task_completion",
672+
source: "subagent",
673+
childSessionKey: "agent:worker:subagent:child",
674+
childSessionId: "child-session-id",
675+
announceType: "subagent task",
676+
taskLabel: "telegram completion smoke",
677+
status: "ok",
678+
statusLabel: "completed successfully",
679+
result: "child completion output",
680+
replyInstruction: "Summarize the result.",
681+
},
682+
],
683+
});
684+
685+
expect(result).toEqual(
686+
expect.objectContaining({
687+
delivered: true,
688+
path: "direct-fallback",
689+
}),
690+
);
691+
expect(callGateway).toHaveBeenCalledTimes(1);
692+
expect(sendMessage).toHaveBeenCalledWith(
693+
expect.objectContaining({
694+
channel: "telegram",
695+
accountId: "bot-1",
696+
to: "123456789",
697+
threadId: undefined,
698+
content: "child completion output",
699+
requesterSessionKey: "agent:main:telegram:123456789",
700+
bestEffort: true,
701+
idempotencyKey: "announce-telegram-dm-fallback",
702+
}),
703+
);
704+
});
705+
706+
it("queues completion delivery when an active Telegram requester cannot be woken", async () => {
680707
const callGateway = createGatewayMock();
681708
const sendMessage = createSendMessageMock();
682709
const queueEmbeddedPiMessage = vi.fn(() => false);
@@ -704,7 +731,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
704731
expect(result).toEqual(
705732
expect.objectContaining({
706733
delivered: true,
707-
path: "direct-fallback",
734+
path: "queued",
708735
}),
709736
);
710737
expect(queueEmbeddedPiMessage).toHaveBeenCalledWith(
@@ -716,16 +743,10 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
716743
},
717744
);
718745
expect(callGateway).not.toHaveBeenCalled();
719-
expect(sendMessage).toHaveBeenCalledWith(
720-
expect.objectContaining({
721-
channel: "telegram",
722-
to: "123456789",
723-
content: "child completion output",
724-
}),
725-
);
746+
expect(sendMessage).not.toHaveBeenCalled();
726747
});
727748

728-
it("uses a direct thread fallback when announce-agent returns no visible output", async () => {
749+
it("uses a direct thread fallback when queue routing is unavailable and announce-agent returns no visible output", async () => {
729750
const callGateway = createGatewayMock({
730751
result: {
731752
payloads: [],
@@ -735,7 +756,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
735756
const result = await deliverSlackThreadAnnouncement({
736757
callGateway,
737758
sendMessage,
738-
sessionId: "requester-session-4",
759+
sessionId: undefined,
739760
isActive: false,
740761
expectsCompletionMessage: true,
741762
directIdempotencyKey: "announce-thread-fallback-empty",
@@ -770,7 +791,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
770791
);
771792
});
772793

773-
it("uses direct fallback for completion DMs without a thread id when announce-agent returns no visible output", async () => {
794+
it("uses direct fallback for completion DMs without a thread id when queue routing is unavailable and announce-agent returns no visible output", async () => {
774795
const callGateway = createGatewayMock({
775796
result: {
776797
payloads: [],
@@ -780,6 +801,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
780801
const result = await deliverDiscordDirectMessageCompletion({
781802
callGateway,
782803
sendMessage,
804+
sessionId: null,
783805
internalEvents: [
784806
{
785807
type: "task_completion",
@@ -841,6 +863,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
841863
const result = await deliverDiscordDirectMessageCompletion({
842864
callGateway,
843865
sendMessage,
866+
sessionId: null,
844867
internalEvents: [
845868
{
846869
type: "task_completion",
@@ -868,7 +891,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
868891
expect(sendMessage).not.toHaveBeenCalled();
869892
});
870893

871-
it("uses a direct channel fallback when announce-agent returns no visible output", async () => {
894+
it("uses a direct channel fallback when queue routing is unavailable and announce-agent returns no visible output", async () => {
872895
const callGateway = createGatewayMock({
873896
result: {
874897
payloads: [],
@@ -878,7 +901,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
878901
const result = await deliverSlackChannelAnnouncement({
879902
callGateway,
880903
sendMessage,
881-
sessionId: "requester-session-channel",
904+
sessionId: undefined,
882905
isActive: false,
883906
expectsCompletionMessage: true,
884907
directIdempotencyKey: "announce-channel-fallback-empty",
@@ -919,15 +942,15 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
919942
);
920943
});
921944

922-
it("falls back to the external requester route when completion origin is internal", async () => {
945+
it("falls back to the external requester route when queue routing is unavailable and completion origin is internal", async () => {
923946
const callGateway = createGatewayMock({
924947
result: {
925948
payloads: [{ text: "child completion output" }],
926949
},
927950
});
928951
const result = await deliverSlackChannelAnnouncement({
929952
callGateway,
930-
sessionId: "requester-session-channel",
953+
sessionId: undefined,
931954
isActive: false,
932955
expectsCompletionMessage: true,
933956
directIdempotencyKey: "announce-channel-internal-origin",

0 commit comments

Comments
 (0)