Skip to content

Commit 1c90538

Browse files
authored
refactor(cron): split main and detached dispatch (#57482)
* refactor(tasks): add executor facade * refactor(tasks): extract delivery policy * refactor(tasks): route acp through executor * refactor(tasks): route subagents through executor * refactor(cron): split main and detached dispatch
1 parent 4be290c commit 1c90538

4 files changed

Lines changed: 166 additions & 124 deletions

File tree

src/cron/service/ops.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import fs from "node:fs/promises";
22
import path from "node:path";
33
import { describe, expect, it, vi } from "vitest";
4-
import * as taskRegistry from "../../tasks/task-registry.js";
4+
import * as taskExecutor from "../../tasks/task-executor.js";
55
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
66
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../service.test-harness.js";
77
import type { CronJob } from "../types.js";
@@ -166,7 +166,7 @@ describe("cron service ops seam coverage", () => {
166166
});
167167

168168
const createTaskRecordSpy = vi
169-
.spyOn(taskRegistry, "createTaskRecord")
169+
.spyOn(taskExecutor, "createRunningTaskRun")
170170
.mockImplementation(() => {
171171
throw new Error("disk full");
172172
});
@@ -210,7 +210,7 @@ describe("cron service ops seam coverage", () => {
210210
});
211211

212212
const updateTaskRecordSpy = vi
213-
.spyOn(taskRegistry, "markTaskTerminalById")
213+
.spyOn(taskExecutor, "completeTaskRunByRunId")
214214
.mockImplementation(() => {
215215
throw new Error("disk full");
216216
});

src/cron/service/ops.ts

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { enqueueCommandInLane } from "../../process/command-queue.js";
22
import { CommandLane } from "../../process/lanes.js";
3-
import { createTaskRecord, markTaskTerminalById } from "../../tasks/task-registry.js";
3+
import {
4+
completeTaskRunByRunId,
5+
createRunningTaskRun,
6+
failTaskRunByRunId,
7+
} from "../../tasks/task-executor.js";
48
import type { CronJob, CronJobCreate, CronJobPatch } from "../types.js";
59
import { normalizeCronCreateDeliveryInput } from "./initial-delivery.js";
610
import {
@@ -360,7 +364,7 @@ type PreparedManualRun =
360364
ok: true;
361365
ran: true;
362366
jobId: string;
363-
taskId?: string;
367+
taskRunId?: string;
364368
startedAt: number;
365369
executionJob: CronJob;
366370
}
@@ -382,27 +386,32 @@ type ManualRunPreflightResult =
382386

383387
let nextManualRunId = 1;
384388

385-
function tryCreateManualTaskRecord(params: {
389+
function createCronTaskRunId(jobId: string, startedAt: number): string {
390+
return `cron:${jobId}:${startedAt}`;
391+
}
392+
393+
function tryCreateManualTaskRun(params: {
386394
state: CronServiceState;
387395
job: CronJob;
388396
startedAt: number;
389397
}): string | undefined {
398+
const runId = createCronTaskRunId(params.job.id, params.startedAt);
390399
try {
391-
return createTaskRecord({
400+
createRunningTaskRun({
392401
runtime: "cron",
393402
sourceId: params.job.id,
394403
requesterSessionKey: "",
395404
childSessionKey: params.job.sessionKey,
396405
agentId: params.job.agentId,
397-
runId: `cron:${params.job.id}:${params.startedAt}`,
406+
runId,
398407
label: params.job.name,
399408
task: params.job.name || params.job.id,
400-
status: "running",
401409
deliveryStatus: "not_applicable",
402410
notifyPolicy: "silent",
403411
startedAt: params.startedAt,
404412
lastEventAt: params.startedAt,
405-
}).taskId;
413+
});
414+
return runId;
406415
} catch (error) {
407416
params.state.deps.log.warn(
408417
{ jobId: params.job.id, error },
@@ -412,26 +421,33 @@ function tryCreateManualTaskRecord(params: {
412421
}
413422
}
414423

415-
function tryUpdateManualTaskRecord(
424+
function tryFinishManualTaskRun(
416425
state: CronServiceState,
417426
params: {
418-
taskId?: string;
427+
taskRunId?: string;
419428
coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
420429
endedAt: number;
421430
},
422431
): void {
423-
if (!params.taskId) {
432+
if (!params.taskRunId) {
424433
return;
425434
}
426435
try {
427-
markTaskTerminalById({
428-
taskId: params.taskId,
436+
if (params.coreResult.status === "ok" || params.coreResult.status === "skipped") {
437+
completeTaskRunByRunId({
438+
runId: params.taskRunId,
439+
endedAt: params.endedAt,
440+
lastEventAt: params.endedAt,
441+
terminalSummary: params.coreResult.summary ?? undefined,
442+
});
443+
return;
444+
}
445+
failTaskRunByRunId({
446+
runId: params.taskRunId,
429447
status:
430-
params.coreResult.status === "ok" || params.coreResult.status === "skipped"
431-
? "succeeded"
432-
: normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out"
433-
? "timed_out"
434-
: "failed",
448+
normalizeCronRunErrorText(params.coreResult.error) === "cron: job execution timed out"
449+
? "timed_out"
450+
: "failed",
435451
endedAt: params.endedAt,
436452
lastEventAt: params.endedAt,
437453
error:
@@ -442,7 +458,7 @@ function tryUpdateManualTaskRecord(
442458
});
443459
} catch (error) {
444460
state.deps.log.warn(
445-
{ taskId: params.taskId, jobStatus: params.coreResult.status, error },
461+
{ runId: params.taskRunId, jobStatus: params.coreResult.status, error },
446462
"cron: failed to update task ledger record",
447463
);
448464
}
@@ -517,7 +533,7 @@ async function prepareManualRun(
517533
// force-reload from disk cannot start the same job concurrently.
518534
await persist(state);
519535
emit(state, { jobId: job.id, action: "started", runAtMs: preflight.now });
520-
const taskId = tryCreateManualTaskRecord({
536+
const taskRunId = tryCreateManualTaskRun({
521537
state,
522538
job,
523539
startedAt: preflight.now,
@@ -527,7 +543,7 @@ async function prepareManualRun(
527543
ok: true,
528544
ran: true,
529545
jobId: job.id,
530-
taskId,
546+
taskRunId,
531547
startedAt: preflight.now,
532548
executionJob,
533549
} as const;
@@ -542,7 +558,7 @@ async function finishPreparedManualRun(
542558
const executionJob = prepared.executionJob;
543559
const startedAt = prepared.startedAt;
544560
const jobId = prepared.jobId;
545-
const taskId = prepared.taskId;
561+
const taskRunId = prepared.taskRunId;
546562

547563
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
548564
try {
@@ -551,8 +567,8 @@ async function finishPreparedManualRun(
551567
coreResult = { status: "error", error: normalizeCronRunErrorText(err) };
552568
}
553569
const endedAt = state.deps.nowMs();
554-
tryUpdateManualTaskRecord(state, {
555-
taskId,
570+
tryFinishManualTaskRun(state, {
571+
taskRunId,
556572
coreResult,
557573
endedAt,
558574
});

src/cron/service/timer.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/servic
44
import { createCronServiceState } from "../../cron/service/state.js";
55
import { onTimer } from "../../cron/service/timer.js";
66
import type { CronJob } from "../../cron/types.js";
7-
import * as taskRegistry from "../../tasks/task-registry.js";
7+
import * as taskExecutor from "../../tasks/task-executor.js";
88
import { resetTaskRegistryForTests } from "../../tasks/task-registry.js";
99

1010
const { logger, makeStorePath } = setupCronServiceSuite({
@@ -96,7 +96,7 @@ describe("cron service timer seam coverage", () => {
9696
});
9797

9898
const createTaskRecordSpy = vi
99-
.spyOn(taskRegistry, "createTaskRecord")
99+
.spyOn(taskExecutor, "createRunningTaskRun")
100100
.mockImplementation(() => {
101101
throw new Error("disk full");
102102
});

0 commit comments

Comments
 (0)