Skip to content

Commit 2b70f44

Browse files
authored
Merge 58f1a7f into 520992a
2 parents 520992a + 58f1a7f commit 2b70f44

9 files changed

Lines changed: 433 additions & 18 deletions

File tree

src/commands/tasks.test.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,20 @@ import * as taskRegistryMaintenance from "../tasks/task-registry.maintenance.js"
1818
import type { TaskRecord } from "../tasks/task-registry.types.js";
1919
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
2020
import type { OpenClawTestState } from "../test-utils/openclaw-test-state.js";
21-
import { tasksAuditCommand, tasksMaintenanceCommand, tasksShowCommand } from "./tasks.js";
21+
import {
22+
tasksAuditCommand,
23+
tasksCancelCommand,
24+
tasksMaintenanceCommand,
25+
tasksShowCommand,
26+
} from "./tasks.js";
27+
28+
const gatewayMocks = vi.hoisted(() => ({
29+
callGateway: vi.fn(),
30+
}));
31+
32+
vi.mock("../gateway/call.js", () => ({
33+
callGateway: gatewayMocks.callGateway,
34+
}));
2235

2336
function createRuntime(): RuntimeEnv {
2437
return {
@@ -96,6 +109,7 @@ describe("tasks commands", () => {
96109
resetTaskRegistryDeliveryRuntimeForTests();
97110
resetTaskRegistryForTests({ persist: false });
98111
resetTaskFlowRegistryForTests({ persist: false });
112+
gatewayMocks.callGateway.mockReset();
99113
});
100114

101115
it("keeps audit JSON stable and sorts combined findings before limiting", async () => {
@@ -169,6 +183,40 @@ describe("tasks commands", () => {
169183
});
170184
});
171185

186+
it("routes cron task cancellation through the live Gateway", async () => {
187+
await withTaskCommandStateDir(async () => {
188+
const task = createTaskRecord({
189+
runtime: "cron",
190+
ownerKey: "",
191+
scopeKind: "system",
192+
childSessionKey: "agent:main:cron:daily:run:1",
193+
runId: "cron:daily:1",
194+
task: "Daily cron",
195+
status: "running",
196+
deliveryStatus: "not_applicable",
197+
});
198+
gatewayMocks.callGateway.mockResolvedValueOnce({
199+
found: true,
200+
cancelled: true,
201+
task: { ...task, status: "cancelled" },
202+
});
203+
204+
const runtime = createRuntime();
205+
await tasksCancelCommand({ lookup: task.taskId }, runtime);
206+
207+
expect(gatewayMocks.callGateway).toHaveBeenCalledWith({
208+
method: "tasks.cancel",
209+
params: {
210+
taskId: task.taskId,
211+
reason: "Cancelled by operator.",
212+
},
213+
timeoutMs: 5_000,
214+
});
215+
expect(runtime.error).not.toHaveBeenCalled();
216+
expect(runtime.log).toHaveBeenCalledWith(`Cancelled ${task.taskId} (cron) run cron:daily:1.`);
217+
});
218+
});
219+
172220
it("explains stale running tasks retained by backing sessions in maintenance JSON", async () => {
173221
await withTaskCommandStateDir(async (state) => {
174222
const now = Date.now();

src/commands/tasks.ts

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
type SessionEntry,
1717
} from "../config/sessions.js";
1818
import { loadCronJobsStoreSync, resolveCronJobsStorePath } from "../cron/store.js";
19+
import { callGateway } from "../gateway/call.js";
1920
import type { RuntimeEnv } from "../runtime.js";
2021
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
2122
import { getTaskById, updateTaskNotifyPolicyById } from "../tasks/runtime-internal.js";
@@ -78,6 +79,38 @@ async function loadTaskCancelConfig() {
7879
return getRuntimeConfig();
7980
}
8081

82+
type TaskCancelResult = {
83+
found?: boolean;
84+
cancelled?: boolean;
85+
reason?: string;
86+
task?: Pick<TaskRecord, "taskId" | "runtime" | "runId">;
87+
};
88+
89+
async function cancelCronTaskThroughGateway(task: TaskRecord): Promise<TaskCancelResult> {
90+
try {
91+
return await callGateway<TaskCancelResult>({
92+
method: "tasks.cancel",
93+
params: {
94+
taskId: task.taskId,
95+
reason: "Cancelled by operator.",
96+
},
97+
timeoutMs: 5_000,
98+
});
99+
} catch (error) {
100+
const message = error instanceof Error ? error.message : String(error);
101+
return {
102+
found: true,
103+
cancelled: false,
104+
reason: [
105+
"Cron task cancellation requires the live Gateway process that owns the cron run.",
106+
`Gateway cancellation failed: ${message}`,
107+
`Try ${formatCliCommand("openclaw gateway status --deep --require-rpc")} or restart the Gateway if the run is stuck.`,
108+
].join("\n"),
109+
task,
110+
};
111+
}
112+
}
113+
81114
function configureTaskMaintenanceFromConfig(): void {
82115
const cfg = getRuntimeConfig();
83116
configureTaskRegistryMaintenance({
@@ -498,10 +531,13 @@ export async function tasksCancelCommand(opts: { lookup: string }, runtime: Runt
498531
runtime.exit(1);
499532
return;
500533
}
501-
const result = await cancelDetachedTaskRunById({
502-
cfg: await loadTaskCancelConfig(),
503-
taskId: task.taskId,
504-
});
534+
const result =
535+
task.runtime === "cron"
536+
? await cancelCronTaskThroughGateway(task)
537+
: await cancelDetachedTaskRunById({
538+
cfg: await loadTaskCancelConfig(),
539+
taskId: task.taskId,
540+
});
505541
if (!result.found) {
506542
runtime.error(result.reason ?? formatTaskLookupMiss(opts.lookup));
507543
runtime.exit(1);
@@ -512,7 +548,7 @@ export async function tasksCancelCommand(opts: { lookup: string }, runtime: Runt
512548
runtime.exit(1);
513549
return;
514550
}
515-
const updated = getTaskById(task.taskId);
551+
const updated = result.task ?? getTaskById(task.taskId);
516552
runtime.log(
517553
`Cancelled ${updated?.taskId ?? task.taskId} (${updated?.runtime ?? task.runtime})${updated?.runId ? ` run ${updated.runId}` : ""}.`,
518554
);

src/cron/service/ops.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { normalizeLowercaseStringOrEmpty } from "@openclaw/normalization-core/st
33
import { enqueueCommandInLane } from "../../process/command-queue.js";
44
import { CommandLane } from "../../process/lanes.js";
55
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
6+
import { registerActiveCronTaskRun } from "../../tasks/cron-task-cancel.js";
67
import {
78
completeTaskRunByRunId,
89
createRunningTaskRun,
@@ -808,11 +809,16 @@ async function finishPreparedManualRun(
808809
const jobId = prepared.jobId;
809810
const taskRunId = prepared.taskRunId;
810811
const runId = prepared.runId;
812+
const runAbortController = new AbortController();
813+
const releaseCronTaskRun = registerActiveCronTaskRun({
814+
runId: executionJob.sessionTarget === "main" ? undefined : taskRunId,
815+
controller: runAbortController,
816+
});
811817

812818
try {
813819
let coreResult: Awaited<ReturnType<typeof executeJobCoreWithTimeout>>;
814820
try {
815-
coreResult = await executeJobCoreWithTimeout(state, executionJob);
821+
coreResult = await executeJobCoreWithTimeout(state, executionJob, runAbortController);
816822
} catch (err) {
817823
coreResult = { status: "error", error: normalizeCronRunErrorText(err) };
818824
}
@@ -899,6 +905,7 @@ async function finishPreparedManualRun(
899905
armTimer(state);
900906
});
901907
} finally {
908+
releaseCronTaskRun?.();
902909
clearCronJobActive(jobId);
903910
}
904911
}

src/cron/service/timer.test.ts

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import path from "node:path";
44
import { afterEach, describe, expect, it, vi } from "vitest";
55
import { setupCronServiceSuite, writeCronStoreSnapshot } from "../../cron/service.test-harness.js";
66
import { createCronServiceState } from "../../cron/service/state.js";
7-
import { executeJobCore, onTimer } from "../../cron/service/timer.js";
7+
import { executeJobCore, executeJobCoreWithTimeout, onTimer } from "../../cron/service/timer.js";
88
import { loadCronStore } from "../../cron/store.js";
99
import type { CronJob } from "../../cron/types.js";
10+
import { resetActiveCronTaskRunsForTests } from "../../tasks/cron-task-cancel.js";
1011
import * as detachedTaskRuntime from "../../tasks/detached-task-runtime.js";
12+
import { cancelDetachedTaskRunById } from "../../tasks/task-executor.js";
1113
import { findTaskByRunId, resetTaskRegistryForTests } from "../../tasks/task-registry.js";
1214
import { formatTaskStatusDetail } from "../../tasks/task-status.js";
1315

@@ -48,6 +50,7 @@ function createDueIsolatedAgentJob(params: { now: number }): CronJob {
4850
}
4951

5052
afterEach(() => {
53+
resetActiveCronTaskRunsForTests();
5154
resetTaskRegistryForTests();
5255
});
5356

@@ -272,6 +275,150 @@ describe("cron service timer seam coverage", () => {
272275
await timerRun;
273276
});
274277

278+
it("cancels an active scheduled isolated cron task through the task ledger", async () => {
279+
const { storePath } = await makeStorePath();
280+
const now = Date.parse("2026-03-23T12:00:00.000Z");
281+
const enqueueSystemEvent = vi.fn();
282+
const requestHeartbeat = vi.fn();
283+
let capturedAbortSignal: AbortSignal | undefined;
284+
let resolveRun: ((value: { status: "ok"; summary: string }) => void) | undefined;
285+
const runIsolatedAgentJob = vi.fn(
286+
({ abortSignal }: { abortSignal?: AbortSignal }) =>
287+
new Promise<{ status: "ok"; summary: string }>((resolve) => {
288+
capturedAbortSignal = abortSignal;
289+
resolveRun = resolve;
290+
}),
291+
);
292+
293+
await writeCronStoreSnapshot({
294+
storePath,
295+
jobs: [createDueIsolatedAgentJob({ now })],
296+
});
297+
298+
const state = createCronServiceState({
299+
storePath,
300+
cronEnabled: true,
301+
log: logger,
302+
nowMs: () => now,
303+
enqueueSystemEvent,
304+
requestHeartbeat,
305+
runIsolatedAgentJob,
306+
});
307+
308+
const timerRun = onTimer(state);
309+
await vi.waitFor(() => {
310+
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
311+
});
312+
313+
const task = findTaskByRunId(`cron:isolated-agent-job:${now}`);
314+
if (!task) {
315+
throw new Error("expected active cron task ledger record");
316+
}
317+
318+
const cancelled = await cancelDetachedTaskRunById({
319+
cfg: {} as never,
320+
taskId: task.taskId,
321+
});
322+
323+
expect(cancelled.cancelled).toBe(true);
324+
expect(capturedAbortSignal?.aborted).toBe(true);
325+
expect(findTaskByRunId(`cron:isolated-agent-job:${now}`)?.status).toBe("cancelled");
326+
327+
resolveRun?.({ status: "ok", summary: "done" });
328+
await timerRun;
329+
expect(findTaskByRunId(`cron:isolated-agent-job:${now}`)?.status).toBe("cancelled");
330+
});
331+
332+
it("keeps timeout watchdog as a backstop after operator cancellation", async () => {
333+
vi.useFakeTimers();
334+
const { storePath } = await makeStorePath();
335+
const now = Date.parse("2026-03-23T12:00:00.000Z");
336+
const controller = new AbortController();
337+
const runIsolatedAgentJob = vi.fn(({ onExecutionStarted }) => {
338+
onExecutionStarted?.({ phase: "running" });
339+
return new Promise<never>(() => {});
340+
});
341+
const state = createCronServiceState({
342+
storePath,
343+
cronEnabled: true,
344+
log: logger,
345+
nowMs: () => now,
346+
enqueueSystemEvent: vi.fn(),
347+
requestHeartbeat: vi.fn(),
348+
runIsolatedAgentJob,
349+
cleanupTimedOutAgentRun: vi.fn(async () => {}),
350+
});
351+
const job: CronJob = {
352+
...createDueIsolatedAgentJob({ now }),
353+
payload: { kind: "agentTurn", message: "ignore abort", timeoutSeconds: 1 },
354+
};
355+
356+
const resultPromise = executeJobCoreWithTimeout(state, job, controller);
357+
await vi.waitFor(() => {
358+
expect(runIsolatedAgentJob).toHaveBeenCalledTimes(1);
359+
});
360+
controller.abort("Cancelled by operator.");
361+
await vi.advanceTimersByTimeAsync(1_000);
362+
363+
await expect(resultPromise).resolves.toMatchObject({
364+
status: "error",
365+
error: expect.stringContaining("timed out"),
366+
});
367+
expect(state.deps.cleanupTimedOutAgentRun).toHaveBeenCalled();
368+
});
369+
370+
it("does not report main-session cron tasks as cancelled after enqueue", async () => {
371+
const { storePath } = await makeStorePath();
372+
const now = Date.parse("2026-03-23T12:00:00.000Z");
373+
const enqueueSystemEvent = vi.fn();
374+
const requestHeartbeat = vi.fn();
375+
let resolveHeartbeat: ((value: { status: "ran"; durationMs: number }) => void) | undefined;
376+
const runHeartbeatOnce = vi.fn(
377+
() =>
378+
new Promise<{ status: "ran"; durationMs: number }>((resolve) => {
379+
resolveHeartbeat = resolve;
380+
}),
381+
);
382+
383+
await writeCronStoreSnapshot({
384+
storePath,
385+
jobs: [createDueMainJob({ now, wakeMode: "now" })],
386+
});
387+
388+
const state = createCronServiceState({
389+
storePath,
390+
cronEnabled: true,
391+
log: logger,
392+
nowMs: () => now,
393+
enqueueSystemEvent,
394+
requestHeartbeat,
395+
runHeartbeatOnce,
396+
runIsolatedAgentJob: vi.fn(async () => ({ status: "ok" as const })),
397+
});
398+
399+
const timerRun = onTimer(state);
400+
await vi.waitFor(() => {
401+
expect(runHeartbeatOnce).toHaveBeenCalledTimes(1);
402+
});
403+
404+
const task = findTaskByRunId(`cron:main-heartbeat-job:${now}`);
405+
if (!task) {
406+
throw new Error("expected active main-session cron task ledger record");
407+
}
408+
409+
const cancelled = await cancelDetachedTaskRunById({
410+
cfg: {} as never,
411+
taskId: task.taskId,
412+
});
413+
414+
expect(cancelled.cancelled).toBe(false);
415+
expect(cancelled.reason).toContain("main-session cron jobs enqueue work");
416+
expect(findTaskByRunId(`cron:main-heartbeat-job:${now}`)?.status).toBe("running");
417+
418+
resolveHeartbeat?.({ status: "ran", durationMs: 1 });
419+
await timerRun;
420+
});
421+
275422
it("keeps scheduler progress when task ledger creation fails", async () => {
276423
const { storePath } = await makeStorePath();
277424
const now = Date.parse("2026-03-23T12:00:00.000Z");

0 commit comments

Comments
 (0)