Skip to content

Commit 14b5f73

Browse files
committed
fix(agents): avoid duplicate generated media attachments
1 parent 29a3e71 commit 14b5f73

3 files changed

Lines changed: 184 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ Docs: https://docs.openclaw.ai
5757
- Browser: enforce strict SSRF current-URL checks before existing-session screenshots, matching existing-session snapshot handling. Thanks @vincentkoc.
5858
- Active Memory: give timeout partial transcript recovery enough abort-settle headroom so temporary recall summaries are returned before cleanup. Thanks @vincentkoc.
5959
- Gateway/chat: clear the active reply-run guard before draining queued same-session follow-up turns, so sequential `chat.send` calls no longer trip `ReplyRunAlreadyActiveError` every other request. Fixes #77485. Thanks @bws14email.
60+
- Agents/media: avoid sending generated image, video, and music attachments twice when an early streamed reply chunk is delivered before the final `MEDIA:` directive arrives.
6061
- Doctor/config: restore legacy group chat config migrations for `routing.allowFrom`, `routing.groupChat.*`, and `channels.telegram.requireMention` so upgrades keep WhatsApp, Telegram, and iMessage group mention gates and history settings instead of leaving configs invalid or silently blocked. Thanks @scoootscooob.
6162
- CLI/update: make package-update follow-up processes write completion results and exit explicitly, so Windows packaged upgrades do not hang after the new package finishes post-core plugin work. Thanks @vincentkoc.
6263
- Release validation: skip Slack live QA unless Slack credentials are explicitly configured, so release gates can keep proving non-Slack surfaces while Slack is still local and credential-gated. Thanks @vincentkoc.

src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,83 @@ describe("subscribeEmbeddedPiSession", () => {
424424
);
425425
});
426426

