Skip to content

Commit 313e2f2

Browse files
pycckuu and tyler6204 authored
fix(cron): prevent recomputeNextRuns from skipping due jobs in onTimer (#9823)
* fix(cron): prevent recomputeNextRuns from skipping due jobs in onTimer

  ensureLoaded(forceReload) called recomputeNextRuns before runDueJobs, which recalculated nextRunAtMs to a strictly future time. Since setTimeout always fires a few ms late, the due check (now >= nextRunAtMs) always failed and every/cron jobs never executed. Fixes #9788.

* docs: add changelog entry for cron timer race fix (#9823) (thanks @pycckuu)

---------

Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
1 parent 68393bf commit 313e2f2

File tree

4 files changed: 151 additions and 5 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@ Docs: https://docs.openclaw.ai
5454
- Voice call: add regression coverage for anonymous inbound caller IDs with allowlist policy. (#8104) Thanks @victormier.
5555
- Cron: accept epoch timestamps and 0ms durations in CLI `--at` parsing.
5656
- Cron: reload store data when the store file is recreated or mtime changes.
57+
- Cron: prevent `recomputeNextRuns` from skipping due jobs when timer fires late by reordering `onTimer` flow. (#9823, fixes #9788) Thanks @pycckuu.
5758
- Cron: deliver announce runs directly, honor delivery mode, and respect wakeMode for summaries. (#8540) Thanks @tyler6204.
5859
- Cron: correct announce delivery inference for thread session keys and null delivery inputs. (#9733) Thanks @tyler6204.
5960
- Telegram: include forward_from_chat metadata in forwarded messages and harden cron delivery target checks. (#8392) Thanks @Glucksberg.
Lines changed: 127 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,127 @@
1+
import fs from "node:fs/promises";
2+
import os from "node:os";
3+
import path from "node:path";
4+
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
5+
import { CronService } from "./service.js";
6+
7+
const noopLogger = {
8+
debug: vi.fn(),
9+
info: vi.fn(),
10+
warn: vi.fn(),
11+
error: vi.fn(),
12+
};
13+
14+
async function makeStorePath() {
15+
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-cron-"));
16+
return {
17+
storePath: path.join(dir, "cron", "jobs.json"),
18+
cleanup: async () => {
19+
await fs.rm(dir, { recursive: true, force: true });
20+
},
21+
};
22+
}
23+
24+
describe("CronService interval/cron jobs fire on time", () => {
25+
beforeEach(() => {
26+
vi.useFakeTimers();
27+
vi.setSystemTime(new Date("2025-12-13T00:00:00.000Z"));
28+
noopLogger.debug.mockClear();
29+
noopLogger.info.mockClear();
30+
noopLogger.warn.mockClear();
31+
noopLogger.error.mockClear();
32+
});
33+
34+
afterEach(() => {
35+
vi.useRealTimers();
36+
});
37+
38+
it("fires an every-type main job when the timer fires a few ms late", async () => {
39+
const store = await makeStorePath();
40+
const enqueueSystemEvent = vi.fn();
41+
const requestHeartbeatNow = vi.fn();
42+
43+
const cron = new CronService({
44+
storePath: store.storePath,
45+
cronEnabled: true,
46+
log: noopLogger,
47+
enqueueSystemEvent,
48+
requestHeartbeatNow,
49+
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
50+
});
51+
52+
await cron.start();
53+
const job = await cron.add({
54+
name: "every 10s check",
55+
enabled: true,
56+
schedule: { kind: "every", everyMs: 10_000 },
57+
sessionTarget: "main",
58+
wakeMode: "next-heartbeat",
59+
payload: { kind: "systemEvent", text: "tick" },
60+
});
61+
62+
const firstDueAt = job.state.nextRunAtMs!;
63+
expect(firstDueAt).toBe(Date.parse("2025-12-13T00:00:00.000Z") + 10_000);
64+
65+
// Simulate setTimeout firing 5ms late (the race condition).
66+
vi.setSystemTime(new Date(firstDueAt + 5));
67+
await vi.runOnlyPendingTimersAsync();
68+
69+
// Wait for the async onTimer to complete via the lock queue.
70+
const jobs = await cron.list();
71+
const updated = jobs.find((j) => j.id === job.id);
72+
73+
expect(enqueueSystemEvent).toHaveBeenCalledWith("tick", { agentId: undefined });
74+
expect(updated?.state.lastStatus).toBe("ok");
75+
// nextRunAtMs must advance by at least one full interval past the due time.
76+
expect(updated?.state.nextRunAtMs).toBeGreaterThanOrEqual(firstDueAt + 10_000);
77+
78+
cron.stop();
79+
await store.cleanup();
80+
});
81+
82+
it("fires a cron-expression job when the timer fires a few ms late", async () => {
83+
const store = await makeStorePath();
84+
const enqueueSystemEvent = vi.fn();
85+
const requestHeartbeatNow = vi.fn();
86+
87+
// Set time to just before a minute boundary.
88+
vi.setSystemTime(new Date("2025-12-13T00:00:59.000Z"));
89+
90+
const cron = new CronService({
91+
storePath: store.storePath,
92+
cronEnabled: true,
93+
log: noopLogger,
94+
enqueueSystemEvent,
95+
requestHeartbeatNow,
96+
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" })),
97+
});
98+
99+
await cron.start();
100+
const job = await cron.add({
101+
name: "every minute check",
102+
enabled: true,
103+
schedule: { kind: "cron", expr: "* * * * *" },
104+
sessionTarget: "main",
105+
wakeMode: "next-heartbeat",
106+
payload: { kind: "systemEvent", text: "cron-tick" },
107+
});
108+
109+
const firstDueAt = job.state.nextRunAtMs!;
110+
111+
// Simulate setTimeout firing 5ms late.
112+
vi.setSystemTime(new Date(firstDueAt + 5));
113+
await vi.runOnlyPendingTimersAsync();
114+
115+
// Wait for the async onTimer to complete via the lock queue.
116+
const jobs = await cron.list();
117+
const updated = jobs.find((j) => j.id === job.id);
118+
119+
expect(enqueueSystemEvent).toHaveBeenCalledWith("cron-tick", { agentId: undefined });
120+
expect(updated?.state.lastStatus).toBe("ok");
121+
// nextRunAtMs should be the next whole-minute boundary (60s later).
122+
expect(updated?.state.nextRunAtMs).toBe(firstDueAt + 60_000);
123+
124+
cron.stop();
125+
await store.cleanup();
126+
});
127+
});

src/cron/service/store.ts

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -126,7 +126,15 @@ async function getFileMtimeMs(path: string): Promise<number | null> {
126126
}
127127
}
128128

129-
export async function ensureLoaded(state: CronServiceState, opts?: { forceReload?: boolean }) {
129+
export async function ensureLoaded(
130+
state: CronServiceState,
131+
opts?: {
132+
forceReload?: boolean;
133+
/** Skip recomputing nextRunAtMs after load so the caller can run due
134+
* jobs against the persisted values first (see onTimer). */
135+
skipRecompute?: boolean;
136+
},
137+
) {
130138
// Fast path: store is already in memory. Other callers (add, list, run, …)
131139
// trust the in-memory copy to avoid a stat syscall on every operation.
132140
if (state.store && !opts?.forceReload) {
@@ -255,8 +263,9 @@ export async function ensureLoaded(state: CronServiceState, opts?: { forceReload
255263
state.storeLoadedAtMs = state.deps.nowMs();
256264
state.storeFileMtimeMs = fileMtimeMs;
257265

258-
// Recompute next runs after loading to ensure accuracy
259-
recomputeNextRuns(state);
266+
if (!opts?.skipRecompute) {
267+
recomputeNextRuns(state);
268+
}
260269

261270
if (mutated) {
262271
await persist(state);

src/cron/service/timer.ts

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,12 @@
11
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
22
import type { CronJob } from "../types.js";
33
import type { CronEvent, CronServiceState } from "./state.js";
4-
import { computeJobNextRunAtMs, nextWakeAtMs, resolveJobPayloadTextForMain } from "./jobs.js";
4+
import {
5+
computeJobNextRunAtMs,
6+
nextWakeAtMs,
7+
recomputeNextRuns,
8+
resolveJobPayloadTextForMain,
9+
} from "./jobs.js";
510
import { locked } from "./locked.js";
611
import { ensureLoaded, persist } from "./store.js";
712

@@ -36,8 +41,12 @@ export async function onTimer(state: CronServiceState) {
3641
state.running = true;
3742
try {
3843
await locked(state, async () => {
39-
await ensureLoaded(state, { forceReload: true });
44+
// Reload persisted due-times without recomputing so runDueJobs sees
45+
// the original nextRunAtMs values. Recomputing first would advance
46+
// every/cron slots past the current tick when the timer fires late (#9788).
47+
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
4048
await runDueJobs(state);
49+
recomputeNextRuns(state);
4150
await persist(state);
4251
armTimer(state);
4352
});

0 commit comments

Comments
 (0)