Skip to content

Commit c6ddb1a

Browse files
authored
fix: preserve media completion message-tool delivery (#82206)
* fix: preserve message-tool media completion delivery * chore: update generated protocol models
1 parent 29b5563 commit c6ddb1a

13 files changed

Lines changed: 165 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
1111

1212
### Fixes
1313

14+
- Agents/media: preserve message-tool-only delivery for generated music and video completion handoffs, so group/channel completions do not finish without posting the generated attachment.
1415
- LINE: acknowledge signed webhook events before agent processing so slow model replies do not cause LINE `request_timeout` delivery failures. Fixes #65375. Thanks @myericho.
1516
- TTS: preserve channel-derived voice-note delivery for `/tts audio` replies even when the provider output is not natively voice-compatible. (#82174) Thanks @xuruiray.
1617
- Codex/Lossless: keep Codex explicit compaction on native app-server threads while allowing Lossless through the context-engine slot; `openclaw doctor --fix` now migrates legacy `compaction.provider: "lossless-claw"` config to `plugins.slots.contextEngine`.

apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ public struct AgentParams: Codable, Sendable {
751751
public let internalruntimehandoffid: String?
752752
public let internalevents: [[String: AnyCodable]]?
753753
public let inputprovenance: [String: AnyCodable]?
754+
public let sourcereplydeliverymode: AnyCodable?
754755
public let voicewaketrigger: String?
755756
public let idempotencykey: String
756757
public let label: String?
@@ -788,6 +789,7 @@ public struct AgentParams: Codable, Sendable {
788789
internalruntimehandoffid: String?,
789790
internalevents: [[String: AnyCodable]]?,
790791
inputprovenance: [String: AnyCodable]?,
792+
sourcereplydeliverymode: AnyCodable?,
791793
voicewaketrigger: String?,
792794
idempotencykey: String,
793795
label: String?)
@@ -824,6 +826,7 @@ public struct AgentParams: Codable, Sendable {
824826
self.internalruntimehandoffid = internalruntimehandoffid
825827
self.internalevents = internalevents
826828
self.inputprovenance = inputprovenance
829+
self.sourcereplydeliverymode = sourcereplydeliverymode
827830
self.voicewaketrigger = voicewaketrigger
828831
self.idempotencykey = idempotencykey
829832
self.label = label
@@ -862,6 +865,7 @@ public struct AgentParams: Codable, Sendable {
862865
case internalruntimehandoffid = "internalRuntimeHandoffId"
863866
case internalevents = "internalEvents"
864867
case inputprovenance = "inputProvenance"
868+
case sourcereplydeliverymode = "sourceReplyDeliveryMode"
865869
case voicewaketrigger = "voiceWakeTrigger"
866870
case idempotencykey = "idempotencyKey"
867871
case label

src/agents/command/attempt-execution.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -630,6 +630,7 @@ export function runAgentAttempt(params: {
630630
toolsAllow: params.opts.toolsAllow,
631631
internalEvents: params.opts.internalEvents,
632632
inputProvenance: params.opts.inputProvenance,
633+
sourceReplyDeliveryMode: params.opts.sourceReplyDeliveryMode,
633634
streamParams: params.opts.streamParams,
634635
agentDir: params.agentDir,
635636
allowTransientCooldownProbe: params.allowTransientCooldownProbe,

src/agents/command/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { AgentInternalEvent } from "../../agents/internal-events.js";
22
import type { SpawnedRunMetadata } from "../../agents/spawned-context.js";
33
import type { PromptMode } from "../../agents/system-prompt.types.js";
4+
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
45
import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.public.js";
56
import type { PromptImageOrderEntry } from "../../media/prompt-image-order.js";
67
import type { InputProvenance } from "../../sessions/input-provenance.js";
@@ -105,6 +106,8 @@ export type AgentCommandOpts = {
105106
bootstrapContextRunKind?: "default" | "heartbeat" | "cron";
106107
internalEvents?: AgentInternalEvent[];
107108
inputProvenance?: InputProvenance;
109+
/** Visible source replies must be sent through the message tool when set. */
110+
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
108111
/** Per-call stream param overrides (best-effort). */
109112
streamParams?: AgentStreamParams;
110113
/** Explicit workspace directory override (for subagents to inherit parent workspace). */

src/agents/pi-embedded-runner/run-state.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
12
import {
23
getActiveReplyRunCount,
34
listActiveReplyRunSessionIds,
@@ -11,11 +12,13 @@ export type EmbeddedPiQueueHandle = {
1112
isCompacting: () => boolean;
1213
cancel?: (reason?: "user_abort" | "restart" | "superseded") => void;
1314
abort: () => void;
15+
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
1416
};
1517

1618
export type EmbeddedPiQueueMessageOptions = {
1719
steeringMode?: "all";
1820
debounceMs?: number;
21+
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
1922
};
2023

2124
export type ActiveEmbeddedRunSnapshot = {

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2902,6 +2902,7 @@ export async function runEmbeddedAttempt(
29022902
},
29032903
isStreaming: () => activeSession.isStreaming,
29042904
isCompacting: () => subscription.isCompacting(),
2905+
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
29052906
cancel: () => {
29062907
abortRun();
29072908
},

src/agents/pi-embedded-runner/runs.test.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,46 @@ describe("pi-embedded runner run registry", () => {
7373
const queueMessage = vi.fn(async () => {});
7474
setActiveEmbeddedRun("session-steer", {
7575
...createRunHandle(),
76+
sourceReplyDeliveryMode: "message_tool_only",
7677
queueMessage,
7778
});
7879

7980
expect(
8081
queueEmbeddedPiMessageWithOutcome("session-steer", "continue", {
8182
steeringMode: "all",
83+
sourceReplyDeliveryMode: "message_tool_only",
8284
}).queued,
8385
).toBe(true);
8486

85-
expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "all" });
87+
expect(queueMessage).toHaveBeenCalledWith("continue", {
88+
steeringMode: "all",
89+
sourceReplyDeliveryMode: "message_tool_only",
90+
});
91+
});
92+
93+
it("rejects message-tool-only steering for active runs created without that mode", () => {
94+
const queueMessage = vi.fn(async () => {});
95+
setActiveEmbeddedRun("session-automatic-source-reply", {
96+
...createRunHandle(),
97+
queueMessage,
98+
});
99+
100+
const outcome = queueEmbeddedPiMessageWithOutcome(
101+
"session-automatic-source-reply",
102+
"continue",
103+
{
104+
steeringMode: "all",
105+
sourceReplyDeliveryMode: "message_tool_only",
106+
},
107+
);
108+
109+
expect(outcome).toEqual({
110+
queued: false,
111+
sessionId: "session-automatic-source-reply",
112+
reason: "source_reply_delivery_mode_mismatch",
113+
gatewayHealth: "live",
114+
});
115+
expect(queueMessage).not.toHaveBeenCalled();
86116
});
87117

88118
it("defaults active embedded steering to all pending messages", () => {

src/agents/pi-embedded-runner/runs.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export type EmbeddedPiQueueFailureReason =
4444
| "no_active_run"
4545
| "not_streaming"
4646
| "compacting"
47+
| "source_reply_delivery_mode_mismatch"
4748
| "runtime_rejected";
4849

4950
export type EmbeddedPiQueueMessageOutcome =
@@ -140,7 +141,7 @@ export function queueEmbeddedPiMessageWithOutcome(
140141
text: string,
141142
options?: EmbeddedPiQueueMessageOptions,
142143
): EmbeddedPiQueueMessageOutcome {
143-
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text);
144+
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text, options);
144145
if (prepared.kind === "complete") {
145146
return prepared.outcome;
146147
}
@@ -169,7 +170,7 @@ export async function queueEmbeddedPiMessageWithOutcomeAsync(
169170
text: string,
170171
options?: EmbeddedPiQueueMessageOptions,
171172
): Promise<EmbeddedPiQueueMessageOutcome> {
172-
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text);
173+
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text, options);
173174
if (prepared.kind === "complete") {
174175
return prepared.outcome;
175176
}
@@ -192,6 +193,7 @@ export async function queueEmbeddedPiMessageWithOutcomeAsync(
192193
function prepareEmbeddedPiQueueMessage(
193194
sessionId: string,
194195
text: string,
196+
options?: EmbeddedPiQueueMessageOptions,
195197
): PreparedEmbeddedPiQueueMessage {
196198
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
197199
if (!handle) {
@@ -219,6 +221,18 @@ function prepareEmbeddedPiQueueMessage(
219221
diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`);
220222
return { kind: "complete", outcome: createQueueFailureOutcome(sessionId, "compacting") };
221223
}
224+
if (
225+
options?.sourceReplyDeliveryMode === "message_tool_only" &&
226+
handle.sourceReplyDeliveryMode !== "message_tool_only"
227+
) {
228+
diag.debug(
229+
`queue message failed: sessionId=${sessionId} reason=source_reply_delivery_mode_mismatch`,
230+
);
231+
return {
232+
kind: "complete",
233+
outcome: createQueueFailureOutcome(sessionId, "source_reply_delivery_mode_mismatch"),
234+
};
235+
}
222236
return { kind: "embedded_run", handle };
223237
}
224238

src/agents/subagent-announce-delivery.test.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,80 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
14111411
accountId: "acct-1",
14121412
to: "channel:C123",
14131413
threadId: undefined,
1414+
sourceReplyDeliveryMode: "message_tool_only",
1415+
});
1416+
expect(sendMessage).not.toHaveBeenCalled();
1417+
});
1418+
1419+
it("falls back to a forced message-tool handoff when the active requester run cannot accept one", async () => {
1420+
const callGateway = createGatewayMock({
1421+
result: {
1422+
payloads: [],
1423+
messagingToolSentTargets: [
1424+
{
1425+
tool: "message",
1426+
provider: "slack",
1427+
accountId: "acct-1",
1428+
to: "channel:C123",
1429+
text: "The track is ready.",
1430+
mediaUrls: ["/tmp/generated-night-drive.mp3"],
1431+
},
1432+
],
1433+
},
1434+
});
1435+
const queueEmbeddedPiMessageWithOutcome = vi.fn((sessionId: string) => ({
1436+
queued: false as const,
1437+
sessionId,
1438+
reason: "source_reply_delivery_mode_mismatch" as const,
1439+
gatewayHealth: "live" as const,
1440+
}));
1441+
const sendMessage = createSendMessageMock();
1442+
const result = await deliverSlackChannelAnnouncement({
1443+
callGateway,
1444+
sendMessage,
1445+
sessionId: "requester-session-channel",
1446+
isActive: true,
1447+
expectsCompletionMessage: true,
1448+
directIdempotencyKey: "announce-channel-media-message-tool-active-mismatch",
1449+
sourceTool: "music_generate",
1450+
queueEmbeddedPiMessageWithOutcome,
1451+
internalEvents: [
1452+
{
1453+
type: "task_completion",
1454+
source: "music_generation",
1455+
childSessionKey: "music_generate:task-123",
1456+
childSessionId: "task-123",
1457+
announceType: "music generation task",
1458+
taskLabel: "night-drive synthwave",
1459+
status: "ok",
1460+
statusLabel: "completed successfully",
1461+
result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3",
1462+
mediaUrls: ["/tmp/generated-night-drive.mp3"],
1463+
replyInstruction:
1464+
"Tell the user the music is ready. If visible source delivery requires the message tool, send it there with the generated media attached.",
1465+
},
1466+
],
1467+
});
1468+
1469+
expectRecordFields(result, {
1470+
delivered: true,
1471+
path: "direct",
1472+
});
1473+
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
1474+
"requester-session-channel",
1475+
"child done",
1476+
expect.objectContaining({
1477+
steeringMode: "all",
1478+
sourceReplyDeliveryMode: "message_tool_only",
1479+
}),
1480+
);
1481+
expectGatewayAgentParams(callGateway, {
1482+
deliver: false,
1483+
channel: "slack",
1484+
accountId: "acct-1",
1485+
to: "channel:C123",
1486+
threadId: undefined,
1487+
sourceReplyDeliveryMode: "message_tool_only",
14141488
});
14151489
expect(sendMessage).not.toHaveBeenCalled();
14161490
});
@@ -1478,6 +1552,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
14781552
accountId: "acct-1",
14791553
to: origin.to,
14801554
threadId: undefined,
1555+
sourceReplyDeliveryMode: "message_tool_only",
14811556
});
14821557
expect(sendMessage).not.toHaveBeenCalled();
14831558
},

src/agents/subagent-announce-delivery.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,9 @@ async function sendSubagentAnnounceDirectly(params: {
640640
directOrigin: effectiveDirectOrigin,
641641
requesterSessionOrigin,
642642
});
643+
const completionSourceReplyDeliveryMode = requiresMessageToolDelivery
644+
? "message_tool_only"
645+
: undefined;
643646
const shouldDeliverAgentFinal = deliveryTarget.deliver && !requiresMessageToolDelivery;
644647
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
645648
const requesterQueueSettings = resolveQueueSettings({
@@ -658,6 +661,9 @@ async function sendSubagentAnnounceDirectly(params: {
658661
params.triggerMessage,
659662
{
660663
steeringMode: "all",
664+
...(completionSourceReplyDeliveryMode
665+
? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode }
666+
: {}),
661667
...(requesterQueueSettings.debounceMs !== undefined
662668
? { debounceMs: requesterQueueSettings.debounceMs }
663669
: {}),
@@ -669,7 +675,9 @@ async function sendSubagentAnnounceDirectly(params: {
669675
path: "steered",
670676
};
671677
}
672-
if (requesterActivity.isActive) {
678+
const shouldFallbackToForcedAgentHandoff =
679+
requiresMessageToolDelivery && wakeOutcome.reason === "source_reply_delivery_mode_mismatch";
680+
if (requesterActivity.isActive && !shouldFallbackToForcedAgentHandoff) {
673681
// Active requester sessions should receive completion data through their
674682
// running agent turn. If wake fails, let the dispatch layer steer/retry;
675683
// do not bypass the requester agent with raw child output.
@@ -717,6 +725,9 @@ async function sendSubagentAnnounceDirectly(params: {
717725
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
718726
sourceTool: params.sourceTool ?? "subagent_announce",
719727
},
728+
...(completionSourceReplyDeliveryMode
729+
? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode }
730+
: {}),
720731
idempotencyKey: params.directIdempotencyKey,
721732
};
722733
let directAnnounceResponse: unknown;

0 commit comments

Comments
 (0)