427+
it("does not attach generated image media to an early streamed chunk before explicit MEDIA", async () => {
428+
const onToolResult = vi.fn();
429+
const onBlockReply = vi.fn();
430+
const { emit } = createSubscribedHarness({
431+
runId: "run",
432+
onToolResult,
433+
onBlockReply,
434+
verboseLevel: "full",
435+
blockReplyBreak: "text_end",
436+
blockReplyChunking: { minChars: 5, maxChars: 200, breakPreference: "newline" },
437+
builtinToolNames: new Set(["image_generate"]),
438+
});
439+
440+
emitToolRun({
441+
emit,
442+
toolName: "image_generate",
443+
toolCallId: "tool-1",
444+
isError: false,
445+
result: {
446+
content: [
447+
{
448+
type: "text",
449+
text: "Generated 1 image with google/gemini-3.1-flash-image-preview.\nMEDIA:/tmp/generated.png",
450+
},
451+
],
452+
details: {
453+
media: {
454+
mediaUrls: ["/tmp/generated.png"],
455+
},
456+
},
457+
},
458+
});
459+
460+
await vi.waitFor(() => {
461+
expect(onToolResult).toHaveBeenCalled();
462+
});
463+
464+
emit({ type: "message_start", message: { role: "assistant" } });
465+
emitAssistantTextDelta(emit, "Generated 1 image.\n");
466+
467+
expect(onBlockReply).toHaveBeenCalledWith(
468+
expect.objectContaining({
469+
text: "Generated 1 image.",
470+
}),
471+
);
472+
expect(onBlockReply.mock.calls.some(([payload]) => payload.mediaUrls?.length)).toBe(false);
473+
474+
emitAssistantTextDelta(emit, "MEDIA:/tmp/generated.png");
475+
emit({
476+
type: "message_update",
477+
message: { role: "assistant" },
478+
assistantMessageEvent: {
479+
type: "text_end",
480+
content: "Generated 1 image.\nMEDIA:/tmp/generated.png",
481+
},
482+
});
483+
emit({
484+
type: "message_end",
485+
message: {
486+
role: "assistant",
487+
content: [
488+
{
489+
type: "text",
490+
text: "Generated 1 image.\nMEDIA:/tmp/generated.png",
491+
},
492+
],
493+
},
494+
});
495+
emit({ type: "agent_end" });
496+
await flushBlockReplyCallbacks();
497+
498+
const mediaPayloads = onBlockReply.mock.calls
499+
.map(([payload]) => payload)
500+
.filter((payload) => payload.mediaUrls?.includes("/tmp/generated.png"));
501+
expect(mediaPayloads).toHaveLength(1);
502+
});
503+
427504
it("attaches media from internal completion events even when assistant omits MEDIA lines", async () => {
428505
const onBlockReply = vi.fn();
429506
const { emit } = createSubscribedHarness({
@@ -469,6 +546,104 @@ describe("subscribeEmbeddedPiSession", () => {
469546
);
470547
});
471548

549+
it.each([
550+
{
551+
label: "music",
552+
source: "music_generation" as const,
553+
childSessionKey: "music_generate:task-123",
554+
announceType: "music generation task",
555+
taskLabel: "launch anthem",
556+
result: "Generated 1 track.\nMEDIA:/tmp/launch-anthem.mp3",
557+
mediaUrl: "/tmp/launch-anthem.mp3",
558+
firstChunk: "Generated 1 track.\n",
559+
finalText: "Generated 1 track.\nMEDIA:/tmp/launch-anthem.mp3",
560+
},
561+
{
562+
label: "video",
563+
source: "video_generation" as const,
564+
childSessionKey: "video_generate:task-123",
565+
announceType: "video generation task",
566+
taskLabel: "launch reel",
567+
result: "Generated 1 video.\nMEDIA:/tmp/launch-reel.mp4",
568+
mediaUrl: "/tmp/launch-reel.mp4",
569+
firstChunk: "Generated 1 video.\n",
570+
finalText: "Generated 1 video.\nMEDIA:/tmp/launch-reel.mp4",
571+
},
572+
])(
573+
"does not attach $label internal completion media to an early streamed chunk before explicit MEDIA",
574+
async ({
575+
source,
576+
childSessionKey,
577+
announceType,
578+
taskLabel,
579+
result,
580+
mediaUrl,
581+
firstChunk,
582+
finalText,
583+
}) => {
584+
const onBlockReply = vi.fn();
585+
const { emit } = createSubscribedHarness({
586+
runId: "run",
587+
onBlockReply,
588+
blockReplyBreak: "text_end",
589+
blockReplyChunking: { minChars: 5, maxChars: 200, breakPreference: "newline" },
590+
internalEvents: [
591+
{
592+
type: "task_completion",
593+
source,
594+
childSessionKey,
595+
announceType,
596+
taskLabel,
597+
status: "ok",
598+
statusLabel: "completed successfully",
599+
result,
600+
mediaUrls: [mediaUrl],
601+
replyInstruction: "Reply normally.",
602+
},
603+
],
604+
});
605+
606+
emit({ type: "message_start", message: { role: "assistant" } });
607+
emitAssistantTextDelta(emit, firstChunk);
608+
609+
expect(onBlockReply).toHaveBeenCalledWith(
610+
expect.objectContaining({
611+
text: firstChunk.trim(),
612+
}),
613+
);
614+
expect(onBlockReply.mock.calls.some(([payload]) => payload.mediaUrls?.length)).toBe(false);
615+
616+
emitAssistantTextDelta(emit, `MEDIA:${mediaUrl}`);
617+
emit({
618+
type: "message_update",
619+
message: { role: "assistant" },
620+
assistantMessageEvent: {
621+
type: "text_end",
622+
content: finalText,
623+
},
624+
});
625+
emit({
626+
type: "message_end",
627+
message: {
628+
role: "assistant",
629+
content: [
630+
{
631+
type: "text",
632+
text: finalText,
633+
},
634+
],
635+
},
636+
});
637+
emit({ type: "agent_end" });
638+
await flushBlockReplyCallbacks();
639+
640+
const mediaPayloads = onBlockReply.mock.calls
641+
.map(([payload]) => payload)
642+
.filter((payload) => payload.mediaUrls?.includes(mediaUrl));
643+
expect(mediaPayloads).toHaveLength(1);
644+
},
645+
);
646+
472647
it("keeps orphaned tool media available for non-block final payload assembly", () => {
473648
const { emit, subscription } = createSubscribedSessionHarness({
474649
runId: "run",

src/agents/pi-embedded-subscribe.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,14 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
239239
};
240240
const emitBlockReply = (
241241
payload: BlockReplyPayload,
242-
options?: { assistantMessageIndex?: number },
242+
options?: { assistantMessageIndex?: number; consumePendingToolMedia?: boolean },
243243
) => {
244244
const withAssistantDirectives = consumePendingAssistantReplyDirectivesIntoReply(state, payload);
245-
emitBlockReplySafely(consumePendingToolMediaIntoReply(state, withAssistantDirectives), options);
245+
const withToolMedia =
246+
options?.consumePendingToolMedia === false
247+
? withAssistantDirectives
248+
: consumePendingToolMediaIntoReply(state, withAssistantDirectives);
249+
emitBlockReplySafely(withToolMedia, options);
246250
};
247251

248252
const resetAssistantMessageState = (nextAssistantTextBaseline: number) => {
@@ -761,6 +765,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
761765
},
762766
{
763767
assistantMessageIndex: options?.assistantMessageIndex ?? state.assistantMessageIndex,
768+
consumePendingToolMedia:
769+
options?.final === true || Boolean(mediaUrls?.length || audioAsVoice),
764770
},
765771
);
766772
};

0 commit comments

Comments
 (0)