Skip to content

Commit 04eac15

Browse files
authored
fix: recover stale subagent completion announces
Recover stale subagent completion delivery by retrying unsupported transcript-wait wakes without transcript waiting and forcing the existing message-tool handoff when the requester run is stale and direct completion is invisible.

Adds regression coverage for the stale wake sequence and records the maintainer changelog entry.

Fixes #83699.
1 parent 754b423 commit 04eac15

3 files changed

Lines changed: 217 additions & 21 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Docs: https://docs.openclaw.ai
66

77
### Fixes
88

9+
- Agents/subagents: recover stale completion announces by retrying unsupported transcript-wait wakes without transcript waiting and forcing a message-tool handoff when the requester run is already stale. Fixes #83699. (#83700) Thanks @galiniliev.
910
- Agents/subagents: skip stale embedded-run wake probes for dormant completion requesters, so late subagent completions go straight to requester-agent/direct handoff instead of producing `reason=no_active_run` queue noise. (#82964) Thanks @galiniliev.
1011
- CLI: retry config snapshot reads after a transient failure so one rejected read no longer poisons later commands in the same process. (#83931) Thanks @honor2030.
1112
- WhatsApp: clarify inbound group diagnostics so observed but unregistered groups point to `channels.whatsapp.groups` without changing routing or sender authorization. (#83846) Thanks @neeravmakwana.

src/agents/subagent-announce-delivery.test.ts

Lines changed: 151 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
} from "../infra/outbound/session-binding-service.js";
66
import type { AgentInternalEvent } from "./internal-events.js";
77
import type {
8+
EmbeddedPiQueueFailureReason,
89
EmbeddedPiQueueMessageOptions,
910
EmbeddedPiQueueMessageOutcome,
1011
} from "./pi-embedded-runner/runs.js";
@@ -36,6 +37,17 @@ function createGatewayMock(response: Record<string, unknown> = {}) {
3637
return vi.fn(async () => response) as unknown as typeof runtimeCallGateway;
3738
}
3839

40+
/**
 * Builds a gateway mock whose successive calls resolve to successive entries of `responses`.
 *
 * The call index is clamped to the last entry, so once the list is exhausted every
 * further call resolves to the final response again. An empty list resolves to `{}`.
 *
 * @param responses Ordered responses to hand out, one per call.
 * @returns A `vi.fn` mock cast to the `runtimeCallGateway` signature.
 */
function createGatewaySequenceMock(
41+
responses: Record<string, unknown>[],
42+
): ReturnType<typeof vi.fn> & typeof runtimeCallGateway {
43+
let index = 0;
44+
return vi.fn(async () => {
45+
const response = responses[Math.min(index, responses.length - 1)] ?? {};
46+
index += 1;
47+
return response;
48+
}) as unknown as ReturnType<typeof vi.fn> & typeof runtimeCallGateway;
49+
}
50+
3951
function createInProcessGatewayMock(response: Record<string, unknown> = {}) {
4052
return vi.fn(async () => response) as unknown as typeof runtimeDispatchGatewayMethodInProcess;
4153
}
@@ -78,6 +90,29 @@ function createQueueOutcomeMock(
7890
);
7991
}
8092

93+
/**
 * Builds a queue mock that returns one outcome per call, taken in order from `queuedOutcomes`.
 *
 * The call index is clamped to the last entry, so the final outcome repeats once the list
 * is exhausted. An empty list behaves as `false`.
 *
 * Outcome mapping:
 * - `true` -> `{ queued: true, target: "embedded_run", gatewayHealth: "live" }`
 * - a string -> `{ queued: false, reason: <that string>, gatewayHealth: "live" }`
 * - `false` -> `{ queued: false, reason: "not_streaming", gatewayHealth: "live" }`
 *
 * Each result echoes the `sessionId` the mock was called with.
 *
 * @param queuedOutcomes Ordered outcomes: `true` for queued, `false` or a failure reason otherwise.
 * @returns A `vi.fn` mock typed as `QueueEmbeddedPiMessageWithOutcome`.
 */
function createQueueOutcomeSequenceMock(
94+
queuedOutcomes: (boolean | EmbeddedPiQueueFailureReason)[],
95+
): ReturnType<typeof vi.fn<QueueEmbeddedPiMessageWithOutcome>> {
96+
let index = 0;
97+
return vi.fn((sessionId: string) => {
98+
const outcome = queuedOutcomes[Math.min(index, queuedOutcomes.length - 1)] ?? false;
99+
index += 1;
100+
return outcome === true
101+
? {
102+
queued: true,
103+
sessionId,
104+
target: "embedded_run",
105+
gatewayHealth: "live",
106+
}
107+
: {
108+
queued: false,
109+
sessionId,
110+
reason: typeof outcome === "string" ? outcome : "not_streaming",
111+
gatewayHealth: "live",
112+
};
113+
});
114+
}
115+
81116
const longChildCompletionOutput = [
82117
"34/34 tests pass, clean build. Now docker repro:",
83118
"Root cause: the requester's announce delivery accepted a prefix-only assistant payload as delivered.",
@@ -1170,6 +1205,104 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
11701205
expect(sendMessage).not.toHaveBeenCalled();
11711206
});
11721207

