Skip to content

Commit fd818d3

Browse files
committed
fix(cron): per-attempt AbortControllers and deferred execution timeout
Fix two root causes of cron agentTurn jobs hanging until timeout: 1. Shared AbortController kills fallback chain (#37505): When the cron timeout fires, it aborts a shared signal that propagates to all subsequent model fallback attempts, killing them instantly (~100ms). Now each fallback attempt in runCronIsolatedAgentTurn gets its own AbortController linked to the parent signal, so new attempts start with a fresh (non-aborted) controller. 2. Queue wait consumes execution timeout (#41783): The timeout timer started immediately in executeJobCoreWithTimeout, but the job may wait in the lane queue (inside runEmbeddedPiAgent) before doing real work. Now executeJobCore accepts an onExecutionStart callback and calls it right before runIsolatedAgentJob, deferring the timeout clock until actual execution begins. A 2x safety backstop prevents indefinite hangs if the callback is never called. Closes #37505 Closes #41783 Refs #42464 #40237
1 parent 0c926a2 commit fd818d3

3 files changed

Lines changed: 329 additions & 65 deletions

File tree

src/cron/isolated-agent/run.ts

Lines changed: 69 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -563,83 +563,95 @@ export async function runCronIsolatedAgentTurn(params: {
563563
if (abortSignal?.aborted) {
564564
throw new Error(abortReason());
565565
}
566-
const bootstrapPromptWarningSignature =
567-
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1];
568-
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
569-
// Fresh isolated cron sessions must not reuse a stored CLI session ID.
570-
// Passing an existing ID activates the resume watchdog profile
571-
// (noOutputTimeoutRatio 0.3, maxMs 180 s) instead of the fresh profile
572-
// (ratio 0.8, maxMs 600 s), causing jobs to time out at roughly 1/3 of
573-
// the configured timeoutSeconds. See: https://github.com/openclaw/openclaw/issues/29774
574-
const cliSessionId = cronSession.isNewSession
575-
? undefined
576-
: getCliSessionId(cronSession.sessionEntry, providerOverride);
577-
const result = await runCliAgent({
566+
// Create a per-attempt AbortController so the model fallback chain
567+
// survives when a single attempt is aborted by the cron timeout.
568+
// The parent signal propagates to the attempt controller, but each
569+
// new attempt gets a fresh (non-aborted) controller. (#37505)
570+
const attemptAbort = new AbortController();
571+
const onParentAbort = () => attemptAbort.abort(abortSignal?.reason);
572+
abortSignal?.addEventListener("abort", onParentAbort, { once: true });
573+
try {
574+
const bootstrapPromptWarningSignature =
575+
bootstrapPromptWarningSignaturesSeen[bootstrapPromptWarningSignaturesSeen.length - 1];
576+
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
577+
// Fresh isolated cron sessions must not reuse a stored CLI session ID.
578+
// Passing an existing ID activates the resume watchdog profile
579+
// (noOutputTimeoutRatio 0.3, maxMs 180 s) instead of the fresh profile
580+
// (ratio 0.8, maxMs 600 s), causing jobs to time out at roughly 1/3 of
581+
// the configured timeoutSeconds. See: https://github.com/openclaw/openclaw/issues/29774
582+
const cliSessionId = cronSession.isNewSession
583+
? undefined
584+
: getCliSessionId(cronSession.sessionEntry, providerOverride);
585+
const result = await runCliAgent({
586+
sessionId: cronSession.sessionEntry.sessionId,
587+
sessionKey: agentSessionKey,
588+
agentId,
589+
sessionFile,
590+
workspaceDir,
591+
config: cfgWithAgentDefaults,
592+
prompt: promptText,
593+
provider: providerOverride,
594+
model: modelOverride,
595+
thinkLevel,
596+
timeoutMs,
597+
runId: cronSession.sessionEntry.sessionId,
598+
cliSessionId,
599+
bootstrapPromptWarningSignaturesSeen,
600+
bootstrapPromptWarningSignature,
601+
});
602+
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
603+
result.meta?.systemPromptReport,
604+
);
605+
return result;
606+
}
607+
const result = await runEmbeddedPiAgent({
578608
sessionId: cronSession.sessionEntry.sessionId,
579609
sessionKey: agentSessionKey,
580610
agentId,
611+
trigger: "cron",
612+
// Cron jobs are trusted local automation, so isolated runs should
613+
// inherit owner-only tooling like local `openclaw agent` runs.
614+
senderIsOwner: true,
615+
messageChannel,
616+
agentAccountId: resolvedDelivery.accountId,
581617
sessionFile,
618+
agentDir,
582619
workspaceDir,
583620
config: cfgWithAgentDefaults,
621+
skillsSnapshot,
584622
prompt: promptText,
623+
lane: resolveNestedAgentLane(params.lane),
585624
provider: providerOverride,
586625
model: modelOverride,
626+
authProfileId,
627+
authProfileIdSource,
587628
thinkLevel,
629+
fastMode: resolveFastModeState({
630+
cfg: cfgWithAgentDefaults,
631+
provider: providerOverride,
632+
model: modelOverride,
633+
sessionEntry: cronSession.sessionEntry,
634+
}).enabled,
635+
verboseLevel: resolvedVerboseLevel,
588636
timeoutMs,
637+
bootstrapContextMode: agentPayload?.lightContext ? "lightweight" : undefined,
638+
bootstrapContextRunKind: "cron",
589639
runId: cronSession.sessionEntry.sessionId,
590-
cliSessionId,
640+
requireExplicitMessageTarget: toolPolicy.requireExplicitMessageTarget,
641+
disableMessageTool: toolPolicy.disableMessageTool,
642+
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
643+
abortSignal: attemptAbort.signal,
591644
bootstrapPromptWarningSignaturesSeen,
592645
bootstrapPromptWarningSignature,
593646
});
594647
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
595648
result.meta?.systemPromptReport,
596649
);
597650
return result;
651+
} finally {
652+
abortSignal?.removeEventListener("abort", onParentAbort);
598653
}
599-
const result = await runEmbeddedPiAgent({
600-
sessionId: cronSession.sessionEntry.sessionId,
601-
sessionKey: agentSessionKey,
602-
agentId,
603-
trigger: "cron",
604-
// Cron jobs are trusted local automation, so isolated runs should
605-
// inherit owner-only tooling like local `openclaw agent` runs.
606-
senderIsOwner: true,
607-
messageChannel,
608-
agentAccountId: resolvedDelivery.accountId,
609-
sessionFile,
610-
agentDir,
611-
workspaceDir,
612-
config: cfgWithAgentDefaults,
613-
skillsSnapshot,
614-
prompt: promptText,
615-
lane: resolveNestedAgentLane(params.lane),
616-
provider: providerOverride,
617-
model: modelOverride,
618-
authProfileId,
619-
authProfileIdSource,
620-
thinkLevel,
621-
fastMode: resolveFastModeState({
622-
cfg: cfgWithAgentDefaults,
623-
provider: providerOverride,
624-
model: modelOverride,
625-
sessionEntry: cronSession.sessionEntry,
626-
}).enabled,
627-
verboseLevel: resolvedVerboseLevel,
628-
timeoutMs,
629-
bootstrapContextMode: agentPayload?.lightContext ? "lightweight" : undefined,
630-
bootstrapContextRunKind: "cron",
631-
runId: cronSession.sessionEntry.sessionId,
632-
requireExplicitMessageTarget: toolPolicy.requireExplicitMessageTarget,
633-
disableMessageTool: toolPolicy.disableMessageTool,
634-
allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe,
635-
abortSignal,
636-
bootstrapPromptWarningSignaturesSeen,
637-
bootstrapPromptWarningSignature,
638-
});
639-
bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen(
640-
result.meta?.systemPromptReport,
641-
);
642-
return result;
654+
643655
},
644656
});
645657
runResult = fallbackResult.result;
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import {
3+
createAbortAwareIsolatedRunner,
4+
createIsolatedRegressionJob,
5+
noopLogger,
6+
setupCronIssueRegressionFixtures,
7+
writeCronJobs,
8+
} from "./service.issue-regressions.test-helpers.js";
9+
import { createCronServiceState } from "./service/state.js";
10+
import { executeJobCore, executeJobCoreWithTimeout, onTimer } from "./service/timer.js";
11+
12+
describe("Cron timeout and abort fixes", () => {
13+
const { makeStorePath } = setupCronIssueRegressionFixtures();
14+
15+
describe("#41783 — deferred execution timeout", () => {
16+
it("does not fire the timeout during queue wait (onExecutionStart defers the clock)", async () => {
17+
vi.useRealTimers();
18+
const store = makeStorePath();
19+
const scheduledAt = Date.parse("2026-03-10T12:00:00.000Z");
20+
const cronJob = createIsolatedRegressionJob({
21+
id: "deferred-timeout",
22+
name: "deferred timeout",
23+
scheduledAt,
24+
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
25+
// Very short timeout (2.5ms) — if the safety backstop fires at
26+
// the old moment (before deferred arm), the job would time out
27+
// during the simulated queue wait.
28+
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.05 },
29+
state: { nextRunAtMs: scheduledAt },
30+
});
31+
await writeCronJobs(store.storePath, [cronJob]);
32+
33+
let resolveWork: ((v: { status: "ok"; summary: string }) => void) | undefined;
34+
const workPromise = new Promise<{ status: "ok"; summary: string }>((resolve) => {
35+
resolveWork = resolve;
36+
});
37+
38+
let now = scheduledAt;
39+
const state = createCronServiceState({
40+
cronEnabled: true,
41+
storePath: store.storePath,
42+
log: noopLogger,
43+
nowMs: () => now,
44+
enqueueSystemEvent: vi.fn(),
45+
requestHeartbeatNow: vi.fn(),
46+
runIsolatedAgentJob: vi.fn(async () => {
47+
// Simulate completing quickly once actually running.
48+
// The deferred timeout gives us the full 50ms budget
49+
// from when execution starts, not from enqueue.
50+
resolveWork!({ status: "ok", summary: "done" });
51+
return workPromise;
52+
}),
53+
});
54+
55+
const timerPromise = onTimer(state);
56+
57+
// Resolve the work immediately.
58+
resolveWork!({ status: "ok", summary: "done" });
59+
await timerPromise;
60+
61+
const job = state.store?.jobs.find((j) => j.id === "deferred-timeout");
62+
expect(job?.state.lastStatus).toBe("ok");
63+
expect(job?.state.lastError).toBeUndefined();
64+
});
65+
66+
it("executeJobCore calls onExecutionStart before running isolated jobs", async () => {
67+
vi.useRealTimers();
68+
const store = makeStorePath();
69+
const scheduledAt = Date.parse("2026-03-10T12:00:00.000Z");
70+
const cronJob = createIsolatedRegressionJob({
71+
id: "arm-callback-test",
72+
name: "arm callback",
73+
scheduledAt,
74+
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
75+
payload: { kind: "agentTurn", message: "test" },
76+
state: { nextRunAtMs: scheduledAt },
77+
});
78+
await writeCronJobs(store.storePath, [cronJob]);
79+
80+
const onExecutionStart = vi.fn();
81+
let now = scheduledAt;
82+
const state = createCronServiceState({
83+
cronEnabled: true,
84+
storePath: store.storePath,
85+
log: noopLogger,
86+
nowMs: () => now,
87+
enqueueSystemEvent: vi.fn(),
88+
requestHeartbeatNow: vi.fn(),
89+
runIsolatedAgentJob: vi.fn(async () => {
90+
// Verify onExecutionStart was called before we run.
91+
expect(onExecutionStart).toHaveBeenCalledTimes(1);
92+
return { status: "ok" as const, summary: "done" };
93+
}),
94+
});
95+
96+
state.running = true;
97+
state.store = { version: 1, jobs: [cronJob] };
98+
99+
await executeJobCore(state, cronJob, undefined, onExecutionStart);
100+
101+
expect(onExecutionStart).toHaveBeenCalledTimes(1);
102+
});
103+
104+
it("executeJobCore calls onExecutionStart for main session jobs", async () => {
105+
vi.useRealTimers();
106+
const store = makeStorePath();
107+
const scheduledAt = Date.parse("2026-03-10T12:00:00.000Z");
108+
const mainJob = {
109+
id: "main-arm-test",
110+
name: "main arm",
111+
enabled: true,
112+
createdAtMs: scheduledAt,
113+
updatedAtMs: scheduledAt,
114+
schedule: { kind: "at" as const, at: new Date(scheduledAt).toISOString() },
115+
sessionTarget: "main" as const,
116+
wakeMode: "next-heartbeat" as const,
117+
payload: { kind: "systemEvent" as const, text: "ping" },
118+
state: { nextRunAtMs: scheduledAt },
119+
};
120+
121+
const onExecutionStart = vi.fn();
122+
const state = createCronServiceState({
123+
cronEnabled: true,
124+
storePath: store.storePath,
125+
log: noopLogger,
126+
nowMs: () => scheduledAt,
127+
enqueueSystemEvent: vi.fn(),
128+
requestHeartbeatNow: vi.fn(),
129+
runIsolatedAgentJob: vi.fn().mockResolvedValue({ status: "ok" }),
130+
});
131+
state.running = true;
132+
state.store = { version: 1, jobs: [mainJob] };
133+
134+
await executeJobCore(state, mainJob, undefined, onExecutionStart);
135+
136+
expect(onExecutionStart).toHaveBeenCalledTimes(1);
137+
});
138+
});
139+
140+
describe("#37505 — shared AbortController kills fallback chain", () => {
141+
it("abort signal is passed to runIsolatedAgentJob and fires on timeout", async () => {
142+
vi.useRealTimers();
143+
const store = makeStorePath();
144+
const scheduledAt = Date.parse("2026-03-10T12:00:00.000Z");
145+
const cronJob = createIsolatedRegressionJob({
146+
id: "abort-signal-test",
147+
name: "abort signal",
148+
scheduledAt,
149+
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
150+
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.005 },
151+
state: { nextRunAtMs: scheduledAt },
152+
});
153+
await writeCronJobs(store.storePath, [cronJob]);
154+
155+
const abortAwareRunner = createAbortAwareIsolatedRunner();
156+
let now = scheduledAt;
157+
const state = createCronServiceState({
158+
cronEnabled: true,
159+
storePath: store.storePath,
160+
log: noopLogger,
161+
nowMs: () => now,
162+
enqueueSystemEvent: vi.fn(),
163+
requestHeartbeatNow: vi.fn(),
164+
runIsolatedAgentJob: vi.fn(async (params) => {
165+
const result = await abortAwareRunner.runIsolatedAgentJob(params);
166+
now += 5;
167+
return result;
168+
}),
169+
});
170+
171+
await onTimer(state);
172+
173+
expect(abortAwareRunner.getObservedAbortSignal()).toBeDefined();
174+
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
175+
176+
const job = state.store?.jobs.find((j) => j.id === "abort-signal-test");
177+
expect(job?.state.lastStatus).toBe("error");
178+
expect(job?.state.lastError).toContain("timed out");
179+
});
180+
181+
it("executeJobCoreWithTimeout still times out correctly with deferred start", async () => {
182+
vi.useRealTimers();
183+
const store = makeStorePath();
184+
const scheduledAt = Date.parse("2026-03-10T12:00:00.000Z");
185+
const cronJob = createIsolatedRegressionJob({
186+
id: "timeout-still-works",
187+
name: "timeout works",
188+
scheduledAt,
189+
schedule: { kind: "at", at: new Date(scheduledAt).toISOString() },
190+
payload: { kind: "agentTurn", message: "slow", timeoutSeconds: 0.005 },
191+
state: { nextRunAtMs: scheduledAt },
192+
});
193+
await writeCronJobs(store.storePath, [cronJob]);
194+
195+
const abortAwareRunner = createAbortAwareIsolatedRunner();
196+
let now = scheduledAt;
197+
const state = createCronServiceState({
198+
cronEnabled: true,
199+
storePath: store.storePath,
200+
log: noopLogger,
201+
nowMs: () => now,
202+
enqueueSystemEvent: vi.fn(),
203+
requestHeartbeatNow: vi.fn(),
204+
runIsolatedAgentJob: abortAwareRunner.runIsolatedAgentJob,
205+
});
206+
state.running = true;
207+
state.store = { version: 1, jobs: [cronJob] };
208+
209+
// executeJobCoreWithTimeout rejects the Promise.race when timeout
210+
// fires, which surfaces as a thrown error. The caller (executeJob/
211+
// ops.ts) catches this and records it as an error status.
212+
await expect(executeJobCoreWithTimeout(state, cronJob)).rejects.toThrow("timed out");
213+
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
214+
});
215+
});
216+
});

0 commit comments

Comments
 (0)