Skip to content

Commit c37c43b

Browse files
committed
fix(gateway): bound startup sidecar fanout
1 parent 9d24fde commit c37c43b

7 files changed

Lines changed: 416 additions & 95 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
3636

3737
### Fixes
3838

39+
- Gateway/startup: bound the fanout of ACP identity reconciliation and stale session-lock cleanup so that large persisted session sets no longer monopolize the event loop during sidecar startup. Fixes #85366. Thanks @NianJiuZst.
3940
- Gateway: defer provider auth-state prewarm until after startup readiness so early gateway tool/session requests are not blocked by provider auth discovery. (#85272) Thanks @dutifulbob.
4041
- Agents/Codex: show the first plan update as a transient chat status notice without counting it as final assistant content.
4142
- Gateway/LaunchAgent: wait for launchd reload bootout to finish and fall back to kickstart when bootstrap races, so reload handoff does not leave the service deregistered. Fixes #84630. (#84641) Thanks @NianJiuZst.

src/acp/control-plane/manager.core.ts

Lines changed: 70 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
} from "../../tasks/detached-task-runtime.js";
1515
import { resolveRequiredCompletionTerminalResult } from "../../tasks/task-completion-contract.js";
1616
import type { DeliveryContext } from "../../utils/delivery-context.js";
17+
import { runTasksWithConcurrency } from "../../utils/run-with-concurrency.js";
1718
import {
1819
AcpRuntimeError,
1920
formatAcpErrorChain,
@@ -93,6 +94,11 @@ const ACP_TURN_TIMEOUT_CLEANUP_GRACE_MS = 2_000;
9394
const ACP_TURN_TIMEOUT_REASON = "turn-timeout";
9495
const ACP_BACKGROUND_TASK_TEXT_MAX_LENGTH = 160;
9596
const ACP_BACKGROUND_TASK_PROGRESS_MAX_LENGTH = 240;
97+
const ACP_STARTUP_IDENTITY_RECONCILE_CONCURRENCY = 4;
98+
99+
async function yieldToEventLoop(): Promise<void> {
100+
await new Promise<void>((resolve) => setImmediate(resolve));
101+
}
96102

97103
function summarizeBackgroundTaskText(text: string): string {
98104
const normalized = normalizeText(text) ?? "ACP background task";
@@ -242,69 +248,83 @@ export class AcpSessionManager {
242248
async reconcilePendingSessionIdentities(params: {
243249
cfg: OpenClawConfig;
244250
}): Promise<AcpStartupIdentityReconcileResult> {
245-
let checked = 0;
246-
let resolved = 0;
247-
let failed = 0;
248-
249251
let acpSessions: Awaited<ReturnType<AcpSessionManagerDeps["listAcpSessions"]>>;
250252
try {
251253
acpSessions = await this.deps.listAcpSessions({
252254
cfg: params.cfg,
253255
});
254256
} catch (error) {
255257
logVerbose(`acp-manager: startup identity scan failed: ${String(error)}`);
256-
return { checked, resolved, failed: failed + 1 };
258+
return { checked: 0, resolved: 0, failed: 1 };
257259
}
258260

259-
for (const session of acpSessions) {
260-
if (!session.acp || !session.sessionKey) {
261-
continue;
262-
}
263-
const currentIdentity = resolveSessionIdentityFromMeta(session.acp);
264-
if (
265-
!isSessionIdentityPending(currentIdentity) ||
266-
!identityHasStableSessionId(currentIdentity)
267-
) {
268-
continue;
269-
}
261+
const startupTasks = acpSessions.map(
262+
(session) => async (): Promise<AcpStartupIdentityReconcileResult> => {
263+
if (!session.acp || !session.sessionKey) {
264+
return { checked: 0, resolved: 0, failed: 0 };
265+
}
266+
const currentIdentity = resolveSessionIdentityFromMeta(session.acp);
267+
if (
268+
!isSessionIdentityPending(currentIdentity) ||
269+
!identityHasStableSessionId(currentIdentity)
270+
) {
271+
return { checked: 0, resolved: 0, failed: 0 };
272+
}
270273

271-
checked += 1;
272-
try {
273-
const becameResolved = await this.withSessionActor(session.sessionKey, async () => {
274-
const resolution = this.resolveSession({
275-
cfg: params.cfg,
276-
sessionKey: session.sessionKey,
277-
});
278-
if (resolution.kind !== "ready") {
279-
return false;
280-
}
281-
const { runtime, handle, meta } = await this.ensureRuntimeHandle({
282-
cfg: params.cfg,
283-
sessionKey: session.sessionKey,
284-
meta: resolution.meta,
285-
});
286-
const reconciled = await this.reconcileRuntimeSessionIdentifiers({
287-
cfg: params.cfg,
288-
sessionKey: session.sessionKey,
289-
runtime,
290-
handle,
291-
meta,
292-
failOnStatusError: false,
274+
// Let the gateway service other work between startup sweep items.
275+
await yieldToEventLoop();
276+
277+
try {
278+
const becameResolved = await this.withSessionActor(session.sessionKey, async () => {
279+
const resolution = this.resolveSession({
280+
cfg: params.cfg,
281+
sessionKey: session.sessionKey,
282+
});
283+
if (resolution.kind !== "ready") {
284+
return false;
285+
}
286+
const { runtime, handle, meta } = await this.ensureRuntimeHandle({
287+
cfg: params.cfg,
288+
sessionKey: session.sessionKey,
289+
meta: resolution.meta,
290+
});
291+
const reconciled = await this.reconcileRuntimeSessionIdentifiers({
292+
cfg: params.cfg,
293+
sessionKey: session.sessionKey,
294+
runtime,
295+
handle,
296+
meta,
297+
failOnStatusError: false,
298+
});
299+
return !isSessionIdentityPending(resolveSessionIdentityFromMeta(reconciled.meta));
293300
});
294-
return !isSessionIdentityPending(resolveSessionIdentityFromMeta(reconciled.meta));
295-
});
296-
if (becameResolved) {
297-
resolved += 1;
301+
return {
302+
checked: 1,
303+
resolved: becameResolved ? 1 : 0,
304+
failed: 0,
305+
};
306+
} catch (error) {
307+
logVerbose(
308+
`acp-manager: startup identity reconcile failed for ${session.sessionKey}: ${String(error)}`,
309+
);
310+
return { checked: 1, resolved: 0, failed: 1 };
298311
}
299-
} catch (error) {
300-
failed += 1;
301-
logVerbose(
302-
`acp-manager: startup identity reconcile failed for ${session.sessionKey}: ${String(error)}`,
303-
);
304-
}
305-
}
312+
},
313+
);
306314

307-
return { checked, resolved, failed };
315+
const startup = await runTasksWithConcurrency({
316+
tasks: startupTasks,
317+
limit: ACP_STARTUP_IDENTITY_RECONCILE_CONCURRENCY,
318+
});
319+
320+
return startup.results.reduce<AcpStartupIdentityReconcileResult>(
321+
(totals, current) => ({
322+
checked: totals.checked + (current?.checked ?? 0),
323+
resolved: totals.resolved + (current?.resolved ?? 0),
324+
failed: totals.failed + (current?.failed ?? 0),
325+
}),
326+
{ checked: 0, resolved: 0, failed: 0 },
327+
);
308328
}
309329

310330
async initializeSession(input: AcpInitializeSessionInput): Promise<{

src/acp/control-plane/manager.test.ts

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3916,6 +3916,110 @@ describe("AcpSessionManager", () => {
39163916
expect(currentMeta.identity?.agentSessionId).toBe("agent-session-1");
39173917
});
39183918

3919+
it("bounds startup identity reconciliation fanout while yielding to other work", async () => {
3920+
const runtimeState = createRuntime();
3921+
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
3922+
id: "acpx",
3923+
runtime: runtimeState.runtime,
3924+
});
3925+
3926+
const sessionKeys = Array.from(
3927+
{ length: 9 },
3928+
(_value, index) => `agent:codex:acp:session-${index + 1}`,
3929+
);
3930+
const metaBySessionKey = new Map<string, SessionAcpMeta>(
3931+
sessionKeys.map((sessionKey) => [
3932+
sessionKey,
3933+
{
3934+
...readySessionMeta(),
3935+
runtimeSessionName: `runtime:${sessionKey}`,
3936+
identity: {
3937+
state: "pending",
3938+
source: "ensure",
3939+
acpxSessionId: `acpx:${sessionKey}`,
3940+
lastUpdatedAt: Date.now(),
3941+
},
3942+
},
3943+
]),
3944+
);
3945+
hoisted.listAcpSessionEntriesMock.mockResolvedValue(
3946+
sessionKeys.map((sessionKey) => ({
3947+
cfg: baseCfg,
3948+
storePath: "/tmp/sessions-acp.json",
3949+
sessionKey,
3950+
storeSessionKey: sessionKey,
3951+
entry: {
3952+
sessionId: sessionKey,
3953+
updatedAt: Date.now(),
3954+
acp: metaBySessionKey.get(sessionKey),
3955+
},
3956+
acp: metaBySessionKey.get(sessionKey),
3957+
})),
3958+
);
3959+
hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => {
3960+
const sessionKey = (paramsUnknown as { sessionKey?: string }).sessionKey ?? "";
3961+
return {
3962+
sessionKey,
3963+
storeSessionKey: sessionKey,
3964+
acp: metaBySessionKey.get(sessionKey),
3965+
};
3966+
});
3967+
hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => {
3968+
const params = paramsUnknown as {
3969+
sessionKey: string;
3970+
mutate: (
3971+
current: SessionAcpMeta | undefined,
3972+
entry: { acp?: SessionAcpMeta } | undefined,
3973+
) => SessionAcpMeta | null | undefined;
3974+
};
3975+
const current = metaBySessionKey.get(params.sessionKey);
3976+
const next = params.mutate(current, current ? { acp: current } : undefined);
3977+
if (next) {
3978+
metaBySessionKey.set(params.sessionKey, next);
3979+
}
3980+
return {
3981+
sessionId: params.sessionKey,
3982+
updatedAt: Date.now(),
3983+
acp: next,
3984+
};
3985+
});
3986+
3987+
let inFlight = 0;
3988+
let maxInFlight = 0;
3989+
runtimeState.getStatus.mockImplementation(async () => {
3990+
inFlight += 1;
3991+
maxInFlight = Math.max(maxInFlight, inFlight);
3992+
try {
3993+
await sleep(5);
3994+
return {
3995+
summary: "status=alive",
3996+
acpxSessionId: "resolved-acpx-session",
3997+
agentSessionId: "resolved-agent-session",
3998+
details: { status: "alive" },
3999+
};
4000+
} finally {
4001+
inFlight -= 1;
4002+
}
4003+
});
4004+
4005+
let immediateFired = false;
4006+
const immediate = new Promise<void>((resolve) => {
4007+
setImmediate(() => {
4008+
immediateFired = true;
4009+
resolve();
4010+
});
4011+
});
4012+
4013+
const manager = new AcpSessionManager();
4014+
const result = await manager.reconcilePendingSessionIdentities({ cfg: baseCfg });
4015+
await immediate;
4016+
4017+
expect(result).toEqual({ checked: 9, resolved: 9, failed: 0 });
4018+
expect(immediateFired).toBe(true);
4019+
expect(maxInFlight).toBeGreaterThan(1);
4020+
expect(maxInFlight).toBeLessThanOrEqual(4);
4021+
});
4022+
39194023
it("skips startup reconcile for pending identities without stable runtime ids", async () => {
39204024
const runtimeState = createRuntime();
39214025
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({

src/agents/session-write-lock.test.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,73 @@ describe("acquireSessionWriteLock", () => {
552552
}
553553
});
554554

555+
it("bounds stale lock cleanup fanout while yielding to other work", async () => {
556+
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-fanout-"));
557+
const sessionsDir = path.join(root, "sessions");
558+
await fs.mkdir(sessionsDir, { recursive: true });
559+
560+
const nowMs = Date.now();
561+
const lockNames = Array.from(
562+
{ length: 9 },
563+
(_value, index) => `${String(index).padStart(2, "0")}.jsonl.lock`,
564+
);
565+
566+
try {
567+
await Promise.all(
568+
lockNames.map((lockName) =>
569+
fs.writeFile(
570+
path.join(sessionsDir, lockName),
571+
JSON.stringify({
572+
pid: process.pid,
573+
createdAt: new Date(nowMs).toISOString(),
574+
}),
575+
"utf8",
576+
),
577+
),
578+
);
579+
580+
const readFileOriginal = fs.readFile.bind(fs);
581+
const readFileSpy = vi.spyOn(fs, "readFile");
582+
try {
583+
let inFlight = 0;
584+
let maxInFlight = 0;
585+
readFileSpy.mockImplementation(async (...args: Parameters<typeof fs.readFile>) => {
586+
const filePath = args[0];
587+
if (typeof filePath !== "string" || !filePath.endsWith(".jsonl.lock")) {
588+
return await readFileOriginal(...args);
589+
}
590+
inFlight += 1;
591+
maxInFlight = Math.max(maxInFlight, inFlight);
592+
try {
593+
await new Promise<void>((resolve) => setTimeout(resolve, 5));
594+
return await readFileOriginal(...args);
595+
} finally {
596+
inFlight -= 1;
597+
}
598+
});
599+
const resultPromise = cleanStaleLockFiles({
600+
sessionsDir,
601+
staleMs: 30_000,
602+
nowMs,
603+
removeStale: true,
604+
readOwnerProcessArgs: () => ["python", "worker.py"],
605+
});
606+
607+
await new Promise<void>((resolve) => setImmediate(resolve));
608+
const result = await resultPromise;
609+
610+
expect(result.cleaned).toHaveLength(lockNames.length);
611+
expect(result.locks.map((lock) => path.basename(lock.lockPath))).toEqual(lockNames);
612+
expect(maxInFlight).toBeGreaterThan(1);
613+
expect(maxInFlight).toBeLessThanOrEqual(testing.SESSION_LOCK_CLEANUP_CONCURRENCY);
614+
} finally {
615+
readFileSpy.mockRestore();
616+
}
617+
} finally {
618+
await fs.rm(root, { recursive: true, force: true });
619+
}
620+
});
621+
555622
it("cleans fresh live .jsonl lock files owned by a non-OpenClaw process", async () => {
556623
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-lock-"));
557624
const sessionsDir = path.join(root, "sessions");

0 commit comments

Comments
 (0)