1208+
it("forces message-tool thread completions after transcript-wait wake falls stale", async () => {
1209+
const callGateway = createGatewaySequenceMock([
1210+
{
1211+
result: {
1212+
payloads: [],
1213+
},
1214+
},
1215+
{
1216+
result: {
1217+
payloads: [],
1218+
messagingToolSentTargets: [
1219+
{
1220+
tool: "message",
1221+
provider: "slack",
1222+
accountId: "acct-1",
1223+
to: "channel:C123",
1224+
threadId: "171.222",
1225+
text: "The background task completed.",
1226+
},
1227+
],
1228+
},
1229+
},
1230+
]);
1231+
const sendMessage = createSendMessageMock();
1232+
const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeSequenceMock([
1233+
"transcript_commit_wait_unsupported",
1234+
"no_active_run",
1235+
]);
1236+
const result = await deliverSlackThreadAnnouncement({
1237+
callGateway,
1238+
sendMessage,
1239+
queueEmbeddedPiMessageWithOutcome,
1240+
sessionId: "requester-session-4",
1241+
isActive: true,
1242+
expectsCompletionMessage: true,
1243+
directIdempotencyKey: "announce-thread-fallback-empty",
1244+
internalEvents: [
1245+
{
1246+
type: "task_completion",
1247+
source: "subagent",
1248+
childSessionKey: "agent:worker:subagent:child",
1249+
childSessionId: "child-session-id",
1250+
announceType: "subagent task",
1251+
taskLabel: "thread completion smoke",
1252+
status: "ok",
1253+
statusLabel: "completed successfully",
1254+
result: "child completion output",
1255+
replyInstruction: "Summarize the result.",
1256+
},
1257+
],
1258+
});
1259+
1260+
expectRecordFields(result, {
1261+
delivered: true,
1262+
path: "direct",
1263+
});
1264+
expect(callGateway).toHaveBeenCalledTimes(2);
1265+
expectGatewayAgentParams(callGateway, {
1266+
deliver: true,
1267+
channel: "slack",
1268+
accountId: "acct-1",
1269+
to: "channel:C123",
1270+
threadId: "171.222",
1271+
});
1272+
expectRecordFields(mockCallArg(callGateway, 1).params, {
1273+
deliver: false,
1274+
channel: "slack",
1275+
accountId: "acct-1",
1276+
to: "channel:C123",
1277+
threadId: "171.222",
1278+
sourceReplyDeliveryMode: "message_tool_only",
1279+
idempotencyKey: "announce-thread-fallback-empty:message-tool",
1280+
});
1281+
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledTimes(2);
1282+
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenNthCalledWith(
1283+
1,
1284+
"requester-session-4",
1285+
"child done",
1286+
{
1287+
debounceMs: 500,
1288+
deliveryTimeoutMs: 120_000,
1289+
steeringMode: "all",
1290+
waitForTranscriptCommit: true,
1291+
},
1292+
);
1293+
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenNthCalledWith(
1294+
2,
1295+
"requester-session-4",
1296+
"child done",
1297+
{
1298+
debounceMs: 500,
1299+
deliveryTimeoutMs: 120_000,
1300+
steeringMode: "all",
1301+
},
1302+
);
1303+
expect(sendMessage).not.toHaveBeenCalled();
1304+
});
1305+
11731306
it("keeps concise requester rewrites primary even when child output is long", async () => {
11741307
const callGateway = createGatewayMock({
11751308
result: {
@@ -1434,18 +1567,23 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
14341567
expect(callGateway).toHaveBeenCalledTimes(1);
14351568
});
14361569

1437-
it("reports failure when announce-agent returns no visible output", async () => {
1570+
it("reports message-tool failure when stale thread completion remains invisible", async () => {
14381571
const callGateway = createGatewayMock({
14391572
result: {
14401573
payloads: [],
14411574
},
14421575
});
14431576
const sendMessage = createSendMessageMock();
1577+
const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeSequenceMock([
1578+
"transcript_commit_wait_unsupported",
1579+
"no_active_run",
1580+
]);
14441581
const result = await deliverSlackThreadAnnouncement({
14451582
callGateway,
14461583
sendMessage,
1584+
queueEmbeddedPiMessageWithOutcome,
14471585
sessionId: "requester-session-4",
1448-
isActive: false,
1586+
isActive: true,
14491587
expectsCompletionMessage: true,
14501588
directIdempotencyKey: "announce-thread-fallback-empty",
14511589
internalEvents: [
@@ -1467,9 +1605,9 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
14671605
expectRecordFields(result, {
14681606
delivered: false,
14691607
path: "direct",
1470-
error: "completion agent did not produce a visible reply",
1608+
error: "completion agent did not deliver through the message tool",
14711609
});
1472-
expect(callGateway).toHaveBeenCalledTimes(1);
1610+
expect(callGateway).toHaveBeenCalledTimes(2);
14731611
expect(sendMessage).not.toHaveBeenCalled();
14741612
});
14751613

@@ -2141,18 +2279,23 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
21412279
expect(sendMessage).not.toHaveBeenCalled();
21422280
});
21432281

2144-
it("reports channel completion failure when announce-agent returns no visible output", async () => {
2282+
it("reports channel message-tool failure when stale completion remains invisible", async () => {
21452283
const callGateway = createGatewayMock({
21462284
result: {
21472285
payloads: [],
21482286
},
21492287
});
21502288
const sendMessage = createSendMessageMock();
2289+
const queueEmbeddedPiMessageWithOutcome = createQueueOutcomeSequenceMock([
2290+
"transcript_commit_wait_unsupported",
2291+
"no_active_run",
2292+
]);
21512293
const result = await deliverSlackChannelAnnouncement({
21522294
callGateway,
21532295
sendMessage,
2296+
queueEmbeddedPiMessageWithOutcome,
21542297
sessionId: "requester-session-channel",
2155-
isActive: false,
2298+
isActive: true,
21562299
expectsCompletionMessage: true,
21572300
directIdempotencyKey: "announce-channel-fallback-empty",
21582301
internalEvents: [
@@ -2174,9 +2317,9 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
21742317
expectRecordFields(result, {
21752318
delivered: false,
21762319
path: "direct",
2177-
error: "completion agent did not produce a visible reply",
2320+
error: "completion agent did not deliver through the message tool",
21782321
});
2179-
expect(callGateway).toHaveBeenCalledTimes(1);
2322+
expect(callGateway).toHaveBeenCalledTimes(2);
21802323
expect(sendMessage).not.toHaveBeenCalled();
21812324
});
21822325

src/agents/subagent-announce-delivery.ts

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ import {
2929
hasVisibleAgentPayload,
3030
} from "./pi-embedded-runner/delivery-evidence.js";
3131
import type { EmbeddedPiQueueMessageOptions } from "./pi-embedded-runner/run-state.js";
32-
import type { EmbeddedPiQueueMessageOutcome } from "./pi-embedded-runner/runs.js";
32+
import type {
33+
EmbeddedPiQueueFailureReason,
34+
EmbeddedPiQueueMessageOutcome,
35+
} from "./pi-embedded-runner/runs.js";
3336
import {
3437
callGateway,
3538
createBoundDeliveryRouter,
@@ -680,6 +683,8 @@ async function sendSubagentAnnounceDirectly(params: {
680683
? "message_tool_only"
681684
: undefined;
682685
const shouldDeliverAgentFinal = deliveryTarget.deliver && !requiresMessageToolDelivery;
686+
let completionWakeFailureReason: EmbeddedPiQueueFailureReason | undefined;
687+
let completionWakeRetriedWithoutTranscriptWait = false;
683688
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
684689
const requesterQueueSettings = resolveQueueSettings({
685690
cfg,
@@ -696,21 +701,32 @@ async function sendSubagentAnnounceDirectly(params: {
696701
requesterActivity.sessionId &&
697702
requesterActivity.isActive
698703
) {
699-
const wakeOutcome = await resolveQueueEmbeddedPiMessageOutcome(
704+
const wakeOptions: EmbeddedPiQueueMessageOptions = {
705+
deliveryTimeoutMs: announceTimeoutMs,
706+
steeringMode: "all",
707+
...(completionSourceReplyDeliveryMode
708+
? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode }
709+
: {}),
710+
...(requesterQueueSettings.debounceMs !== undefined
711+
? { debounceMs: requesterQueueSettings.debounceMs }
712+
: {}),
713+
waitForTranscriptCommit: true,
714+
};
715+
let wakeOutcome = await resolveQueueEmbeddedPiMessageOutcome(
700716
requesterActivity.sessionId,
701717
params.triggerMessage,
702-
{
703-
deliveryTimeoutMs: announceTimeoutMs,
704-
steeringMode: "all",
705-
...(completionSourceReplyDeliveryMode
706-
? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode }
707-
: {}),
708-
...(requesterQueueSettings.debounceMs !== undefined
709-
? { debounceMs: requesterQueueSettings.debounceMs }
710-
: {}),
711-
waitForTranscriptCommit: true,
712-
},
718+
wakeOptions,
713719
);
720+
if (!wakeOutcome.queued && wakeOutcome.reason === "transcript_commit_wait_unsupported") {
721+
const bestEffortWakeOptions = { ...wakeOptions };
722+
delete bestEffortWakeOptions.waitForTranscriptCommit;
723+
completionWakeRetriedWithoutTranscriptWait = true;
724+
wakeOutcome = await resolveQueueEmbeddedPiMessageOutcome(
725+
requesterActivity.sessionId,
726+
params.triggerMessage,
727+
bestEffortWakeOptions,
728+
);
729+
}
714730
if (wakeOutcome.queued) {
715731
return {
716732
delivered: true,
@@ -719,6 +735,7 @@ async function sendSubagentAnnounceDirectly(params: {
719735
path: "steered",
720736
};
721737
}
738+
completionWakeFailureReason = wakeOutcome.reason;
722739
defaultRuntime.log(
723740
`[warn] Active requester session could not be woken for subagent completion; falling back to requester-agent handoff: ${formatQueueWakeFailureError(
724741
"active requester session could not be woken",
@@ -834,6 +851,41 @@ async function sendSubagentAnnounceDirectly(params: {
834851
shouldDeliverAgentFinal &&
835852
!hasVisibleGatewayAgentPayload(directAnnounceResponse)
836853
) {
854+
if (
855+
completionWakeRetriedWithoutTranscriptWait &&
856+
(completionWakeFailureReason === "no_active_run" ||
857+
completionWakeFailureReason === "transcript_commit_wait_unsupported")
858+
) {
859+
const forcedMessageToolResponse = await runAnnounceDeliveryWithRetry({
860+
operation: "completion message-tool announce agent call",
861+
signal: params.signal,
862+
run: async () =>
863+
await runAnnounceAgentCall({
864+
agentParams: {
865+
...directAgentParams,
866+
deliver: false,
867+
sourceReplyDeliveryMode: "message_tool_only",
868+
idempotencyKey: `${params.directIdempotencyKey}:message-tool`,
869+
},
870+
expectFinal: true,
871+
timeoutMs: announceTimeoutMs,
872+
}),
873+
});
874+
if (
875+
isGatewayAgentRunPending(forcedMessageToolResponse) ||
876+
hasGatewayAgentMessagingToolDelivery(forcedMessageToolResponse)
877+
) {
878+
return {
879+
delivered: true,
880+
path: "direct",
881+
};
882+
}
883+
return {
884+
delivered: false,
885+
path: "direct",
886+
error: "completion agent did not deliver through the message tool",
887+
};
888+
}
837889
return {
838890
delivered: false,
839891
path: "direct",

0 commit comments

Comments
 (0)