Skip to content

Commit 2fa4c7c

Browse files
authored
TaskFlow: restore managed substrate (#58930)
Merged via squash. Prepared head SHA: c990938 Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky
1 parent 52d2bd5 commit 2fa4c7c

25 files changed

Lines changed: 3243 additions & 6 deletions

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ Docs: https://docs.openclaw.ai
1111
- Providers/runtime: add provider-owned replay hook surfaces for transcript policy, replay cleanup, and reasoning-mode dispatch. (#59143) Thanks @jalehman.
1212
- Diffs: add plugin-owned `viewerBaseUrl` so viewer links can use a stable proxy/public origin without passing `baseUrl` on every tool call. (#59341) Related #59227. Thanks @gumadeiras.
1313
- Matrix/plugin: emit spec-compliant `m.mentions` metadata across text sends, media captions, edits, poll fallback text, and action-driven edits so Matrix mentions notify reliably in clients like Element. (#59323) Thanks @gumadeiras.
14+
- Agents/compaction: resolve `agents.defaults.compaction.model` consistently for manual `/compact` and other context-engine compaction paths, so engine-owned compaction uses the configured override model across runtime entrypoints. (#56710) Thanks @oliviareid-svg
15+
- Channels/session routing: move provider-specific session conversation grammar into plugin-owned session-key surfaces, preserving Telegram topic routing and Feishu scoped inheritance across bootstrap, model override, restart, and tool-policy paths.
16+
- WhatsApp/reactions: add `reactionLevel` guidance for agent reactions. Thanks @mcaxtr.
17+
- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01.
18+
- Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky.
1419

1520
### Fixes
1621

src/cli/program/register.status-health-sessions.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ const mocks = vi.hoisted(() => ({
1313
tasksShowCommand: vi.fn(),
1414
tasksNotifyCommand: vi.fn(),
1515
tasksCancelCommand: vi.fn(),
16+
flowsListCommand: vi.fn(),
17+
flowsShowCommand: vi.fn(),
18+
flowsCancelCommand: vi.fn(),
1619
setVerbose: vi.fn(),
1720
runtime: {
1821
log: vi.fn(),
@@ -31,6 +34,9 @@ const tasksMaintenanceCommand = mocks.tasksMaintenanceCommand;
3134
const tasksShowCommand = mocks.tasksShowCommand;
3235
const tasksNotifyCommand = mocks.tasksNotifyCommand;
3336
const tasksCancelCommand = mocks.tasksCancelCommand;
37+
const flowsListCommand = mocks.flowsListCommand;
38+
const flowsShowCommand = mocks.flowsShowCommand;
39+
const flowsCancelCommand = mocks.flowsCancelCommand;
3440
const setVerbose = mocks.setVerbose;
3541
const runtime = mocks.runtime;
3642

@@ -59,6 +65,12 @@ vi.mock("../../commands/tasks.js", () => ({
5965
tasksCancelCommand: mocks.tasksCancelCommand,
6066
}));
6167

68+
vi.mock("../../commands/flows.js", () => ({
69+
flowsListCommand: mocks.flowsListCommand,
70+
flowsShowCommand: mocks.flowsShowCommand,
71+
flowsCancelCommand: mocks.flowsCancelCommand,
72+
}));
73+
6274
vi.mock("../../globals.js", () => ({
6375
setVerbose: mocks.setVerbose,
6476
}));
@@ -87,6 +99,9 @@ describe("registerStatusHealthSessionsCommands", () => {
8799
tasksShowCommand.mockResolvedValue(undefined);
88100
tasksNotifyCommand.mockResolvedValue(undefined);
89101
tasksCancelCommand.mockResolvedValue(undefined);
102+
flowsListCommand.mockResolvedValue(undefined);
103+
flowsShowCommand.mockResolvedValue(undefined);
104+
flowsCancelCommand.mockResolvedValue(undefined);
90105
});
91106

92107
it("runs status command with timeout and debug-derived verbose", async () => {
@@ -223,6 +238,34 @@ describe("registerStatusHealthSessionsCommands", () => {
223238
);
224239
});
225240

241+
it("runs flows subcommands with forwarded options", async () => {
242+
await runCli(["flows", "list", "--json", "--status", "blocked"]);
243+
expect(flowsListCommand).toHaveBeenCalledWith(
244+
expect.objectContaining({
245+
json: true,
246+
status: "blocked",
247+
}),
248+
runtime,
249+
);
250+
251+
await runCli(["flows", "show", "flow-123", "--json"]);
252+
expect(flowsShowCommand).toHaveBeenCalledWith(
253+
expect.objectContaining({
254+
lookup: "flow-123",
255+
json: true,
256+
}),
257+
runtime,
258+
);
259+
260+
await runCli(["flows", "cancel", "flow-123"]);
261+
expect(flowsCancelCommand).toHaveBeenCalledWith(
262+
expect.objectContaining({
263+
lookup: "flow-123",
264+
}),
265+
runtime,
266+
);
267+
});
268+
226269
it("forwards parent-level all-agents to cleanup subcommand", async () => {
227270
await runCli(["sessions", "--all-agents", "cleanup", "--dry-run"]);
228271

src/cli/program/register.status-health-sessions.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type { Command } from "commander";
2+
import { flowsCancelCommand, flowsListCommand, flowsShowCommand } from "../../commands/flows.js";
23
import { healthCommand } from "../../commands/health.js";
34
import { sessionsCleanupCommand } from "../../commands/sessions-cleanup.js";
45
import { sessionsCommand } from "../../commands/sessions.js";
@@ -373,4 +374,79 @@ export function registerStatusHealthSessionsCommands(program: Command) {
373374
);
374375
});
375376
});
377+
378+
const flowsCmd = program
379+
.command("flows")
380+
.description("Inspect durable background flow state")
381+
.option("--json", "Output as JSON", false)
382+
.option(
383+
"--status <name>",
384+
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
385+
)
386+
.action(async (opts) => {
387+
await runCommandWithRuntime(defaultRuntime, async () => {
388+
await flowsListCommand(
389+
{
390+
json: Boolean(opts.json),
391+
status: opts.status as string | undefined,
392+
},
393+
defaultRuntime,
394+
);
395+
});
396+
});
397+
flowsCmd.enablePositionalOptions();
398+
399+
flowsCmd
400+
.command("list")
401+
.description("List tracked background flows")
402+
.option("--json", "Output as JSON", false)
403+
.option(
404+
"--status <name>",
405+
"Filter by status (queued, running, waiting, blocked, succeeded, failed, cancelled, lost)",
406+
)
407+
.action(async (opts, command) => {
408+
const parentOpts = command.parent?.opts() as { json?: boolean; status?: string } | undefined;
409+
await runCommandWithRuntime(defaultRuntime, async () => {
410+
await flowsListCommand(
411+
{
412+
json: Boolean(opts.json || parentOpts?.json),
413+
status: (opts.status as string | undefined) ?? parentOpts?.status,
414+
},
415+
defaultRuntime,
416+
);
417+
});
418+
});
419+
420+
flowsCmd
421+
.command("show")
422+
.description("Show one background flow by flow id or owner key")
423+
.argument("<lookup>", "Flow id or owner key")
424+
.option("--json", "Output as JSON", false)
425+
.action(async (lookup, opts, command) => {
426+
const parentOpts = command.parent?.opts() as { json?: boolean } | undefined;
427+
await runCommandWithRuntime(defaultRuntime, async () => {
428+
await flowsShowCommand(
429+
{
430+
lookup,
431+
json: Boolean(opts.json || parentOpts?.json),
432+
},
433+
defaultRuntime,
434+
);
435+
});
436+
});
437+
438+
flowsCmd
439+
.command("cancel")
440+
.description("Cancel a running background flow")
441+
.argument("<lookup>", "Flow id or owner key")
442+
.action(async (lookup) => {
443+
await runCommandWithRuntime(defaultRuntime, async () => {
444+
await flowsCancelCommand(
445+
{
446+
lookup,
447+
},
448+
defaultRuntime,
449+
);
450+
});
451+
});
376452
}

src/commands/doctor-workspace-status.test.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ const mocks = vi.hoisted(() => ({
1313
buildWorkspaceSkillStatus: vi.fn(),
1414
buildPluginStatusReport: vi.fn(),
1515
buildPluginCompatibilityWarnings: vi.fn(),
16+
listFlowRecords: vi.fn<() => unknown[]>(() => []),
17+
listTasksForFlowId: vi.fn<(flowId: string) => unknown[]>((_flowId: string) => []),
1618
}));
1719

1820
vi.mock("../agents/agent-scope.js", () => ({
@@ -30,9 +32,21 @@ vi.mock("../plugins/status.js", () => ({
3032
mocks.buildPluginCompatibilityWarnings(...args),
3133
}));
3234

35+
vi.mock("../tasks/flow-runtime-internal.js", () => ({
36+
listFlowRecords: () => mocks.listFlowRecords(),
37+
}));
38+
39+
vi.mock("../tasks/runtime-internal.js", () => ({
40+
listTasksForFlowId: (flowId: string) => mocks.listTasksForFlowId(flowId),
41+
}));
42+
3343
async function runNoteWorkspaceStatusForTest(
3444
loadResult: ReturnType<typeof createPluginLoadResult>,
3545
compatibilityWarnings: string[] = [],
46+
opts?: {
47+
flows?: unknown[];
48+
tasksByFlowId?: (flowId: string) => unknown[];
49+
},
3650
) {
3751
mocks.resolveDefaultAgentId.mockReturnValue("default");
3852
mocks.resolveAgentWorkspaceDir.mockReturnValue("/workspace");
@@ -44,6 +58,10 @@ async function runNoteWorkspaceStatusForTest(
4458
...loadResult,
4559
});
4660
mocks.buildPluginCompatibilityWarnings.mockReturnValue(compatibilityWarnings);
61+
mocks.listFlowRecords.mockReturnValue(opts?.flows ?? []);
62+
mocks.listTasksForFlowId.mockImplementation((flowId: string) =>
63+
opts?.tasksByFlowId ? opts.tasksByFlowId(flowId) : [],
64+
);
4765

4866
const noteSpy = vi.spyOn(noteModule, "note").mockImplementation(() => {});
4967
noteWorkspaceStatus({});
@@ -159,4 +177,32 @@ describe("noteWorkspaceStatus", () => {
159177
noteSpy.mockRestore();
160178
}
161179
});
180+
181+
it("adds TaskFlow recovery hints for broken blocked flows", async () => {
182+
const noteSpy = await runNoteWorkspaceStatusForTest(createPluginLoadResult(), [], {
183+
flows: [
184+
{
185+
flowId: "flow-123",
186+
syncMode: "managed",
187+
ownerKey: "agent:main:main",
188+
revision: 0,
189+
status: "blocked",
190+
notifyPolicy: "done_only",
191+
goal: "Investigate PR batch",
192+
blockedTaskId: "task-missing",
193+
createdAt: 100,
194+
updatedAt: 100,
195+
},
196+
],
197+
tasksByFlowId: () => [],
198+
});
199+
try {
200+
const recoveryCalls = noteSpy.mock.calls.filter(([, title]) => title === "TaskFlow recovery");
201+
expect(recoveryCalls).toHaveLength(1);
202+
expect(String(recoveryCalls[0]?.[0])).toContain("flow-123");
203+
expect(String(recoveryCalls[0]?.[0])).toContain("openclaw flows show <flow-id>");
204+
} finally {
205+
noteSpy.mockRestore();
206+
}
207+
});
162208
});

src/commands/doctor-workspace-status.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,54 @@
11
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
22
import { buildWorkspaceSkillStatus } from "../agents/skills-status.js";
3+
import { formatCliCommand } from "../cli/command-format.js";
34
import type { OpenClawConfig } from "../config/config.js";
45
import { buildPluginCompatibilityWarnings, buildPluginStatusReport } from "../plugins/status.js";
6+
import { listFlowRecords } from "../tasks/flow-runtime-internal.js";
7+
import { listTasksForFlowId } from "../tasks/runtime-internal.js";
58
import { note } from "../terminal/note.js";
69
import { detectLegacyWorkspaceDirs, formatLegacyWorkspaceWarning } from "./doctor-workspace.js";
710

11+
function noteFlowRecoveryHints() {
12+
const suspicious = listFlowRecords().flatMap((flow) => {
13+
const tasks = listTasksForFlowId(flow.flowId);
14+
const findings: string[] = [];
15+
if (
16+
flow.syncMode === "managed" &&
17+
flow.status === "running" &&
18+
tasks.length === 0 &&
19+
flow.waitJson === undefined
20+
) {
21+
findings.push(
22+
`${flow.flowId}: running managed flow has no linked tasks or wait state; inspect or cancel it manually.`,
23+
);
24+
}
25+
if (
26+
flow.status === "blocked" &&
27+
flow.blockedTaskId &&
28+
!tasks.some((task) => task.taskId === flow.blockedTaskId)
29+
) {
30+
findings.push(
31+
`${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
32+
);
33+
}
34+
return findings;
35+
});
36+
if (suspicious.length === 0) {
37+
return;
38+
}
39+
note(
40+
[
41+
...suspicious.slice(0, 5),
42+
suspicious.length > 5 ? `...and ${suspicious.length - 5} more.` : null,
43+
`Inspect: ${formatCliCommand("openclaw flows show <flow-id>")}`,
44+
`Cancel: ${formatCliCommand("openclaw flows cancel <flow-id>")}`,
45+
]
46+
.filter((line): line is string => Boolean(line))
47+
.join("\n"),
48+
"TaskFlow recovery",
49+
);
50+
}
51+
852
export function noteWorkspaceStatus(cfg: OpenClawConfig) {
953
const workspaceDir = resolveAgentWorkspaceDir(cfg, resolveDefaultAgentId(cfg));
1054
const legacyWorkspace = detectLegacyWorkspaceDirs({ workspaceDir });
@@ -74,5 +118,7 @@ export function noteWorkspaceStatus(cfg: OpenClawConfig) {
74118
note(lines.join("\n"), "Plugin diagnostics");
75119
}
76120

121+
noteFlowRecoveryHints();
122+
77123
return { workspaceDir };
78124
}

0 commit comments

Comments
 (0)