Skip to content

Commit 630df26

Browse files
committed
fix(cron): mirror active-jobs mark/clear on startup catchup and manual run
Upstream 7d1575b (#60310) introduced the activeJobIds singleton plus markCronJobActive/clearCronJobActive so that task-registry maintenance has a backing-session signal for runtime='cron' tasks (task-registry.maintenance.ts:124-128). That patch wired the pair into runDueJob (timer.ts:746/586) and executeJob (timer.ts:1344/1374) but left the remaining two execution paths uninstrumented:

* runStartupCatchupCandidate (timer.ts:1043-1081)
* prepareManualRun / finishPreparedManualRun (ops.ts:548-686)

For runs taken on those paths, the cron branch of hasBackingSession sees isCronJobActive=false and, once TASK_RECONCILE_GRACE_MS (5 min, task-registry.maintenance.ts:28) elapses, marks the task 'lost' while the cron service is still executing it. With DEFAULT_JOB_TIMEOUT_MS=10 min (cron/service/timeout-policy.ts:8) and no recordTaskRunProgressByRunId emissions on isolated agentTurn runs, lastEventAt stays pinned to startedAt, so the grace period is exceeded in practice.

This PR mirrors the existing mark/clear contract on the two missing paths, completing #60310's intent: the job is marked active when the run starts (in runStartupCatchupCandidate just before the try, and in prepareManualRun for manual runs) and is cleared in a finally block (in runStartupCatchupCandidate and finishPreparedManualRun respectively), so it is cleared even when the inner execution throws. No behavioural change to runDueJob / executeJob.

Related: #68157 (this PR partially addresses the task-registry misclassification aspect; the runningAtMs persistence aspect described in that issue is a separate state machine not touched by this PR).

Architecture note: PR #69313 introduced the tryRecoverTaskBeforeMarkLost hook infrastructure, but cron does not register it, and registering the hook alone would not close this gap (the recover callback would still need an alive-signal source — i.e. activeJobIds). This PR completes the existing contract; registering the hook for cross-runtime parity is a natural follow-up if maintainers prefer that direction.

[AI-assisted]
1 parent 6c1cffa commit 630df26

3 files changed

Lines changed: 219 additions & 73 deletions

File tree

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import { beforeEach, describe, expect, it } from "vitest";
2+
import { isCronJobActive, resetCronActiveJobsForTests } from "./active-jobs.js";
3+
import { CronService } from "./service.js";
4+
import {
5+
createDeferred,
6+
setupCronServiceSuite,
7+
writeCronStoreSnapshot,
8+
} from "./service.test-harness.js";
9+
import type { CronJob } from "./types.js";
10+
11+
const { logger, makeStorePath } = setupCronServiceSuite({
12+
prefix: "openclaw-cron-active-jobs-symmetry-",
13+
baseTimeIso: "2025-12-13T17:00:00.000Z",
14+
});
15+
16+
type IsolatedRunResult = Awaited<
17+
ReturnType<NonNullable<ConstructorParameters<typeof CronService>[0]["runIsolatedAgentJob"]>>
18+
>;
19+
20+
describe("cron activeJobIds — mark/clear symmetry across execution paths", () => {
21+
beforeEach(() => {
22+
resetCronActiveJobsForTests();
23+
});
24+
25+
it("startup catchup marks the job active during execution and clears it on completion (#68157)", async () => {
26+
const store = await makeStorePath();
27+
const now = Date.parse("2025-12-13T17:00:00.000Z");
28+
const overdueAt = now - 60_000;
29+
30+
const jobs: CronJob[] = [
31+
{
32+
id: "catchup-isolated",
33+
name: "catchup isolated",
34+
enabled: true,
35+
createdAtMs: overdueAt - 3_600_000,
36+
updatedAtMs: overdueAt,
37+
schedule: { kind: "cron", expr: "* * * * *", tz: "UTC" },
38+
sessionTarget: "isolated",
39+
wakeMode: "next-heartbeat",
40+
payload: { kind: "agentTurn", message: "hi" },
41+
delivery: { mode: "none" },
42+
state: {
43+
nextRunAtMs: overdueAt,
44+
},
45+
},
46+
];
47+
48+
await writeCronStoreSnapshot({ storePath: store.storePath, jobs });
49+
50+
const entered = createDeferred<void>();
51+
const release = createDeferred<IsolatedRunResult>();
52+
const cron = new CronService({
53+
storePath: store.storePath,
54+
cronEnabled: true,
55+
log: logger,
56+
enqueueSystemEvent: () => {},
57+
requestHeartbeatNow: () => {},
58+
runIsolatedAgentJob: async () => {
59+
entered.resolve();
60+
return await release.promise;
61+
},
62+
});
63+
64+
try {
65+
const startPromise = cron.start();
66+
67+
await entered.promise;
68+
69+
expect(isCronJobActive("catchup-isolated")).toBe(true);
70+
71+
release.resolve({ status: "ok", summary: "ok" });
72+
await startPromise;
73+
74+
expect(isCronJobActive("catchup-isolated")).toBe(false);
75+
} finally {
76+
cron.stop();
77+
await store.cleanup();
78+
}
79+
});
80+
81+
it("manual run marks the job active during execution and clears it even when the inner throws (#68157)", async () => {
82+
const store = await makeStorePath();
83+
const now = Date.parse("2025-12-13T17:00:00.000Z");
84+
const futureNext = now + 3_600_000;
85+
86+
const jobs: CronJob[] = [
87+
{
88+
id: "manual-isolated",
89+
name: "manual isolated",
90+
enabled: true,
91+
createdAtMs: now - 3_600_000,
92+
updatedAtMs: now,
93+
schedule: { kind: "cron", expr: "0 18 * * *", tz: "UTC" },
94+
sessionTarget: "isolated",
95+
wakeMode: "next-heartbeat",
96+
payload: { kind: "agentTurn", message: "hi" },
97+
delivery: { mode: "none" },
98+
state: {
99+
nextRunAtMs: futureNext,
100+
},
101+
},
102+
];
103+
104+
await writeCronStoreSnapshot({ storePath: store.storePath, jobs });
105+
106+
const entered = createDeferred<void>();
107+
const release = createDeferred<IsolatedRunResult>();
108+
const cron = new CronService({
109+
storePath: store.storePath,
110+
cronEnabled: true,
111+
log: logger,
112+
enqueueSystemEvent: () => {},
113+
requestHeartbeatNow: () => {},
114+
runIsolatedAgentJob: async () => {
115+
entered.resolve();
116+
return await release.promise;
117+
},
118+
});
119+
120+
try {
121+
await cron.start();
122+
123+
const runPromise = cron.run("manual-isolated", "force");
124+
await entered.promise;
125+
126+
expect(isCronJobActive("manual-isolated")).toBe(true);
127+
128+
release.reject(new Error("synthetic inner failure"));
129+
await runPromise;
130+
131+
expect(isCronJobActive("manual-isolated")).toBe(false);
132+
} finally {
133+
cron.stop();
134+
await store.cleanup();
135+
}
136+
});
137+
});

src/cron/service/ops.ts

Lines changed: 79 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
createRunningTaskRun,
77
failTaskRunByRunId,
88
} from "../../tasks/detached-task-runtime.js";
9+
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
910
import { createCronExecutionId } from "../run-id.js";
1011
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
1112
import {
@@ -643,6 +644,7 @@ async function prepareManualRun(
643644
job,
644645
startedAt: preflight.now,
645646
});
647+
markCronJobActive(job.id);
646648
const executionJob = structuredClone(job);
647649
return {
648650
ok: true,
@@ -665,88 +667,92 @@ async function finishPreparedManualRun(
665667
const jobId = prepared.jobId;
666668
const taskRunId = prepared.taskRunId;
667669

668-
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
669670
try {
670-
coreResult = await executeJobCoreWithTimeout(state, executionJob);
671-
} catch (err) {
672-
coreResult = { status: "error", error: normalizeCronRunErrorText(err) };
673-
}
674-
const endedAt = state.deps.nowMs();
675-
tryFinishManualTaskRun(state, {
676-
taskRunId,
677-
coreResult,
678-
endedAt,
679-
});
680-
681-
await locked(state, async () => {
682-
await ensureLoaded(state, { skipRecompute: true });
683-
const job = state.store?.jobs.find((entry) => entry.id === jobId);
684-
if (!job) {
685-
return;
671+
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
672+
try {
673+
coreResult = await executeJobCoreWithTimeout(state, executionJob);
674+
} catch (err) {
675+
coreResult = { status: "error", error: normalizeCronRunErrorText(err) };
686676
}
677+
const endedAt = state.deps.nowMs();
678+
tryFinishManualTaskRun(state, {
679+
taskRunId,
680+
coreResult,
681+
endedAt,
682+
});
687683

688-
const shouldDelete = applyJobResult(
689-
state,
690-
job,
691-
{
684+
await locked(state, async () => {
685+
await ensureLoaded(state, { skipRecompute: true });
686+
const job = state.store?.jobs.find((entry) => entry.id === jobId);
687+
if (!job) {
688+
return;
689+
}
690+
691+
const shouldDelete = applyJobResult(
692+
state,
693+
job,
694+
{
695+
status: coreResult.status,
696+
error: coreResult.error,
697+
delivered: coreResult.delivered,
698+
startedAt,
699+
endedAt,
700+
},
701+
{ preserveSchedule: mode === "force" },
702+
);
703+
704+
emit(state, {
705+
jobId: job.id,
706+
action: "finished",
692707
status: coreResult.status,
693708
error: coreResult.error,
709+
summary: coreResult.summary,
694710
delivered: coreResult.delivered,
695-
startedAt,
696-
endedAt,
697-
},
698-
{ preserveSchedule: mode === "force" },
699-
);
700-
701-
emit(state, {
702-
jobId: job.id,
703-
action: "finished",
704-
status: coreResult.status,
705-
error: coreResult.error,
706-
summary: coreResult.summary,
707-
delivered: coreResult.delivered,
708-
deliveryStatus: job.state.lastDeliveryStatus,
709-
deliveryError: job.state.lastDeliveryError,
710-
delivery: coreResult.delivery,
711-
sessionId: coreResult.sessionId,
712-
sessionKey: coreResult.sessionKey,
713-
runAtMs: startedAt,
714-
durationMs: job.state.lastDurationMs,
715-
nextRunAtMs: job.state.nextRunAtMs,
716-
model: coreResult.model,
717-
provider: coreResult.provider,
718-
usage: coreResult.usage,
719-
});
711+
deliveryStatus: job.state.lastDeliveryStatus,
712+
deliveryError: job.state.lastDeliveryError,
713+
delivery: coreResult.delivery,
714+
sessionId: coreResult.sessionId,
715+
sessionKey: coreResult.sessionKey,
716+
runAtMs: startedAt,
717+
durationMs: job.state.lastDurationMs,
718+
nextRunAtMs: job.state.nextRunAtMs,
719+
model: coreResult.model,
720+
provider: coreResult.provider,
721+
usage: coreResult.usage,
722+
});
720723

721-
if (shouldDelete && state.store) {
722-
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
723-
emit(state, { jobId: job.id, action: "removed" });
724-
}
724+
if (shouldDelete && state.store) {
725+
state.store.jobs = state.store.jobs.filter((entry) => entry.id !== job.id);
726+
emit(state, { jobId: job.id, action: "removed" });
727+
}
725728

726-
// Manual runs should not advance other due jobs without executing them.
727-
// Use maintenance-only recompute to repair missing values while
728-
// preserving existing past-due nextRunAtMs entries for future timer ticks.
729-
const postRunSnapshot = shouldDelete
730-
? null
731-
: {
732-
enabled: job.enabled,
733-
updatedAtMs: job.updatedAtMs,
734-
state: structuredClone(job.state),
735-
};
736-
const postRunRemoved = shouldDelete;
737-
// Isolated Telegram send can persist target writeback directly to disk.
738-
// Reload before final persist so manual `cron run` keeps those changes.
739-
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
740-
mergeManualRunSnapshotAfterReload({
741-
state,
742-
jobId,
743-
snapshot: postRunSnapshot,
744-
removed: postRunRemoved,
729+
// Manual runs should not advance other due jobs without executing them.
730+
// Use maintenance-only recompute to repair missing values while
731+
// preserving existing past-due nextRunAtMs entries for future timer ticks.
732+
const postRunSnapshot = shouldDelete
733+
? null
734+
: {
735+
enabled: job.enabled,
736+
updatedAtMs: job.updatedAtMs,
737+
state: structuredClone(job.state),
738+
};
739+
const postRunRemoved = shouldDelete;
740+
// Isolated Telegram send can persist target writeback directly to disk.
741+
// Reload before final persist so manual `cron run` keeps those changes.
742+
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
743+
mergeManualRunSnapshotAfterReload({
744+
state,
745+
jobId,
746+
snapshot: postRunSnapshot,
747+
removed: postRunRemoved,
748+
});
749+
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
750+
await persist(state);
751+
armTimer(state);
745752
});
746-
recomputeNextRunsForMaintenance(state, { recomputeExpired: true });
747-
await persist(state);
748-
armTimer(state);
749-
});
753+
} finally {
754+
clearCronJobActive(jobId);
755+
}
750756
}
751757

752758
export async function run(state: CronServiceState, id: string, mode?: "due" | "force") {

src/cron/service/timer.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ async function runStartupCatchupCandidate(
10511051
startedAt,
10521052
});
10531053
emit(state, { jobId: candidate.job.id, action: "started", runAtMs: startedAt });
1054+
markCronJobActive(candidate.job.id);
10541055
try {
10551056
const result = await executeJobCoreWithTimeout(state, candidate.job);
10561057
return {
@@ -1077,6 +1078,8 @@ async function runStartupCatchupCandidate(
10771078
startedAt,
10781079
endedAt: state.deps.nowMs(),
10791080
};
1081+
} finally {
1082+
clearCronJobActive(candidate.job.id);
10801083
}
10811084
}
10821085

0 commit comments

Comments
 (0)