Skip to content

Commit d4e52f4

Browse files
authored
fix(tui): resync streaming watchdog after reconnect (#74224)
* fix(tui): resync streaming watchdog after reconnect
* fix(tui): keep reconnect history fallback armed
* fix(tui): tighten reconnect watchdog recovery
1 parent d2db67e commit d4e52f4

4 files changed

Lines changed: 206 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
2323
- TUI/status: clear stale `streaming` footer state when a final event arrives after the active run was already cleared and no tracked runs remain, while preserving concurrent-run ownership and inactive local `/btw` terminal handling. Fixes #64825; carries forward #64842, #64843, #64847, and #64862. Thanks @briandevans and @Yanhu007.
2424
- Channels/Discord: fail startup closed when Discord cannot resolve the bot's own identity and keep mention gating active when only configured mention patterns can detect mentions, so the provider no longer continues with a missing bot id. Fixes #42219; carries forward #46856 and #49218. Thanks @education-01 and @BenediktSchackenberg.
2525
- Channels/Discord: split long CJK replies at punctuation and code-point-safe fallback boundaries so Discord chunking stays readable without corrupting astral characters. Fixes #38597; repairs #71384. Thanks @p3nchan.
26+
- TUI: keep the streaming watchdog alive across active tool/lifecycle proof-of-life, pause it during disconnects, and reload history after stale reconnect runs so long-running chats stop flipping to false idle or hanging on stale streaming. Fixes #69081. Thanks @EenvoudJasper.
2627
- Browser/gateway: ignore Playwright dialog-close races from `Page.handleJavaScriptDialog` so browser automation no longer crashes the Gateway when a dialog disappears before Playwright accepts it. (#40067) Thanks @randyjtw.
2728
- Cron/Gateway: defer missed isolated agent-turn catch-up out of the channel startup window, so overdue cron work cannot starve Discord or Telegram while providers connect after a restart. Thanks @vincentkoc.
2829
- Heartbeat/cron: defer heartbeat turns while cron work is active or queued, add opt-in `heartbeat.skipWhenBusy` for subagent/nested lane pressure, and retry busy skips without advancing the schedule so local Ollama hosts do not run heartbeat and cron prompts concurrently. Fixes #50773. Thanks @scottgl9.

src/tui/tui-event-handlers.test.ts

Lines changed: 143 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1004,6 +1004,149 @@ describe("tui-event-handlers: streaming watchdog", () => {
10041004
handlers.dispose?.();
10051005
});
10061006

1007+
it("rearms the watchdog on active-run tool events even when tool verbosity is off", () => {
1008+
const { state, setActivityStatus, handlers } = createHarness({
1009+
streamingWatchdogMs: 5_000,
1010+
});
1011+
state.sessionInfo.verboseLevel = "off";
1012+
1013+
handlers.handleChatEvent({
1014+
runId: "run-tools",
1015+
sessionKey: state.currentSessionKey,
1016+
state: "delta",
1017+
message: { content: "first" },
1018+
} satisfies ChatEvent);
1019+
1020+
vi.advanceTimersByTime(3_000);
1021+
1022+
handlers.handleAgentEvent({
1023+
runId: "run-tools",
1024+
stream: "tool",
1025+
data: { phase: "start", toolCallId: "tool-1", name: "read" },
1026+
} satisfies AgentEvent);
1027+
1028+
vi.advanceTimersByTime(3_000);
1029+
1030+
expect(setActivityStatus).not.toHaveBeenCalledWith("idle");
1031+
expect(state.activeChatRunId).toBe("run-tools");
1032+
1033+
vi.advanceTimersByTime(2_001);
1034+
1035+
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
1036+
expect(state.activeChatRunId).toBeNull();
1037+
1038+
handlers.dispose?.();
1039+
});
1040+
1041+
it("pauses the watchdog while disconnected and rearms it on reconnect without clearing the active run", () => {
1042+
const { state, setActivityStatus, loadHistory, handlers } = createHarness({
1043+
streamingWatchdogMs: 5_000,
1044+
});
1045+
1046+
handlers.handleChatEvent({
1047+
runId: "run-reconnect",
1048+
sessionKey: state.currentSessionKey,
1049+
state: "delta",
1050+
message: { content: "hello" },
1051+
} satisfies ChatEvent);
1052+
1053+
handlers.pauseStreamingWatchdog();
1054+
vi.advanceTimersByTime(10_000);
1055+
1056+
expect(state.activeChatRunId).toBe("run-reconnect");
1057+
expect(setActivityStatus).not.toHaveBeenCalledWith("idle");
1058+
1059+
handlers.reconnectStreamingWatchdog();
1060+
1061+
expect(setActivityStatus).toHaveBeenCalledWith("streaming");
1062+
expect(state.activeChatRunId).toBe("run-reconnect");
1063+
1064+
vi.advanceTimersByTime(5_001);
1065+
1066+
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
1067+
expect(state.activeChatRunId).toBeNull();
1068+
expect(loadHistory).toHaveBeenCalledTimes(1);
1069+
1070+
handlers.dispose?.();
1071+
});
1072+
1073+
it("reloads history only once when reconnect recovery and deferred history refresh overlap", () => {
1074+
const { state, loadHistory, noteLocalRunId, handlers } = createHarness({
1075+
streamingWatchdogMs: 5_000,
1076+
});
1077+
1078+
handlers.handleChatEvent({
1079+
runId: "run-reconnect",
1080+
sessionKey: state.currentSessionKey,
1081+
state: "delta",
1082+
message: { content: "hello" },
1083+
} satisfies ChatEvent);
1084+
1085+
noteLocalRunId("run-local-empty");
1086+
handlers.handleChatEvent({
1087+
runId: "run-local-empty",
1088+
sessionKey: state.currentSessionKey,
1089+
state: "final",
1090+
} satisfies ChatEvent);
1091+
1092+
handlers.pauseStreamingWatchdog();
1093+
handlers.reconnectStreamingWatchdog();
1094+
vi.advanceTimersByTime(5_001);
1095+
1096+
expect(loadHistory).toHaveBeenCalledTimes(1);
1097+
1098+
handlers.dispose?.();
1099+
});
1100+
1101+
it("resets to idle when reconnect drops an active run that is no longer tracked", () => {
1102+
const { state, setActivityStatus, handlers } = createHarness({
1103+
streamingWatchdogMs: 5_000,
1104+
});
1105+
state.activeChatRunId = "run-stale";
1106+
state.activityStatus = "streaming";
1107+
1108+
handlers.reconnectStreamingWatchdog();
1109+
1110+
expect(state.activeChatRunId).toBeNull();
1111+
expect(state.activityStatus).toBe("idle");
1112+
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
1113+
1114+
handlers.dispose?.();
1115+
});
1116+
1117+
it("keeps reconnect recovery armed when only terminal lifecycle arrives after reconnect", () => {
1118+
const { state, chatLog, setActivityStatus, loadHistory, handlers } = createHarness({
1119+
streamingWatchdogMs: 5_000,
1120+
});
1121+
1122+
handlers.handleChatEvent({
1123+
runId: "run-lifecycle-only",
1124+
sessionKey: state.currentSessionKey,
1125+
state: "delta",
1126+
message: { content: "hello" },
1127+
} satisfies ChatEvent);
1128+
1129+
handlers.pauseStreamingWatchdog();
1130+
handlers.reconnectStreamingWatchdog();
1131+
1132+
handlers.handleAgentEvent({
1133+
runId: "run-lifecycle-only",
1134+
stream: "lifecycle",
1135+
data: { phase: "end" },
1136+
} satisfies AgentEvent);
1137+
1138+
vi.advanceTimersByTime(5_001);
1139+
1140+
expect(setActivityStatus).toHaveBeenLastCalledWith("idle");
1141+
expect(state.activeChatRunId).toBeNull();
1142+
expect(loadHistory).toHaveBeenCalledTimes(1);
1143+
expect(chatLog.addSystem).not.toHaveBeenCalledWith(
1144+
expect.stringContaining("streaming watchdog"),
1145+
);
1146+
1147+
handlers.dispose?.();
1148+
});
1149+
10071150
it("cancels the watchdog when the run finalizes normally", () => {
10081151
const { state, chatLog, setActivityStatus, handlers } = createHarness({
10091152
streamingWatchdogMs: 5_000,

src/tui/tui-event-handlers.ts

Lines changed: 51 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -72,6 +72,7 @@ export function createEventHandlers(context: EventHandlerContext) {
7272
let streamAssembler = new TuiStreamAssembler();
7373
let lastSessionKey = state.currentSessionKey;
7474
let pendingHistoryRefresh = false;
75+
let reconnectPendingRunId: string | null = null;
7576

7677
const streamingWatchdogMs =
7778
typeof context.streamingWatchdogMs === "number" &&
@@ -98,6 +99,10 @@ export function createEventHandlers(context: EventHandlerContext) {
9899
streamingWatchdogRunId = null;
99100
};
100101

102+
const pauseStreamingWatchdog = () => {
103+
clearStreamingWatchdog();
104+
};
105+
101106
const armStreamingWatchdog = (runId: string) => {
102107
if (streamingWatchdogMs <= 0) {
103108
return;
@@ -115,6 +120,13 @@ export function createEventHandlers(context: EventHandlerContext) {
115120
state.activeChatRunId = null;
116121
state.activityStatus = "idle";
117122
setActivityStatus("idle");
123+
if (reconnectPendingRunId === runId) {
124+
reconnectPendingRunId = null;
125+
pendingHistoryRefresh = false;
126+
void loadHistory?.();
127+
tui.requestRender();
128+
return;
129+
}
118130
flushPendingHistoryRefreshIfIdle();
119131
chatLog.addSystem(
120132
`streaming watchdog: no stream updates for ${Math.round(
@@ -162,6 +174,7 @@ export function createEventHandlers(context: EventHandlerContext) {
162174
streamAssembler = new TuiStreamAssembler();
163175
pendingHistoryRefresh = false;
164176
state.pendingOptimisticUserMessage = false;
177+
reconnectPendingRunId = null;
165178
clearLocalRunIds?.();
166179
clearLocalBtwRunIds?.();
167180
btw.clear();
@@ -210,6 +223,27 @@ export function createEventHandlers(context: EventHandlerContext) {
210223
flushPendingHistoryRefreshIfIdle();
211224
};
212225

226+
const reconnectStreamingWatchdog = () => {
227+
clearStreamingWatchdog();
228+
const activeRunId = state.activeChatRunId;
229+
if (!activeRunId) {
230+
reconnectPendingRunId = null;
231+
clearStaleStreamingRunIfNoTrackedRunRemains();
232+
return;
233+
}
234+
if (!sessionRuns.has(activeRunId)) {
235+
reconnectPendingRunId = null;
236+
state.activeChatRunId = null;
237+
state.activityStatus = "idle";
238+
setActivityStatus("idle");
239+
flushPendingHistoryRefreshIfIdle();
240+
return;
241+
}
242+
reconnectPendingRunId = activeRunId;
243+
setActivityStatus("streaming");
244+
armStreamingWatchdog(activeRunId);
245+
};
246+
213247
const finalizeRun = (params: {
214248
runId: string;
215249
wasActiveRun: boolean;
@@ -324,6 +358,9 @@ export function createEventHandlers(context: EventHandlerContext) {
324358
return;
325359
}
326360
}
361+
if (reconnectPendingRunId === evt.runId) {
362+
reconnectPendingRunId = null;
363+
}
327364
noteSessionRun(evt.runId);
328365
if (!state.activeChatRunId && !isLocalBtwRunId?.(evt.runId)) {
329366
state.activeChatRunId = evt.runId;
@@ -435,6 +472,9 @@ export function createEventHandlers(context: EventHandlerContext) {
435472
return;
436473
}
437474
if (evt.stream === "tool") {
475+
if (isActiveRun) {
476+
armStreamingWatchdog(evt.runId);
477+
}
438478
const verbose = state.sessionInfo.verboseLevel ?? "off";
439479
const allowToolEvents = verbose !== "off";
440480
const allowToolOutput = verbose === "full";
@@ -474,6 +514,9 @@ export function createEventHandlers(context: EventHandlerContext) {
474514
return;
475515
}
476516
const phase = typeof evt.data?.phase === "string" ? evt.data.phase : "";
517+
if (phase && phase !== "end" && phase !== "error") {
518+
armStreamingWatchdog(evt.runId);
519+
}
477520
if (phase === "start") {
478521
setActivityStatus("running");
479522
}
@@ -516,5 +559,12 @@ export function createEventHandlers(context: EventHandlerContext) {
516559
clearStreamingWatchdog();
517560
};
518561

519-
return { handleChatEvent, handleAgentEvent, handleBtwEvent, dispose };
562+
return {
563+
handleChatEvent,
564+
handleAgentEvent,
565+
handleBtwEvent,
566+
pauseStreamingWatchdog,
567+
reconnectStreamingWatchdog,
568+
dispose,
569+
};
520570
}

src/tui/tui.ts

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -900,7 +900,13 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
900900
abortActive,
901901
} = sessionActions;
902902

903-
const { handleChatEvent, handleAgentEvent, handleBtwEvent } = createEventHandlers({
903+
const {
904+
handleChatEvent,
905+
handleAgentEvent,
906+
handleBtwEvent,
907+
pauseStreamingWatchdog,
908+
reconnectStreamingWatchdog,
909+
} = createEventHandlers({
904910
chatLog,
905911
btw,
906912
tui,
@@ -1069,6 +1075,9 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
10691075
pairingHintShown = false;
10701076
const reconnected = wasDisconnected;
10711077
wasDisconnected = false;
1078+
if (reconnected) {
1079+
reconnectStreamingWatchdog();
1080+
}
10721081
setConnectionStatus(isLocalMode ? "local ready" : "connected");
10731082
void (async () => {
10741083
await refreshAgents();
@@ -1092,6 +1101,7 @@ export async function runTui(opts: RunTuiOptions): Promise<TuiResult> {
10921101
isConnected = false;
10931102
wasDisconnected = true;
10941103
historyLoaded = false;
1104+
pauseStreamingWatchdog();
10951105
const disconnectState = isLocalMode
10961106
? {
10971107
connectionStatus: `local runtime stopped${reason ? `: ${reason}` : ""}`,

0 commit comments

Comments
 (0)