Skip to content

Commit 8bdca23

Browse files
authored
TaskFlow: add managed child task execution (#59610)
Merged via squash. Prepared head SHA: e6cdde6 Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky
1 parent f65da87 commit 8bdca23

7 files changed

Lines changed: 627 additions & 33 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai
1616
- WhatsApp/reactions: add `reactionLevel` guidance for agent reactions. Thanks @mcaxtr.
1717
- Feishu/comments: add a dedicated Drive comment-event flow with comment-thread context resolution, in-thread replies, and `feishu_drive` comment actions for document collaboration workflows. (#58497) thanks @wittam-01.
1818
- Tasks/TaskFlow: restore the core TaskFlow substrate with managed-vs-mirrored sync modes, durable flow state/revision tracking, and `openclaw flows` inspection/recovery primitives so background orchestration can persist and be operated separately from plugin authoring layers. (#58930) Thanks @mbelinky.
19+
- Tasks/TaskFlow: add managed child task spawning plus sticky cancel intent, so external orchestrators can stop scheduling immediately and let parent TaskFlows settle to `cancelled` once active child tasks finish. (#59610) Thanks @mbelinky.
1920

2021
### Fixes
2122

src/commands/doctor-workspace-status.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ function noteFlowRecoveryHints() {
1919
flow.waitJson === undefined
2020
) {
2121
findings.push(
22-
`${flow.flowId}: running managed flow has no linked tasks or wait state; inspect or cancel it manually.`,
22+
`${flow.flowId}: running managed TaskFlow has no linked tasks or wait state; inspect or cancel it manually.`,
2323
);
2424
}
2525
if (
@@ -28,7 +28,7 @@ function noteFlowRecoveryHints() {
2828
!tasks.some((task) => task.taskId === flow.blockedTaskId)
2929
) {
3030
findings.push(
31-
`${flow.flowId}: blocked flow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
31+
`${flow.flowId}: blocked TaskFlow points at missing task ${flow.blockedTaskId}; inspect before retrying.`,
3232
);
3333
}
3434
return findings;

src/tasks/runtime-internal.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export {
2323
recordTaskProgressByRunId,
2424
resolveTaskForLookupToken,
2525
resetTaskRegistryForTests,
26+
isParentFlowLinkError,
2627
setTaskCleanupAfterById,
2728
setTaskProgressById,
2829
setTaskRunDeliveryStatusByRunId,

src/tasks/task-executor.test.ts

Lines changed: 162 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import {
1616
failTaskRunByRunId,
1717
recordTaskRunProgressByRunId,
1818
retryBlockedFlowAsQueuedTaskRun,
19+
runTaskInFlow,
20+
runTaskInFlowForOwner,
1921
setDetachedTaskDeliveryStatusByRunId,
2022
startTaskRunByRunId,
2123
} from "./task-executor.js";
@@ -289,7 +291,7 @@ describe("task-executor", () => {
289291
});
290292
});
291293

292-
it("cancels active tasks linked to a managed flow", async () => {
294+
it("cancels active tasks linked to a managed TaskFlow", async () => {
293295
await withTaskExecutorStateDir(async () => {
294296
hoisted.cancelSessionMock.mockResolvedValue(undefined);
295297

@@ -330,6 +332,139 @@ describe("task-executor", () => {
330332
});
331333
});
332334

335+
it("runs child tasks under managed TaskFlows", async () => {
336+
await withTaskExecutorStateDir(async () => {
337+
const flow = createManagedFlow({
338+
ownerKey: "agent:main:main",
339+
controllerId: "tests/managed-flow",
340+
goal: "Inspect PR batch",
341+
requesterOrigin: {
342+
channel: "telegram",
343+
to: "telegram:123",
344+
},
345+
});
346+
347+
const created = runTaskInFlow({
348+
flowId: flow.flowId,
349+
runtime: "acp",
350+
childSessionKey: "agent:codex:acp:child",
351+
runId: "run-flow-child",
352+
label: "Inspect a PR",
353+
task: "Inspect a PR",
354+
status: "running",
355+
startedAt: 10,
356+
lastEventAt: 10,
357+
});
358+
359+
expect(created).toMatchObject({
360+
found: true,
361+
created: true,
362+
task: expect.objectContaining({
363+
parentFlowId: flow.flowId,
364+
ownerKey: "agent:main:main",
365+
status: "running",
366+
runId: "run-flow-child",
367+
}),
368+
});
369+
expect(getTaskById(created.task!.taskId)).toMatchObject({
370+
parentFlowId: flow.flowId,
371+
ownerKey: "agent:main:main",
372+
childSessionKey: "agent:codex:acp:child",
373+
});
374+
});
375+
});
376+
377+
it("refuses to add child tasks once cancellation is requested on a managed TaskFlow", async () => {
378+
await withTaskExecutorStateDir(async () => {
379+
const flow = createManagedFlow({
380+
ownerKey: "agent:main:main",
381+
controllerId: "tests/managed-flow",
382+
goal: "Protected flow",
383+
});
384+
385+
const cancelled = await cancelFlowById({
386+
cfg: {} as never,
387+
flowId: flow.flowId,
388+
});
389+
390+
expect(cancelled).toMatchObject({
391+
found: true,
392+
cancelled: true,
393+
});
394+
395+
const created = runTaskInFlow({
396+
flowId: flow.flowId,
397+
runtime: "acp",
398+
childSessionKey: "agent:codex:acp:child",
399+
runId: "run-flow-after-cancel",
400+
task: "Should be denied",
401+
});
402+
403+
expect(created).toMatchObject({
404+
found: true,
405+
created: false,
406+
reason: "Flow cancellation has already been requested.",
407+
});
408+
});
409+
});
410+
411+
it("sets cancel intent before child tasks settle and finalizes later", async () => {
412+
await withTaskExecutorStateDir(async () => {
413+
hoisted.cancelSessionMock.mockRejectedValue(new Error("still shutting down"));
414+
415+
const flow = createManagedFlow({
416+
ownerKey: "agent:main:main",
417+
controllerId: "tests/managed-flow",
418+
goal: "Long running batch",
419+
});
420+
const child = runTaskInFlow({
421+
flowId: flow.flowId,
422+
runtime: "acp",
423+
childSessionKey: "agent:codex:acp:child",
424+
runId: "run-flow-sticky-cancel",
425+
task: "Inspect a PR",
426+
status: "running",
427+
startedAt: 10,
428+
lastEventAt: 10,
429+
}).task!;
430+
431+
const cancelled = await cancelFlowById({
432+
cfg: {} as never,
433+
flowId: flow.flowId,
434+
});
435+
436+
expect(cancelled).toMatchObject({
437+
found: true,
438+
cancelled: false,
439+
reason: "One or more child tasks are still active.",
440+
flow: expect.objectContaining({
441+
flowId: flow.flowId,
442+
cancelRequestedAt: expect.any(Number),
443+
status: "queued",
444+
}),
445+
});
446+
447+
failTaskRunByRunId({
448+
runId: "run-flow-sticky-cancel",
449+
endedAt: 50,
450+
lastEventAt: 50,
451+
error: "cancel completed later",
452+
status: "cancelled",
453+
});
454+
455+
expect(getTaskById(child.taskId)).toMatchObject({
456+
taskId: child.taskId,
457+
status: "cancelled",
458+
});
459+
expect(getFlowById(flow.flowId)).toMatchObject({
460+
flowId: flow.flowId,
461+
cancelRequestedAt: expect.any(Number),
462+
status: "cancelled",
463+
endedAt: 50,
464+
});
465+
});
466+
});
467+
333468
it("denies cross-owner flow cancellation through the owner-scoped wrapper", async () => {
334469
await withTaskExecutorStateDir(async () => {
335470
const flow = createManagedFlow({
@@ -356,6 +491,32 @@ describe("task-executor", () => {
356491
});
357492
});
358493

494+
it("denies cross-owner managed TaskFlow child spawning through the owner-scoped wrapper", async () => {
495+
await withTaskExecutorStateDir(async () => {
496+
const flow = createManagedFlow({
497+
ownerKey: "agent:main:main",
498+
controllerId: "tests/managed-flow",
499+
goal: "Protected flow",
500+
});
501+
502+
const created = runTaskInFlowForOwner({
503+
flowId: flow.flowId,
504+
callerOwnerKey: "agent:main:other",
505+
runtime: "acp",
506+
childSessionKey: "agent:codex:acp:child",
507+
runId: "run-flow-cross-owner",
508+
task: "Should be denied",
509+
});
510+
511+
expect(created).toMatchObject({
512+
found: false,
513+
created: false,
514+
reason: "Flow not found.",
515+
});
516+
expect(findLatestTaskForFlowId(flow.flowId)).toBeUndefined();
517+
});
518+
});
519+
359520
it("cancels active ACP child tasks", async () => {
360521
await withTaskExecutorStateDir(async () => {
361522
hoisted.cancelSessionMock.mockResolvedValue(undefined);

0 commit comments

Comments
 (0)