Skip to content

Commit 5d90e31

Browse files
committed
refactor(cron): share timed job-execution helper
1 parent dff9ead commit 5d90e31

3 files changed

Lines changed: 77 additions & 103 deletions

File tree

src/cron/service.issue-regressions.test.ts

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,29 @@ function createDefaultIsolatedRunner(): CronServiceOptions["runIsolatedAgentJob"
6767
}) as CronServiceOptions["runIsolatedAgentJob"];
6868
}
6969

70+
function createAbortAwareIsolatedRunner(summary = "late") {
71+
let observedAbortSignal: AbortSignal | undefined;
72+
const runIsolatedAgentJob = vi.fn(async ({ abortSignal }) => {
73+
observedAbortSignal = abortSignal;
74+
await new Promise<void>((resolve) => {
75+
if (!abortSignal) {
76+
return;
77+
}
78+
if (abortSignal.aborted) {
79+
resolve();
80+
return;
81+
}
82+
abortSignal.addEventListener("abort", () => resolve(), { once: true });
83+
});
84+
return { status: "ok" as const, summary };
85+
}) as CronServiceOptions["runIsolatedAgentJob"];
86+
87+
return {
88+
runIsolatedAgentJob,
89+
getObservedAbortSignal: () => observedAbortSignal,
90+
};
91+
}
92+
7093
function createIsolatedRegressionJob(params: {
7194
id: string;
7295
name: string;
@@ -684,35 +707,25 @@ describe("Cron issue regressions", () => {
684707
await writeCronJobs(store.storePath, [cronJob]);
685708

686709
let now = scheduledAt;
687-
let observedAbortSignal: AbortSignal | undefined;
710+
const abortAwareRunner = createAbortAwareIsolatedRunner();
688711
const state = createCronServiceState({
689712
cronEnabled: true,
690713
storePath: store.storePath,
691714
log: noopLogger,
692715
nowMs: () => now,
693716
enqueueSystemEvent: vi.fn(),
694717
requestHeartbeatNow: vi.fn(),
695-
runIsolatedAgentJob: vi.fn(async ({ abortSignal }) => {
696-
observedAbortSignal = abortSignal;
697-
await new Promise<void>((resolve) => {
698-
if (!abortSignal) {
699-
return;
700-
}
701-
if (abortSignal.aborted) {
702-
resolve();
703-
return;
704-
}
705-
abortSignal.addEventListener("abort", () => resolve(), { once: true });
706-
});
718+
runIsolatedAgentJob: vi.fn(async (params) => {
719+
const result = await abortAwareRunner.runIsolatedAgentJob(params);
707720
now += 5;
708-
return { status: "ok" as const, summary: "late" };
721+
return result;
709722
}),
710723
});
711724

712725
await onTimer(state);
713726

714-
expect(observedAbortSignal).toBeDefined();
715-
expect(observedAbortSignal?.aborted).toBe(true);
727+
expect(abortAwareRunner.getObservedAbortSignal()).toBeDefined();
728+
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
716729
const job = state.store?.jobs.find((entry) => entry.id === "abort-on-timeout");
717730
expect(job?.state.lastStatus).toBe("error");
718731
expect(job?.state.lastError).toContain("timed out");
@@ -721,24 +734,11 @@ describe("Cron issue regressions", () => {
721734
it("applies timeoutSeconds to manual cron.run isolated executions", async () => {
722735
vi.useRealTimers();
723736
const store = await makeStorePath();
724-
let observedAbortSignal: AbortSignal | undefined;
737+
const abortAwareRunner = createAbortAwareIsolatedRunner();
725738

726739
const cron = await startCronForStore({
727740
storePath: store.storePath,
728-
runIsolatedAgentJob: vi.fn(async ({ abortSignal }) => {
729-
observedAbortSignal = abortSignal;
730-
await new Promise<void>((resolve) => {
731-
if (!abortSignal) {
732-
return;
733-
}
734-
if (abortSignal.aborted) {
735-
resolve();
736-
return;
737-
}
738-
abortSignal.addEventListener("abort", () => resolve(), { once: true });
739-
});
740-
return { status: "ok" as const, summary: "late" };
741-
}),
741+
runIsolatedAgentJob: abortAwareRunner.runIsolatedAgentJob,
742742
});
743743

744744
const job = await cron.add({
@@ -753,8 +753,8 @@ describe("Cron issue regressions", () => {
753753

754754
const result = await cron.run(job.id, "force");
755755
expect(result).toEqual({ ok: true, ran: true });
756-
expect(observedAbortSignal).toBeDefined();
757-
expect(observedAbortSignal?.aborted).toBe(true);
756+
expect(abortAwareRunner.getObservedAbortSignal()).toBeDefined();
757+
expect(abortAwareRunner.getObservedAbortSignal()?.aborted).toBe(true);
758758

759759
const updated = (await cron.list({ includeDisabled: true })).find(
760760
(entry) => entry.id === job.id,

src/cron/service/ops.ts

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@ import { locked } from "./locked.js";
1313
import type { CronServiceState } from "./state.js";
1414
import { ensureLoaded, persist, warnIfDisabled } from "./store.js";
1515
import {
16-
DEFAULT_JOB_TIMEOUT_MS,
1716
applyJobResult,
1817
armTimer,
1918
emit,
20-
executeJobCore,
19+
executeJobCoreWithTimeout,
2120
runMissedJobs,
2221
stopTimer,
2322
wake,
@@ -248,41 +247,9 @@ export async function run(state: CronServiceState, id: string, mode?: "due" | "f
248247
const startedAt = prepared.startedAt;
249248
const jobId = prepared.jobId;
250249

251-
let coreResult: Awaited<ReturnType<typeof executeJobCore>>;
252-
const configuredTimeoutMs =
253-
executionJob.payload.kind === "agentTurn" &&
254-
typeof executionJob.payload.timeoutSeconds === "number"
255-
? Math.floor(executionJob.payload.timeoutSeconds * 1_000)
256-
: undefined;
257-
const jobTimeoutMs =
258-
configuredTimeoutMs !== undefined
259-
? configuredTimeoutMs <= 0
260-
? undefined
261-
: configuredTimeoutMs
262-
: DEFAULT_JOB_TIMEOUT_MS;
250+
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
263251
try {
264-
const runAbortController = typeof jobTimeoutMs === "number" ? new AbortController() : undefined;
265-
coreResult =
266-
typeof jobTimeoutMs === "number"
267-
? await (async () => {
268-
let timeoutId: NodeJS.Timeout | undefined;
269-
try {
270-
return await Promise.race([
271-
executeJobCore(state, executionJob, runAbortController?.signal),
272-
new Promise<never>((_, reject) => {
273-
timeoutId = setTimeout(() => {
274-
runAbortController?.abort(new Error("cron: job execution timed out"));
275-
reject(new Error("cron: job execution timed out"));
276-
}, jobTimeoutMs);
277-
}),
278-
]);
279-
} finally {
280-
if (timeoutId) {
281-
clearTimeout(timeoutId);
282-
}
283-
}
284-
})()
285-
: await executeJobCore(state, executionJob);
252+
coreResult = await executeJobCoreWithTimeout(state, executionJob);
286253
} catch (err) {
287254
coreResult = { status: "error", error: String(err) };
288255
}

src/cron/service/timer.ts

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,45 @@ type TimedCronRunOutcome = CronRunOutcome &
4545
endedAt: number;
4646
};
4747

48+
function resolveCronJobTimeoutMs(job: CronJob): number | undefined {
49+
const configuredTimeoutMs =
50+
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
51+
? Math.floor(job.payload.timeoutSeconds * 1_000)
52+
: undefined;
53+
if (configuredTimeoutMs === undefined) {
54+
return DEFAULT_JOB_TIMEOUT_MS;
55+
}
56+
return configuredTimeoutMs <= 0 ? undefined : configuredTimeoutMs;
57+
}
58+
59+
export async function executeJobCoreWithTimeout(
60+
state: CronServiceState,
61+
job: CronJob,
62+
): Promise<Awaited<ReturnType<typeof executeJobCore>>> {
63+
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
64+
if (typeof jobTimeoutMs !== "number") {
65+
return await executeJobCore(state, job);
66+
}
67+
68+
const runAbortController = new AbortController();
69+
let timeoutId: NodeJS.Timeout | undefined;
70+
try {
71+
return await Promise.race([
72+
executeJobCore(state, job, runAbortController.signal),
73+
new Promise<never>((_, reject) => {
74+
timeoutId = setTimeout(() => {
75+
runAbortController.abort(new Error("cron: job execution timed out"));
76+
reject(new Error("cron: job execution timed out"));
77+
}, jobTimeoutMs);
78+
}),
79+
]);
80+
} finally {
81+
if (timeoutId) {
82+
clearTimeout(timeoutId);
83+
}
84+
}
85+
}
86+
4887
function resolveRunConcurrency(state: CronServiceState): number {
4988
const raw = state.deps.cronConfig?.maxConcurrentRuns;
5089
if (typeof raw !== "number" || !Number.isFinite(raw)) {
@@ -309,42 +348,10 @@ export async function onTimer(state: CronServiceState) {
309348
const startedAt = state.deps.nowMs();
310349
job.state.runningAtMs = startedAt;
311350
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
312-
313-
const configuredTimeoutMs =
314-
job.payload.kind === "agentTurn" && typeof job.payload.timeoutSeconds === "number"
315-
? Math.floor(job.payload.timeoutSeconds * 1_000)
316-
: undefined;
317-
const jobTimeoutMs =
318-
configuredTimeoutMs !== undefined
319-
? configuredTimeoutMs <= 0
320-
? undefined
321-
: configuredTimeoutMs
322-
: DEFAULT_JOB_TIMEOUT_MS;
351+
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
323352

324353
try {
325-
const runAbortController =
326-
typeof jobTimeoutMs === "number" ? new AbortController() : undefined;
327-
const result =
328-
typeof jobTimeoutMs === "number"
329-
? await (async () => {
330-
let timeoutId: NodeJS.Timeout | undefined;
331-
try {
332-
return await Promise.race([
333-
executeJobCore(state, job, runAbortController?.signal),
334-
new Promise<never>((_, reject) => {
335-
timeoutId = setTimeout(() => {
336-
runAbortController?.abort(new Error("cron: job execution timed out"));
337-
reject(new Error("cron: job execution timed out"));
338-
}, jobTimeoutMs);
339-
}),
340-
]);
341-
} finally {
342-
if (timeoutId) {
343-
clearTimeout(timeoutId);
344-
}
345-
}
346-
})()
347-
: await executeJobCore(state, job);
354+
const result = await executeJobCoreWithTimeout(state, job);
348355
return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() };
349356
} catch (err) {
350357
state.deps.log.warn(

0 commit comments

Comments
 (0)