Skip to content

Commit 7d1575b

Browse files
committed
fix: reconcile stale cron and chat-backed tasks (#60310) (thanks @lml2468)
1 parent 7036e5a commit 7d1575b

6 files changed

Lines changed: 110 additions & 35 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ Docs: https://docs.openclaw.ai
6060
- Update/npm: prefer the npm binary that owns the installed global OpenClaw prefix so mixed Homebrew-plus-nvm setups update the right install. (#60153) Thanks @jayeshp19.
6161
- Gateway/plugin routes: keep gateway-auth plugin runtime routes on write-only fallback scopes unless a trusted-proxy caller explicitly declares narrower `x-openclaw-scopes`, so plugin HTTP handlers no longer mint admin-level runtime scopes on missing or untrusted HTTP scope headers. (#59815) Thanks @pgondhi987.
6262
- Agents/exec approvals: let `exec-approvals.json` agent security override stricter gateway tool defaults so approved subagents can use `security: "full"` without falling back to allowlist enforcement again. (#60310) Thanks @lml2468.
63+
- Tasks/maintenance: reconcile stale cron and chat-backed CLI task rows against live cron-job and agent-run ownership instead of treating any persisted session key as proof that the task is still running. (#60310) Thanks @lml2468.
6364

6465
## 2026.4.2
6566

src/cron/active-jobs.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { resolveGlobalSingleton } from "../shared/global-singleton.js";
2+
3+
type CronActiveJobState = {
4+
activeJobIds: Set<string>;
5+
};
6+
7+
const CRON_ACTIVE_JOB_STATE_KEY = Symbol.for("openclaw.cron.activeJobs");
8+
9+
function getCronActiveJobState(): CronActiveJobState {
10+
return resolveGlobalSingleton<CronActiveJobState>(CRON_ACTIVE_JOB_STATE_KEY, () => ({
11+
activeJobIds: new Set<string>(),
12+
}));
13+
}
14+
15+
export function markCronJobActive(jobId: string) {
16+
if (!jobId) {
17+
return;
18+
}
19+
getCronActiveJobState().activeJobIds.add(jobId);
20+
}
21+
22+
export function clearCronJobActive(jobId: string) {
23+
if (!jobId) {
24+
return;
25+
}
26+
getCronActiveJobState().activeJobIds.delete(jobId);
27+
}
28+
29+
export function isCronJobActive(jobId: string) {
30+
if (!jobId) {
31+
return false;
32+
}
33+
return getCronActiveJobState().activeJobIds.has(jobId);
34+
}
35+
36+
export function resetCronActiveJobsForTests() {
37+
getCronActiveJobState().activeJobIds.clear();
38+
}

src/cron/service/timer.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
createRunningTaskRun,
88
failTaskRunByRunId,
99
} from "../../tasks/task-executor.js";
10+
import { clearCronJobActive, markCronJobActive } from "../active-jobs.js";
1011
import { resolveCronDeliveryPlan } from "../delivery-plan.js";
1112
import { sweepCronRunSessions } from "../session-reaper.js";
1213
import type {
@@ -561,6 +562,7 @@ export function applyJobResult(
561562
}
562563

563564
function applyOutcomeToStoredJob(state: CronServiceState, result: TimedCronRunOutcome): void {
565+
clearCronJobActive(result.jobId);
564566
tryFinishCronTaskRun(state, result);
565567
const store = state.store;
566568
if (!store) {
@@ -716,6 +718,7 @@ export async function onTimer(state: CronServiceState) {
716718
const { id, job } = params;
717719
const startedAt = state.deps.nowMs();
718720
job.state.runningAtMs = startedAt;
721+
markCronJobActive(job.id);
719722
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
720723
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
721724
const taskRunId = tryCreateCronTaskRun({ state, job, startedAt });
@@ -1299,6 +1302,7 @@ export async function executeJob(
12991302
const startedAt = state.deps.nowMs();
13001303
job.state.runningAtMs = startedAt;
13011304
job.state.lastError = undefined;
1305+
markCronJobActive(job.id);
13021306
emit(state, { jobId: job.id, action: "started", runAtMs: startedAt });
13031307

13041308
let coreResult: {
@@ -1327,6 +1331,7 @@ export async function executeJob(
13271331
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
13281332
emit(state, { jobId: job.id, action: "removed" });
13291333
}
1334+
clearCronJobActive(job.id);
13301335
}
13311336

13321337
function emitJobFinished(

src/tasks/task-registry.maintenance.issue-60299.test.ts

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,15 @@ async function loadMaintenanceModule(params: {
2626
tasks: TaskRecord[];
2727
sessionStore?: Record<string, unknown>;
2828
acpEntry?: unknown;
29+
activeCronJobIds?: string[];
30+
activeRunIds?: string[];
2931
}) {
3032
vi.resetModules();
3133

3234
const sessionStore = params.sessionStore ?? {};
3335
const acpEntry = params.acpEntry;
36+
const activeCronJobIds = new Set(params.activeCronJobIds ?? []);
37+
const activeRunIds = new Set(params.activeRunIds ?? []);
3438
const currentTasks = new Map(params.tasks.map((task) => [task.taskId, { ...task }]));
3539

3640
vi.doMock("../acp/runtime/session-meta.js", () => ({
@@ -45,6 +49,15 @@ async function loadMaintenanceModule(params: {
4549
resolveStorePath: () => "",
4650
}));
4751

52+
vi.doMock("../cron/active-jobs.js", () => ({
53+
isCronJobActive: (jobId: string) => activeCronJobIds.has(jobId),
54+
}));
55+
56+
vi.doMock("../infra/agent-events.js", () => ({
57+
getAgentRunContext: (runId: string) =>
58+
activeRunIds.has(runId) ? { sessionKey: "main" } : undefined,
59+
}));
60+
4861
vi.doMock("./runtime-internal.js", () => ({
4962
deleteTaskRecordById: (taskId: string) => currentTasks.delete(taskId),
5063
ensureTaskRegistryReady: () => {},
@@ -90,38 +103,45 @@ async function loadMaintenanceModule(params: {
90103
}
91104

92105
describe("task-registry maintenance issue #60299", () => {
93-
it("marks cron tasks with no child session key lost after the grace period", async () => {
106+
it("marks stale cron tasks lost once the runtime no longer tracks the job as active", async () => {
107+
const childSessionKey = "agent:main:slack:channel:test-channel";
94108
const task = makeStaleTask({
95109
runtime: "cron",
96-
childSessionKey: undefined,
110+
sourceId: "cron-job-1",
111+
childSessionKey,
97112
});
98113

99-
const { mod, currentTasks } = await loadMaintenanceModule({ tasks: [task] });
114+
const { mod, currentTasks } = await loadMaintenanceModule({
115+
tasks: [task],
116+
sessionStore: { [childSessionKey]: { updatedAt: Date.now() } },
117+
});
100118

101119
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
102120
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
103121
});
104122

105-
it("marks cron tasks lost even if their transient child key still exists in the session store", async () => {
106-
const childSessionKey = "agent:main:slack:channel:test-channel";
123+
it("keeps active cron tasks live while the cron runtime still owns the job", async () => {
107124
const task = makeStaleTask({
108125
runtime: "cron",
109-
childSessionKey,
126+
sourceId: "cron-job-2",
127+
childSessionKey: undefined,
110128
});
111129

112130
const { mod, currentTasks } = await loadMaintenanceModule({
113131
tasks: [task],
114-
sessionStore: { [childSessionKey]: { updatedAt: Date.now() } },
132+
activeCronJobIds: ["cron-job-2"],
115133
});
116134

117-
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 1 });
118-
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
135+
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });
136+
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "running" });
119137
});
120138

121-
it("treats cli tasks backed only by a persistent chat session as stale", async () => {
139+
it("marks chat-backed cli tasks lost after the owning run context disappears", async () => {
122140
const channelKey = "agent:main:slack:channel:C1234567890";
123141
const task = makeStaleTask({
124142
runtime: "cli",
143+
sourceId: "run-chat-cli-stale",
144+
runId: "run-chat-cli-stale",
125145
ownerKey: "agent:main:main",
126146
requesterSessionKey: channelKey,
127147
childSessionKey: channelKey,
@@ -136,18 +156,21 @@ describe("task-registry maintenance issue #60299", () => {
136156
expect(currentTasks.get(task.taskId)).toMatchObject({ status: "lost" });
137157
});
138158

139-
it("keeps subagent tasks live while their child session still exists", async () => {
140-
const childKey = "agent:main:subagent:abc123";
159+
it("keeps chat-backed cli tasks live while the owning run context is still active", async () => {
160+
const channelKey = "agent:main:slack:channel:C1234567890";
141161
const task = makeStaleTask({
142-
runtime: "subagent",
162+
runtime: "cli",
163+
sourceId: "run-chat-cli-live",
164+
runId: "run-chat-cli-live",
143165
ownerKey: "agent:main:main",
144-
requesterSessionKey: "agent:main:main",
145-
childSessionKey: childKey,
166+
requesterSessionKey: channelKey,
167+
childSessionKey: channelKey,
146168
});
147169

148170
const { mod, currentTasks } = await loadMaintenanceModule({
149171
tasks: [task],
150-
sessionStore: { [childKey]: { updatedAt: Date.now() } },
172+
sessionStore: { [channelKey]: { updatedAt: Date.now() } },
173+
activeRunIds: ["run-chat-cli-live"],
151174
});
152175

153176
expect(await mod.runTaskRegistryMaintenance()).toMatchObject({ reconciled: 0 });

src/tasks/task-registry.maintenance.ts

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
22
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
3+
import { isCronJobActive } from "../cron/active-jobs.js";
4+
import { getAgentRunContext } from "../infra/agent-events.js";
35
import { parseAgentSessionKey } from "../routing/session-key.js";
46
import { deriveSessionChatType } from "../sessions/session-chat-type.js";
57
import {
@@ -64,16 +66,25 @@ function hasLostGraceExpired(task: TaskRecord, now: number): boolean {
6466
return now - referenceAt >= TASK_RECONCILE_GRACE_MS;
6567
}
6668

67-
/**
68-
* Returns false if the task's runtime is cron, since cron tasks do not maintain
69-
* a persistent child session after the job exits.
70-
*
71-
* For cli tasks, long-lived channel/group/direct session-store entries do not
72-
* imply task liveness, so only agent-scoped non-chat child sessions count.
73-
*/
69+
function hasActiveCliRun(task: TaskRecord): boolean {
70+
const candidateRunIds = [task.sourceId, task.runId];
71+
for (const candidate of candidateRunIds) {
72+
const runId = candidate?.trim();
73+
if (runId && getAgentRunContext(runId)) {
74+
return true;
75+
}
76+
}
77+
return false;
78+
}
79+
7480
function hasBackingSession(task: TaskRecord): boolean {
7581
if (task.runtime === "cron") {
76-
return false;
82+
const jobId = task.sourceId?.trim();
83+
return jobId ? isCronJobActive(jobId) : false;
84+
}
85+
86+
if (task.runtime === "cli" && hasActiveCliRun(task)) {
87+
return true;
7788
}
7889

7990
const childSessionKey = task.childSessionKey?.trim();
@@ -89,17 +100,12 @@ function hasBackingSession(task: TaskRecord): boolean {
89100
}
90101
return Boolean(acpEntry.entry);
91102
}
92-
if (task.runtime === "subagent") {
93-
const agentId = parseAgentSessionKey(childSessionKey)?.agentId;
94-
const storePath = resolveStorePath(undefined, { agentId });
95-
const store = loadSessionStore(storePath);
96-
return Boolean(findSessionEntryByKey(store, childSessionKey));
97-
}
98-
99-
if (task.runtime === "cli") {
100-
const chatType = deriveSessionChatType(childSessionKey);
101-
if (chatType === "channel" || chatType === "group" || chatType === "direct") {
102-
return false;
103+
if (task.runtime === "subagent" || task.runtime === "cli") {
104+
if (task.runtime === "cli") {
105+
const chatType = deriveSessionChatType(childSessionKey);
106+
if (chatType === "channel" || chatType === "group" || chatType === "direct") {
107+
return false;
108+
}
103109
}
104110
const agentId = parseAgentSessionKey(childSessionKey)?.agentId;
105111
const storePath = resolveStorePath(undefined, { agentId });

src/tasks/task-registry.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
22
import { startAcpSpawnParentStreamRelay } from "../agents/acp-spawn-parent-stream.js";
3+
import { resetCronActiveJobsForTests } from "../cron/active-jobs.js";
34
import {
45
emitAgentEvent,
56
registerAgentRunContext,
@@ -226,6 +227,7 @@ describe("task-registry", () => {
226227
resetSystemEventsForTest();
227228
resetHeartbeatWakeStateForTests();
228229
resetAgentRunContextForTest();
230+
resetCronActiveJobsForTests();
229231
resetTaskRegistryDeliveryRuntimeForTests();
230232
resetTaskRegistryForTests({ persist: false });
231233
resetTaskFlowRegistryForTests({ persist: false });

0 commit comments

Comments
 (0)