Skip to content

Commit 9057559

Browse files
Galin Iliev
authored and committed
fix(gateway): back off session tool mirrors under pressure
1 parent f37fbc9 commit 9057559

4 files changed

Lines changed: 173 additions & 1 deletion

File tree

src/gateway/server-chat.agent-events.test.ts

Lines changed: 93 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ describe("agent event handler", () => {
7878
resolveSessionKeyForRun?: (runId: string) => string | undefined;
7979
lifecycleErrorRetryGraceMs?: number;
8080
isChatSendRunActive?: (runId: string) => boolean;
81+
shouldBackoffLowPrioritySessionToolEvents?: () => boolean;
8182
}) {
8283
const nowSpy =
8384
params?.now === undefined ? undefined : vi.spyOn(Date, "now").mockReturnValue(params.now);
@@ -103,6 +104,8 @@ describe("agent event handler", () => {
103104
loadGatewaySessionRowForSnapshot: loadGatewaySessionRow,
104105
lifecycleErrorRetryGraceMs: params?.lifecycleErrorRetryGraceMs,
105106
isChatSendRunActive: params?.isChatSendRunActive,
107+
shouldBackoffLowPrioritySessionToolEvents:
108+
params?.shouldBackoffLowPrioritySessionToolEvents,
106109
});
107110

108111
return {
@@ -1519,6 +1522,96 @@ describe("agent event handler", () => {
15191522
resetAgentRunContextForTest();
15201523
});
15211524

1525+
// Scenario: the injected backoff predicate always returns true and the tool
// event is a non-terminal "start" phase. The only broadcast expected is the
// run-scoped "agent" event; the session subscriber should receive nothing.
it("backs off session-scoped tool mirrors during queued gateway pressure", () => {
1526+
const { broadcastToConnIds, sessionEventSubscribers, toolEventRecipients, handler } =
1527+
createHarness({
1528+
resolveSessionKeyForRun: () => "session-pressure",
1529+
shouldBackoffLowPrioritySessionToolEvents: () => true,
1530+
});
1531+
1532+
registerAgentRunContext("run-pressure-tool", {
1533+
sessionKey: "session-pressure",
1534+
verboseLevel: "off",
1535+
});
1536+
// One run-scoped recipient and one session-scoped subscriber, so the two
// delivery paths can be told apart by recipient set.
toolEventRecipients.add("run-pressure-tool", "conn-run");
1537+
sessionEventSubscribers.subscribe("conn-session");
1538+
1539+
handler({
1540+
runId: "run-pressure-tool",
1541+
seq: 1,
1542+
stream: "tool",
1543+
ts: 1_234,
1544+
data: {
1545+
phase: "start",
1546+
name: "exec",
1547+
toolCallId: "tool-pressure-1",
1548+
args: { command: "echo hi" },
1549+
},
1550+
});
1551+
1552+
// Exactly one broadcast: event name "agent", addressed only to "conn-run".
expect(broadcastToConnIds).toHaveBeenCalledTimes(1);
1553+
expect(requireMockArg(broadcastToConnIds, 0, 0, "run tool event")).toBe("agent");
1554+
expect(requireMockArg(broadcastToConnIds, 0, 2, "run tool recipients")).toEqual(
1555+
new Set(["conn-run"]),
1556+
);
1557+
});
1558+
1559+
// Scenario: a "start" event is sent while backoff is off, then backoff is
// switched on before the matching "result" event. Both events are expected to
// be broadcast as "session.tool", i.e. the terminal phase is still mirrored
// while backoff is active.
it("keeps terminal session-scoped tool mirrors during queued gateway pressure", () => {
1560+
// Mutable flag read by the injected predicate so backoff can be toggled mid-test.
let backoffActive = false;
1561+
const { broadcastToConnIds, sessionEventSubscribers, handler } = createHarness({
1562+
resolveSessionKeyForRun: () => "session-pressure-terminal",
1563+
shouldBackoffLowPrioritySessionToolEvents: () => backoffActive,
1564+
});
1565+
1566+
registerAgentRunContext("run-pressure-terminal-tool", {
1567+
sessionKey: "session-pressure-terminal",
1568+
verboseLevel: "off",
1569+
});
1570+
// Only a session-scoped subscriber is registered; no run-scoped recipient.
sessionEventSubscribers.subscribe("conn-session");
1571+
1572+
// First event: non-terminal "start" phase, sent with backoff inactive.
handler({
1573+
runId: "run-pressure-terminal-tool",
1574+
seq: 1,
1575+
stream: "tool",
1576+
ts: 1_234,
1577+
data: {
1578+
phase: "start",
1579+
name: "exec",
1580+
toolCallId: "tool-pressure-terminal-1",
1581+
args: { command: "echo hi" },
1582+
},
1583+
});
1584+
1585+
// Second event: terminal "result" phase, sent after backoff is turned on.
backoffActive = true;
1586+
handler({
1587+
runId: "run-pressure-terminal-tool",
1588+
seq: 2,
1589+
stream: "tool",
1590+
ts: 1_235,
1591+
data: {
1592+
phase: "result",
1593+
name: "exec",
1594+
toolCallId: "tool-pressure-terminal-1",
1595+
result: { content: [{ type: "text", text: "done" }] },
1596+
},
1597+
});
1598+
1599+
// Both events must have produced a "session.tool" broadcast.
expect(broadcastToConnIds).toHaveBeenCalledTimes(2);
1600+
expect(requireMockArg(broadcastToConnIds, 0, 0, "session tool start event")).toBe(
1601+
"session.tool",
1602+
);
1603+
expect(requireMockArg(broadcastToConnIds, 1, 0, "session tool result event")).toBe(
1604+
"session.tool",
1605+
);
1606+
// The second broadcast's payload data must carry the result event's fields.
const resultPayload = requireMockPayload(broadcastToConnIds, 1, 1, "session tool result");
1607+
expectRecordFields(requireRecord(resultPayload.data, "session tool result data"), {
1608+
phase: "result",
1609+
name: "exec",
1610+
toolCallId: "tool-pressure-terminal-1",
1611+
result: { content: [{ type: "text", text: "done" }] },
1612+
});
1613+
});
1614+
15221615
it("suppresses heartbeat tool events for Control UI and verbose node subscribers", () => {
15231616
const {
15241617
broadcastToConnIds,

src/gateway/server-chat.ts

Lines changed: 17 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import { getRuntimeConfig } from "../config/io.js";
55
import { type AgentEventPayload, getAgentRunContext } from "../infra/agent-events.js";
66
import { detectErrorKind, type ErrorKind } from "../infra/errors.js";
77
import { resolveHeartbeatVisibility } from "../infra/heartbeat-visibility.js";
8+
import { isDiagnosticQueuePressureBackoffActive } from "../logging/diagnostic.js";
89
import { isAcpSessionKey, isSubagentSessionKey } from "../sessions/session-key-utils.js";
910
import { setSafeTimeout } from "../utils/timer-delay.js";
1011
import {
@@ -175,6 +176,12 @@ function readChatErrorKind(value: unknown): ErrorKind | undefined {
175176
: undefined;
176177
}
177178

179+
// Tool event phase names that isTerminalToolEventPhase treats as terminal.
const TERMINAL_TOOL_EVENT_PHASES = new Set(["end", "error", "result"]);
180+
181+
/**
 * Reports whether `phase` is one of the terminal tool event phases
 * ("end", "error" or "result"). The match is an exact, case-sensitive
 * Set lookup.
 */
function isTerminalToolEventPhase(phase: string): boolean {
182+
return TERMINAL_TOOL_EVENT_PHASES.has(phase);
183+
}
184+
178185
type BroadcastDelta = { deltaText: string; replace?: true };
179186

180187
function resolveBroadcastDelta(params: {
@@ -213,6 +220,7 @@ export type AgentEventHandlerOptions = {
213220
loadGatewaySessionRowForSnapshot?: typeof loadGatewaySessionRow;
214221
lifecycleErrorRetryGraceMs?: number;
215222
isChatSendRunActive?: (runId: string) => boolean;
223+
shouldBackoffLowPrioritySessionToolEvents?: () => boolean;
216224
};
217225

218226
export function createAgentEventHandler({
@@ -228,6 +236,7 @@ export function createAgentEventHandler({
228236
loadGatewaySessionRowForSnapshot = loadGatewaySessionRow,
229237
lifecycleErrorRetryGraceMs = AGENT_LIFECYCLE_ERROR_RETRY_GRACE_MS,
230238
isChatSendRunActive = () => false,
239+
shouldBackoffLowPrioritySessionToolEvents = isDiagnosticQueuePressureBackoffActive,
231240
}: AgentEventHandlerOptions) {
232241
type TerminalLifecycleOptions = { skipChatErrorFinal?: boolean };
233242
type PendingTerminalLifecycleError = {
@@ -910,7 +919,14 @@ export function createAgentEventHandler({
910919
// not know the runId in advance, so they cannot register as run-scoped
911920
// tool recipients. Mirror tool lifecycle onto a session-scoped event so
912921
// they can render live pending tool cards without polling history.
913-
if (isControlUiVisible && sessionKey && !suppressHeartbeatToolEvents) {
922+
const shouldMirrorSessionToolEvent =
923+
!shouldBackoffLowPrioritySessionToolEvents() || isTerminalToolEventPhase(toolPhase);
924+
if (
925+
isControlUiVisible &&
926+
sessionKey &&
927+
!suppressHeartbeatToolEvents &&
928+
shouldMirrorSessionToolEvent
929+
) {
914930
const sessionSubscribers = sessionEventSubscribers.getAll();
915931
if (sessionSubscribers.size > 0) {
916932
broadcastToConnIds(

src/logging/diagnostic.test.ts

Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -34,11 +34,13 @@ import {
3434
logSessionStateChange,
3535
logMessageQueued,
3636
diagnosticLogger,
37+
isDiagnosticQueuePressureBackoffActive,
3738
markDiagnosticSessionProgress,
3839
resetDiagnosticStateForTest,
3940
resolveStuckSessionAbortMs,
4041
resolveStuckSessionWarnMs,
4142
startDiagnosticHeartbeat,
43+
stopDiagnosticHeartbeat,
4244
} from "./diagnostic.js";
4345

4446
function createEmitMemorySampleMock() {
@@ -1692,6 +1694,35 @@ describe("stuck session diagnostics threshold", () => {
16921694
);
16931695
});
16941696

1697+
// Scenario: diagnostics are enabled, the stubbed liveness sampler always
// reports a "cpu" reason, and one message is queued for a processing session.
// After timers advance, the backoff flag should be active; stopping the
// heartbeat should clear it.
it("marks queued gateway pressure for low-priority backoff", () => {
1698+
startDiagnosticHeartbeat(
1699+
{
1700+
diagnostics: {
1701+
enabled: true,
1702+
},
1703+
},
1704+
{
1705+
emitMemorySample: createEmitMemorySampleMock(),
1706+
// Stub sampler: every sample carries a non-empty `reasons` list.
sampleLiveness: () => ({
1707+
reasons: ["cpu"],
1708+
intervalMs: 30_000,
1709+
eventLoopUtilization: 0.99,
1710+
cpuTotalMs: 30_000,
1711+
cpuCoreRatio: 1,
1712+
}),
1713+
},
1714+
);
1715+
1716+
logSessionStateChange({ sessionId: "s1", sessionKey: "main", state: "processing" });
1717+
logMessageQueued({ sessionId: "s1", sessionKey: "main", source: "discord" });
1718+
// NOTE(review): presumably 30s is enough for one heartbeat tick to run — confirm
// against the heartbeat interval, which is not visible in this hunk.
vi.advanceTimersByTime(30_000);
1719+
1720+
expect(isDiagnosticQueuePressureBackoffActive(Date.now())).toBe(true);
1721+
// After stopping, the flag must read false both now and at a later timestamp.
stopDiagnosticHeartbeat();
1722+
expect(isDiagnosticQueuePressureBackoffActive(Date.now())).toBe(false);
1723+
expect(isDiagnosticQueuePressureBackoffActive(Date.now() + 60_001)).toBe(false);
1724+
});
1725+
16951726
it("does not let idle liveness samples suppress later active-work warnings", () => {
16961727
const warnSpy = vi.spyOn(diagnosticLogger, "warn").mockImplementation(() => undefined);
16971728

src/logging/diagnostic.ts

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,7 @@ const DEFAULT_LIVENESS_EVENT_LOOP_DELAY_WARN_MS = 1_000;
8282
const DEFAULT_LIVENESS_EVENT_LOOP_UTILIZATION_WARN = 0.95;
8383
const DEFAULT_LIVENESS_CPU_CORE_RATIO_WARN = 0.9;
8484
const DEFAULT_LIVENESS_WARN_COOLDOWN_MS = 120_000;
85+
// Amount added to `now` when updateDiagnosticQueuePressureBackoff pushes the
// queue-pressure backoff deadline forward (60_000; milliseconds per the name).
const DEFAULT_QUEUE_PRESSURE_BACKOFF_TTL_MS = 60_000;
8586
let commandPollBackoffRuntimePromise: Promise<
8687
typeof import("../agents/command-poll-backoff.runtime.js")
8788
> | null = null;
@@ -1137,6 +1138,32 @@ export function logActiveRuns() {
11371138
}
11381139

11391140
// Timer handle for the diagnostic heartbeat; null when none is set.
// isDiagnosticQueuePressureBackoffActive returns false while this is null.
let heartbeatInterval: NodeJS.Timeout | null = null;
1141+
// Timestamp until which queue-pressure backoff is considered active.
// 0 means no backoff deadline is recorded.
let diagnosticQueuePressureBackoffUntil = 0;
1142+
1143+
/**
 * Reports whether the queue-pressure backoff deadline is still in the future
 * relative to `now`.
 *
 * Returns false when `heartbeatInterval` is unset or when
 * `areDiagnosticsEnabledForProcess()` returns false, regardless of the
 * recorded deadline.
 *
 * @param now - Timestamp to compare against the deadline; defaults to `Date.now()`.
 * @returns true only when the recorded deadline is strictly greater than `now`.
 */
export function isDiagnosticQueuePressureBackoffActive(now = Date.now()): boolean {
1144+
if (!heartbeatInterval || !areDiagnosticsEnabledForProcess()) {
1145+
return false;
1146+
}
1147+
return diagnosticQueuePressureBackoffUntil > now;
1148+
}
1149+
1150+
/**
 * Clears the queue-pressure backoff deadline by resetting it to 0.
 *
 * NOTE(review): despite the `ForTest` suffix, this diff also calls it from the
 * heartbeat stop path, so it is not test-only — consider a neutral name.
 */
export function resetDiagnosticQueuePressureBackoffForTest(): void {
1151+
diagnosticQueuePressureBackoffUntil = 0;
1152+
}
1153+
1154+
/**
 * Pushes the queue-pressure backoff deadline forward when a liveness sample
 * reports pressure while work is queued.
 *
 * Does nothing unless `work.queuedCount` is positive and `sample.reasons` is
 * non-empty. Otherwise the deadline becomes the later of its current value
 * and `now + DEFAULT_QUEUE_PRESSURE_BACKOFF_TTL_MS`.
 *
 * @param now - Timestamp the new deadline is computed from.
 * @param sample - Liveness sample; only `reasons.length` is read.
 * @param work - Work snapshot; only `queuedCount` is read.
 */
function updateDiagnosticQueuePressureBackoff(
1155+
now: number,
1156+
sample: DiagnosticLivenessSample,
1157+
work: DiagnosticWorkSnapshot,
1158+
): void {
1159+
if (work.queuedCount <= 0 || sample.reasons.length === 0) {
1160+
return;
1161+
}
1162+
// Math.max keeps an existing later deadline, so this call never shortens it.
diagnosticQueuePressureBackoffUntil = Math.max(
1163+
diagnosticQueuePressureBackoffUntil,
1164+
now + DEFAULT_QUEUE_PRESSURE_BACKOFF_TTL_MS,
1165+
);
1166+
}
11401167

11411168
export function startDiagnosticHeartbeat(
11421169
config?: OpenClawConfig,
@@ -1175,6 +1202,9 @@ export function startDiagnosticHeartbeat(
11751202
livenessSample !== null && shouldEmitDiagnosticLivenessEvent(now);
11761203
const shouldEmitLivenessWarning =
11771204
livenessSample !== null && shouldEmitDiagnosticLivenessWarning(now, work);
1205+
if (livenessSample) {
1206+
updateDiagnosticQueuePressureBackoff(now, livenessSample, work);
1207+
}
11781208
const shouldEmitLivenessReport = shouldEmitLivenessEvent || shouldEmitLivenessWarning;
11791209
const shouldRecordMemorySample =
11801210
shouldEmitLivenessReport || hasRecentDiagnosticActivity(now) || hasOpenDiagnosticWork(work);
@@ -1297,6 +1327,7 @@ export function stopDiagnosticHeartbeat() {
12971327
stopDiagnosticLivenessSampler();
12981328
stopDiagnosticStabilityRecorder();
12991329
uninstallDiagnosticStabilityFatalHook();
1330+
resetDiagnosticQueuePressureBackoffForTest();
13001331
}
13011332

13021333
export function getDiagnosticSessionStateCountForTest(): number {
@@ -1317,4 +1348,5 @@ export function resetDiagnosticStateForTest(): void {
13171348
resetDiagnosticPhasesForTest();
13181349
resetDiagnosticStabilityRecorderForTest();
13191350
resetDiagnosticStabilityBundleForTest();
1351+
resetDiagnosticQueuePressureBackoffForTest();
13201352
}

0 commit comments

Comments
 (0)