Skip to content

Commit 0f10c47

Browse files
fix(agents/cli): bridge CLI assistant deltas into channel preview (#76869)
1 parent 678323d commit 0f10c47

3 files changed

Lines changed: 153 additions & 1 deletion

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ Docs: https://docs.openclaw.ai
627627
- Plugins/update: keep externalized bundled npm bridge updates on the normal plugin security scanner path instead of granting source-linked official trust without artifact provenance. (#76765) Thanks @Lucenx9.
628628
- Agents/reply context: label replied-to messages as the current user message target in model-visible metadata, so short replies are grounded to their explicit reply target instead of nearby chat history. (#76817) Thanks @obviyus.
629629
- Doctor/plugins: install configured missing official plugins such as Discord and Brave during doctor/update repair, auto-enable repaired provider plugins, preserve config when a download fails, and stop auto-enable from inventing plugin entries when no manifest declares a configured channel. Fixes #76872. Thanks @jack-stormentswe.
630+
- Agents/CLI runner: bridge in-flight assistant agent events into the shared `onPartialReply` callback so CLI backends (Anthropic Max plan via `claude-cli`, Codex CLI, etc.) drive the same Telegram and channel preview path the native API path uses, instead of silently delivering only the final assembled message. Fixes #76869. Thanks @jack-stormentswe.
630631

631632
## 2026.5.2
632633

src/auto-reply/reply/agent-runner-execution.test.ts

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,125 @@ describe("runAgentTurnWithFallback", () => {
471471
);
472472
});
473473

474+
it("bridges CLI assistant agent events into onPartialReply for live preview (#76869)", async () => {
475+
state.isCliProviderMock.mockReturnValue(true);
476+
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
477+
result: await params.run("claude-cli", "claude-opus-4-6"),
478+
provider: "claude-cli",
479+
model: "claude-opus-4-6",
480+
attempts: [],
481+
}));
482+
state.runCliAgentMock.mockImplementationOnce(async (params: { runId: string }) => {
483+
const realAgentEvents = await vi.importActual<typeof import("../../infra/agent-events.js")>(
484+
"../../infra/agent-events.js",
485+
);
486+
realAgentEvents.emitAgentEvent({
487+
runId: params.runId,
488+
stream: "assistant",
489+
data: { text: "Hello", delta: "Hello" },
490+
});
491+
realAgentEvents.emitAgentEvent({
492+
runId: params.runId,
493+
stream: "assistant",
494+
data: { text: "Hello world", delta: " world" },
495+
});
496+
return { payloads: [{ text: "Hello world" }], meta: {} };
497+
});
498+
499+
const onPartialReply = vi.fn(async () => undefined);
500+
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
501+
const followupRun = createFollowupRun();
502+
followupRun.run.provider = "claude-cli";
503+
followupRun.run.model = "claude-opus-4-6";
504+
505+
await runAgentTurnWithFallback({
506+
commandBody: "hi",
507+
followupRun,
508+
sessionCtx: {
509+
Provider: "telegram",
510+
MessageSid: "msg",
511+
} as unknown as TemplateContext,
512+
opts: { onPartialReply },
513+
typingSignals: createMockTypingSignaler(),
514+
blockReplyPipeline: null,
515+
blockStreamingEnabled: false,
516+
resolvedBlockStreamingBreak: "message_end",
517+
applyReplyToMode: (payload) => payload,
518+
shouldEmitToolResult: () => true,
519+
shouldEmitToolOutput: () => false,
520+
pendingToolTasks: new Set(),
521+
resetSessionAfterCompactionFailure: async () => false,
522+
resetSessionAfterRoleOrderingConflict: async () => false,
523+
isHeartbeat: false,
524+
sessionKey: "main",
525+
getActiveSessionEntry: () => undefined,
526+
resolvedVerboseLevel: "off",
527+
});
528+
await new Promise((resolve) => setImmediate(resolve));
529+
530+
const partialTexts = onPartialReply.mock.calls.map(
531+
(call) => (call[0] as { text?: string })?.text,
532+
);
533+
expect(partialTexts).toEqual(["Hello", "Hello world"]);
534+
});
535+
536+
it("does not bridge CLI assistant deltas when silentExpected is set (#76869)", async () => {
537+
state.isCliProviderMock.mockReturnValue(true);
538+
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({
539+
result: await params.run("claude-cli", "claude-opus-4-6"),
540+
provider: "claude-cli",
541+
model: "claude-opus-4-6",
542+
attempts: [],
543+
}));
544+
state.runCliAgentMock.mockImplementationOnce(async (params: { runId: string }) => {
545+
const realAgentEvents = await vi.importActual<typeof import("../../infra/agent-events.js")>(
546+
"../../infra/agent-events.js",
547+
);
548+
realAgentEvents.emitAgentEvent({
549+
runId: params.runId,
550+
stream: "assistant",
551+
data: { text: "secret heartbeat output", delta: "secret heartbeat output" },
552+
});
553+
realAgentEvents.emitAgentEvent({
554+
runId: params.runId,
555+
stream: "assistant",
556+
data: { text: "NO_REPLY do not preview", delta: " do not preview" },
557+
});
558+
return { payloads: [{ text: "final" }], meta: {} };
559+
});
560+
561+
const onPartialReply = vi.fn(async () => undefined);
562+
const runAgentTurnWithFallback = await getRunAgentTurnWithFallback();
563+
const followupRun = createFollowupRun();
564+
followupRun.run.provider = "claude-cli";
565+
followupRun.run.model = "claude-opus-4-6";
566+
followupRun.run.silentExpected = true;
567+
568+
await runAgentTurnWithFallback({
569+
commandBody: "hi",
570+
followupRun,
571+
sessionCtx: { Provider: "telegram", MessageSid: "msg" } as unknown as TemplateContext,
572+
opts: { onPartialReply },
573+
typingSignals: createMockTypingSignaler(),
574+
blockReplyPipeline: null,
575+
blockStreamingEnabled: false,
576+
resolvedBlockStreamingBreak: "message_end",
577+
applyReplyToMode: (payload) => payload,
578+
shouldEmitToolResult: () => true,
579+
shouldEmitToolOutput: () => false,
580+
pendingToolTasks: new Set(),
581+
resetSessionAfterCompactionFailure: async () => false,
582+
resetSessionAfterRoleOrderingConflict: async () => false,
583+
isHeartbeat: false,
584+
sessionKey: "main",
585+
getActiveSessionEntry: () => undefined,
586+
resolvedVerboseLevel: "off",
587+
});
588+
await new Promise((resolve) => setImmediate(resolve));
589+
590+
expect(onPartialReply).not.toHaveBeenCalled();
591+
});
592+
474593
it("resolves CLI messageProvider from the live session surface when no origin channel is set", async () => {
475594
state.isCliProviderMock.mockReturnValue(true);
476595
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => ({

src/auto-reply/reply/agent-runner-execution.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import {
4141
updateSessionStore,
4242
} from "../../config/sessions.js";
4343
import { logVerbose } from "../../globals.js";
44-
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
44+
import { emitAgentEvent, onAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
4545
import { formatErrorMessage } from "../../infra/errors.js";
4646
import { CommandLaneClearedError, GatewayDrainingError } from "../../process/command-queue.js";
4747
import { CommandLane } from "../../process/lanes.js";
@@ -1412,6 +1412,35 @@ export async function runAgentTurnWithFallback(params: {
14121412
});
14131413
return (async () => {
14141414
let lifecycleTerminalEmitted = false;
1415+
let lastBridgedAssistantText: string | undefined;
1416+
let assistantBridgeUnsubscribed = false;
1417+
const rawUnsubscribeAssistantBridge = onAgentEvent((evt) => {
1418+
if (evt.runId !== runId || evt.stream !== "assistant") {
1419+
return;
1420+
}
1421+
if (params.followupRun.run.silentExpected) {
1422+
return;
1423+
}
1424+
const text = typeof evt.data.text === "string" ? evt.data.text : undefined;
1425+
if (text === undefined || text === lastBridgedAssistantText) {
1426+
return;
1427+
}
1428+
lastBridgedAssistantText = text;
1429+
void (async () => {
1430+
const textForTyping = await handlePartialForTyping({ text } as ReplyPayload);
1431+
if (textForTyping === undefined || !params.opts?.onPartialReply) {
1432+
return;
1433+
}
1434+
await params.opts.onPartialReply({ text: textForTyping });
1435+
})().catch(() => undefined);
1436+
});
1437+
const unsubscribeAssistantBridge = () => {
1438+
if (assistantBridgeUnsubscribed) {
1439+
return;
1440+
}
1441+
assistantBridgeUnsubscribed = true;
1442+
rawUnsubscribeAssistantBridge();
1443+
};
14151444
try {
14161445
const result = await runCliAgent({
14171446
sessionId: params.followupRun.run.sessionId,
@@ -1458,6 +1487,8 @@ export async function runAgentTurnWithFallback(params: {
14581487
result.meta?.systemPromptReport,
14591488
);
14601489

1490+
unsubscribeAssistantBridge();
1491+
14611492
// CLI backends don't emit streaming assistant events, so we need to
14621493
// emit one with the final text so server-chat can populate its buffer
14631494
// and send the response to TUI/WebSocket clients.
@@ -1506,6 +1537,7 @@ export async function runAgentTurnWithFallback(params: {
15061537
lifecycleTerminalEmitted = true;
15071538
throw err;
15081539
} finally {
1540+
unsubscribeAssistantBridge();
15091541
// Defensive backstop: never let a CLI run complete without a terminal
15101542
// lifecycle event, otherwise downstream consumers can hang.
15111543
if (!lifecycleTerminalEmitted) {

0 commit comments

Comments
 (0)