Skip to content

Commit 820dc38

Browse files
artwalkerjalehman
andauthored
fix(gateway): add TTL cleanup for 3 Maps that grow unbounded causing OOM (#52731)
Merged via squash. Prepared head SHA: 4816a29 Co-authored-by: artwalker <44759507+artwalker@users.noreply.github.com> Co-authored-by: jalehman <550978+jalehman@users.noreply.github.com> Reviewed-by: @jalehman
1 parent 2d846e1 commit 820dc38

6 files changed

Lines changed: 130 additions & 14 deletions

File tree

CHANGELOG.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,22 @@ Docs: https://docs.openclaw.ai
3333
- Gateway/tailscale: start Tailscale exposure and the gateway update check before awaiting channel and plugin sidecar startup so remote operators are not locked out when startup sidecars stall.
3434
- QQBot/streaming: make block streaming configurable per QQ bot account via `streaming.mode` (`"partial"` | `"off"`, default `"partial"`) instead of hardcoding it off, so responses can be delivered incrementally. (#63746)
3535
- Dreaming/gateway: require `operator.admin` for persistent `/dreaming on|off` changes and treat missing gateway client scopes as unprivileged instead of silently allowing config writes. (#63872) Thanks @mbelinky.
36+
- Matrix/multi-account: keep room-level `account` scoping, inherited room overrides, and implicit account selection consistent across top-level default auth, named accounts, and cached-credential env setups. (#58449) thanks @Daanvdplas and @gumadeiras.
37+
- Gateway/pairing: prefer explicit QR bootstrap auth over earlier Tailscale auth classification so iOS `/pair qr` silent bootstrap pairing does not fall through to `pairing required`. (#59232) Thanks @ngutman.
38+
- Config/Discord: coerce safe integer numeric Discord IDs to strings during config validation, keep unsafe or precision-losing numeric snowflakes rejected, and align `openclaw doctor` repair guidance with the same fail-closed behavior. (#45125) Thanks @moliendocode.
39+
- Gateway/sessions: scope bare `sessions.create` aliases like `main` to the requested agent while preserving the canonical `global` and `unknown` sentinel keys. (#58207) thanks @jalehman.
40+
- `/context detail` now compares the tracked prompt estimate with cached context usage and surfaces untracked provider/runtime overhead when present. (#28391) thanks @ImLukeF.
41+
- Gateway/session reset: emit the typed `before_reset` hook for gateway `/new` and `/reset`, preserving reset-hook behavior even when the previous transcript has already been archived. (#53872) thanks @VACInc
42+
- Plugins/commands: pass the active host `sessionKey` into plugin command contexts, and include `sessionId` when it is already available from the active session entry, so bundled and third-party commands can resolve the current conversation reliably. (#59044) Thanks @jalehman.
43+
- Agents/auth: honor `models.providers.*.authHeader` for pi embedded runner model requests by injecting `Authorization: Bearer <apiKey>` when requested. (#54390) Thanks @lndyzwdxhs.
44+
- UI/compaction: keep the compaction indicator in a retry-pending state until the run actually finishes, so the UI does not show `Context compacted` before compaction actually finishes. (#55132) Thanks @mpz4life.
45+
- Cron/tool schemas: keep cron tool schemas strict-model-friendly while still preserving `failureAlert=false`, nullable `agentId`/`sessionKey`, and flattened add/update recovery for the newly exposed cron job fields. (#55043) Thanks @brunolorente.
46+
- BlueBubbles/config: accept `enrichGroupParticipantsFromContacts` in the core strict config schema so gateways no longer fail validation or startup when the BlueBubbles plugin writes that field. (#56889) Thanks @zqchris.
47+
- Agents/failover: classify AbortError and stream-abort messages as timeout so Ollama NDJSON stream aborts stop showing `reason=unknown` in model fallback logs. (#58324) Thanks @yelog
48+
- Exec approvals: route Slack, Discord, and Telegram approvals through the shared channel approval-capability path so native approval auth, delivery, and `/approve` handling stay aligned across channels while preserving Telegram session-key agent filtering. (#58634) thanks @gumadeiras
49+
- Matrix/runtime: resolve the verification/bootstrap runtime from a distinct packaged Matrix entry so global npm installs stop failing on crypto bootstrap with missing-module or recursive runtime alias errors. (#59249) Thanks @gumadeiras.
50+
- Matrix/streaming: preserve ordered block flushes before tool, message, and agent boundaries, add explicit `channels.matrix.blockStreaming` opt-in so Matrix `streaming: "off"` stays final-only by default, and move MiniMax plain-text final handling into the MiniMax provider runtime instead of the shared core heuristic. (#59266) thanks @gumadeiras
51+
- Gateway/agents: fix stale run-context TTL cleanup so the new maintenance sweep compiles and resets orphaned run sequence state correctly. (#52731) thanks @artwalker
3652

3753
## 2026.4.9
3854

src/agents/subagent-registry-run-manager.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,8 @@ export function createSubagentRunManager(params: {
248248
params.runs.set(nextRunId, next);
249249
params.ensureListener();
250250
params.persist();
251-
if (archiveAtMs) {
252-
params.startSweeper();
253-
}
251+
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
252+
params.startSweeper();
254253
void waitForSubagentCompletion(nextRunId, waitTimeoutMs);
255254
return true;
256255
};
@@ -338,9 +337,8 @@ export function createSubagentRunManager(params: {
338337
}
339338
params.ensureListener();
340339
params.persist();
341-
if (archiveAtMs) {
342-
params.startSweeper();
343-
}
340+
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
341+
params.startSweeper();
344342
// Wait for subagent completion via gateway RPC (cross-process).
345343
// The in-process lifecycle listener is a fallback for embedded runs.
346344
void waitForSubagentCompletion(registerParams.runId, waitTimeoutMs);

src/agents/subagent-registry.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
108108
* subsequent lifecycle `start` / `end` can cancel premature failure announces.
109109
*/
110110
const LIFECYCLE_ERROR_RETRY_GRACE_MS = 15_000;
111+
/** Absolute TTL for session-mode runs after cleanup completes (no archiveAtMs). */
112+
const SESSION_RUN_TTL_MS = 5 * 60_000; // 5 minutes
113+
/** Absolute TTL for orphaned pendingLifecycleError entries. */
114+
const PENDING_ERROR_TTL_MS = 5 * 60_000; // 5 minutes
111115

112116
function loadSubagentRegistryRuntime() {
113117
subagentRegistryRuntimePromise ??= import("./subagent-registry.runtime.js");
@@ -432,9 +436,8 @@ function restoreSubagentRunsOnce() {
432436
}
433437
// Resume pending work.
434438
ensureListener();
435-
if ([...subagentRuns.values()].some((entry) => entry.archiveAtMs)) {
436-
startSweeper();
437-
}
439+
// Always start sweeper — session-mode runs (no archiveAtMs) also need TTL cleanup.
440+
startSweeper();
438441
for (const runId of subagentRuns.keys()) {
439442
resumeSubagentRun(runId);
440443
}
@@ -479,7 +482,25 @@ async function sweepSubagentRuns() {
479482
const now = Date.now();
480483
let mutated = false;
481484
for (const [runId, entry] of subagentRuns.entries()) {
482-
if (!entry.archiveAtMs || entry.archiveAtMs > now) {
485+
// Session-mode runs have no archiveAtMs — apply absolute TTL after cleanup completes.
486+
// Use cleanupCompletedAt (not endedAt) to avoid interrupting deferred cleanup flows.
487+
if (!entry.archiveAtMs) {
488+
if (typeof entry.cleanupCompletedAt === "number" && now - entry.cleanupCompletedAt > SESSION_RUN_TTL_MS) {
489+
clearPendingLifecycleError(runId);
490+
void notifyContextEngineSubagentEnded({
491+
childSessionKey: entry.childSessionKey,
492+
reason: "swept",
493+
workspaceDir: entry.workspaceDir,
494+
});
495+
subagentRuns.delete(runId);
496+
mutated = true;
497+
if (!entry.retainAttachmentsOnKeep) {
498+
await safeRemoveAttachmentsDir(entry);
499+
}
500+
}
501+
continue;
502+
}
503+
if (entry.archiveAtMs > now) {
483504
continue;
484505
}
485506
clearPendingLifecycleError(runId);
@@ -506,6 +527,13 @@ async function sweepSubagentRuns() {
506527
// ignore
507528
}
508529
}
530+
// Sweep orphaned pendingLifecycleError entries (absolute TTL).
531+
for (const [runId, pending] of pendingLifecycleErrorByRunId.entries()) {
532+
if (now - pending.endedAt > PENDING_ERROR_TTL_MS) {
533+
clearPendingLifecycleError(runId);
534+
}
535+
}
536+
509537
if (mutated) {
510538
persistSubagentRuns();
511539
}

src/gateway/server-maintenance.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { HealthSummary } from "../commands/health.js";
2+
import { sweepStaleRunContexts } from "../infra/agent-events.js";
23
import { cleanOldMedia } from "../media/store.js";
34
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
45
import type { ChatRunEntry } from "./server-chat.js";
@@ -151,6 +152,8 @@ export function startGatewayMaintenanceTimers(params: {
151152
params.chatDeltaSentAt.delete(runId);
152153
params.chatDeltaLastBroadcastLen.delete(runId);
153154
}
155+
// Sweep stale agent run contexts (orphaned when lifecycle end/error is missed).
156+
sweepStaleRunContexts();
154157
}, 60_000);
155158

156159
if (typeof params.mediaCleanupTtlMs !== "number") {

src/infra/agent-events.test.ts

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { beforeEach, describe, expect, test } from "vitest";
1+
import { beforeEach, describe, expect, test, vi } from "vitest";
22
import {
33
clearAgentRunContext,
44
emitAgentEvent,
@@ -7,6 +7,7 @@ import {
77
registerAgentRunContext,
88
resetAgentEventsForTest,
99
resetAgentRunContextForTest,
10+
sweepStaleRunContexts,
1011
} from "./agent-events.js";
1112

1213
type AgentEventsModule = typeof import("./agent-events.js");
@@ -107,7 +108,7 @@ describe("agent-events sequencing", () => {
107108
isHeartbeat: true,
108109
});
109110

110-
expect(getAgentRunContext("run-ctx")).toEqual({
111+
expect(getAgentRunContext("run-ctx")).toMatchObject({
111112
sessionKey: "session-main",
112113
verboseLevel: "full",
113114
isHeartbeat: true,
@@ -186,12 +187,48 @@ describe("agent-events sequencing", () => {
186187

187188
stop();
188189

189-
expect(second.getAgentRunContext("run-dup")).toEqual({ sessionKey: "session-dup" });
190+
expect(second.getAgentRunContext("run-dup")).toMatchObject({ sessionKey: "session-dup" });
190191
expect(seen).toEqual([
191192
{ seq: 1, sessionKey: "session-dup" },
192193
{ seq: 2, sessionKey: "session-dup" },
193194
]);
194195

195196
first.resetAgentEventsForTest();
196197
});
198+
199+
test("sweeps stale run contexts and clears their sequence state", async () => {
200+
const stop = vi.spyOn(Date, "now");
201+
stop.mockReturnValue(100);
202+
registerAgentRunContext("run-stale", { sessionKey: "session-stale", registeredAt: 100 });
203+
registerAgentRunContext("run-active", { sessionKey: "session-active", registeredAt: 100 });
204+
205+
stop.mockReturnValue(200);
206+
emitAgentEvent({ runId: "run-stale", stream: "assistant", data: { text: "stale" } });
207+
208+
stop.mockReturnValue(900);
209+
emitAgentEvent({ runId: "run-active", stream: "assistant", data: { text: "active" } });
210+
211+
stop.mockReturnValue(1_000);
212+
expect(sweepStaleRunContexts(500)).toBe(1);
213+
expect(getAgentRunContext("run-stale")).toBeUndefined();
214+
expect(getAgentRunContext("run-active")).toMatchObject({ sessionKey: "session-active" });
215+
216+
const seen: Array<{ runId: string; seq: number }> = [];
217+
const unsubscribe = onAgentEvent((evt) => {
218+
if (evt.runId === "run-stale" || evt.runId === "run-active") {
219+
seen.push({ runId: evt.runId, seq: evt.seq });
220+
}
221+
});
222+
223+
emitAgentEvent({ runId: "run-stale", stream: "assistant", data: { text: "restarted" } });
224+
emitAgentEvent({ runId: "run-active", stream: "assistant", data: { text: "continued" } });
225+
226+
unsubscribe();
227+
stop.mockRestore();
228+
229+
expect(seen).toEqual([
230+
{ runId: "run-stale", seq: 1 },
231+
{ runId: "run-active", seq: 2 },
232+
]);
233+
});
197234
});

src/infra/agent-events.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ export type AgentRunContext = {
111111
isHeartbeat?: boolean;
112112
/** Whether control UI clients should receive chat/agent updates for this run. */
113113
isControlUiVisible?: boolean;
114+
/** Timestamp when this context was first registered (for TTL-based cleanup). */
115+
registeredAt?: number;
116+
/** Timestamp of last activity (updated on every emitAgentEvent). */
117+
lastActiveAt?: number;
114118
};
115119

116120
type AgentEventState = {
@@ -136,7 +140,10 @@ export function registerAgentRunContext(runId: string, context: AgentRunContext)
136140
const state = getAgentEventState();
137141
const existing = state.runContextById.get(runId);
138142
if (!existing) {
139-
state.runContextById.set(runId, { ...context });
143+
state.runContextById.set(runId, {
144+
...context,
145+
registeredAt: context.registeredAt ?? Date.now(),
146+
});
140147
return;
141148
}
142149
if (context.sessionKey && existing.sessionKey !== context.sessionKey) {
@@ -159,17 +166,44 @@ export function getAgentRunContext(runId: string) {
159166

160167
export function clearAgentRunContext(runId: string) {
161168
getAgentEventState().runContextById.delete(runId);
169+
getAgentEventState().seqByRun.delete(runId);
170+
}
171+
172+
/**
173+
* Sweep stale run contexts that exceeded the given TTL.
174+
* Guards against orphaned entries when lifecycle "end"/"error" events are missed.
175+
*/
176+
export function sweepStaleRunContexts(maxAgeMs = 30 * 60 * 1000): number {
177+
const state = getAgentEventState();
178+
const now = Date.now();
179+
let swept = 0;
180+
for (const [runId, ctx] of state.runContextById.entries()) {
181+
// Use lastActiveAt (refreshed on every event) to avoid sweeping active runs.
182+
// Fall back to registeredAt, then treat missing timestamps as infinitely old.
183+
const lastSeen = ctx.lastActiveAt ?? ctx.registeredAt;
184+
const age = lastSeen ? now - lastSeen : Infinity;
185+
if (age > maxAgeMs) {
186+
state.runContextById.delete(runId);
187+
state.seqByRun.delete(runId);
188+
swept++;
189+
}
190+
}
191+
return swept;
162192
}
163193

164194
export function resetAgentRunContextForTest() {
165195
getAgentEventState().runContextById.clear();
196+
getAgentEventState().seqByRun.clear();
166197
}
167198

168199
export function emitAgentEvent(event: Omit<AgentEventPayload, "seq" | "ts">) {
169200
const state = getAgentEventState();
170201
const nextSeq = (state.seqByRun.get(event.runId) ?? 0) + 1;
171202
state.seqByRun.set(event.runId, nextSeq);
172203
const context = state.runContextById.get(event.runId);
204+
if (context) {
205+
context.lastActiveAt = Date.now();
206+
}
173207
const isControlUiVisible = context?.isControlUiVisible ?? true;
174208
const eventSessionKey =
175209
typeof event.sessionKey === "string" && event.sessionKey.trim() ? event.sessionKey : undefined;

0 commit comments

Comments
 (0)