refactor: move subworkflow dispatch behind runner #2244

Merged
yohamta0 merged 6 commits into main from refactor-subworkflow-runner
Jun 1, 2026

yohamta0 (Collaborator) commented Jun 1, 2026

Summary

  • add a SubWorkflowRunner port so runtime asks for child workflow execution without owning coordinator dispatch
  • move coordinator-backed child workflow dispatch, retry, cancellation, polling, agent snapshot, and workspace bundle handling into internal/subflow
  • wire cmd, engine, worker, and tests through the new runner factory and cover the runner policy

Testing

  • go test ./internal/subflow ./internal/runtime/executor -count=1
  • go test ./internal/engine ./internal/service/worker ./internal/cmd -count=1
  • go test -race ./internal/subflow ./internal/runtime/executor -count=1
  • go test ./internal/intg/distr -run 'TestSubDAG|TestParams_DistributedSubDAG|TestBaseConfig_SubDAGPropagation|TestCancellation_SubDAG' -count=1
  • go test ./internal/intg -run 'TestInlineSubDAG|TestExternalSubDAG|TestCallSubDAG|TestInlineParams_LocalSubDAGRuntimeCoercion|TestSubDAGParamsReferencedInChildEnv' -count=1
  • go test ./internal/runtime/... -count=1
  • make lint
  • git diff --check

Summary by cubic

Moved child workflow dispatch behind runtime/executor.SubWorkflowRunner with a coordinator-backed runner in internal/subflow. This decouples runtime from the coordinator, centralizes subworkflow handling, and fixes cleanup of completed distributed subflow runs.

  • Refactors

    • Added internal/runtime/executor/subworkflow.go (SubWorkflowRunner interface, request types for retry/cancel/workspace, and WithSubWorkflowRunner).
    • Implemented coordinator-backed runner in internal/subflow with ShouldRun, Run, Retry, Cancel, Cleanup, polling/logging, output extraction, agent snapshot, and workspace bundle upload; added internal/subflow/runner_test.go.
    • Executor now builds a SubWorkflowRequest and delegates distributed paths to the injected runner; improved cancellation by tracking active waits, calling runner.Cancel, and clearing distributed run/cancel tracking on completion; removed embedded coordinator logic.
    • Replaced DispatcherFactory with SubWorkflowRunnerFactory in agent.Options; wired through internal/cmd/*, internal/engine, internal/service/worker to build subflow.New over a coordinator runtime dispatcher; updated tests.
  • Migration

    • Replace DispatcherFactory with SubWorkflowRunnerFactory when constructing the agent.
    • Build the runner by creating a dispatcher (coordinator.NewRuntimeDispatcher) and wrapping it with subflow.New(dispatcher, defaultExecMode).
    • For custom behavior, implement runtime/executor.SubWorkflowRunner and inject via runtime/executor.WithSubWorkflowRunner.
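
The port-plus-injection shape described above can be sketched as follows. This is a minimal illustration, not the code from the PR: the method names come from the summary, but every signature, the trimmed `SubWorkflowRequest` fields, the `runnerFromContext` helper, and `fakeRunner` are assumptions made for the example.

```go
package main

import (
	"context"
	"fmt"
)

// SubWorkflowRequest is a hypothetical, trimmed-down request type.
type SubWorkflowRequest struct {
	RunID      string
	ForceLocal bool
}

// SubWorkflowRunner uses the method names listed in the PR summary;
// the signatures are assumptions for illustration only.
type SubWorkflowRunner interface {
	ShouldRun(ctx context.Context, req SubWorkflowRequest) bool
	Run(ctx context.Context, req SubWorkflowRequest) (string, error)
	Retry(ctx context.Context, req SubWorkflowRequest) (string, error)
	Cancel(ctx context.Context, req SubWorkflowRequest) error
}

type runnerKey struct{}

// WithSubWorkflowRunner stores the runner in the context so the executor
// never has to know how distributed dispatch is implemented.
func WithSubWorkflowRunner(ctx context.Context, r SubWorkflowRunner) context.Context {
	return context.WithValue(ctx, runnerKey{}, r)
}

// runnerFromContext is a hypothetical lookup helper.
func runnerFromContext(ctx context.Context) (SubWorkflowRunner, bool) {
	r, ok := ctx.Value(runnerKey{}).(SubWorkflowRunner)
	return r, ok
}

// fakeRunner stands in for a coordinator-backed runner.
type fakeRunner struct{}

func (fakeRunner) ShouldRun(_ context.Context, req SubWorkflowRequest) bool {
	return !req.ForceLocal
}
func (fakeRunner) Run(_ context.Context, req SubWorkflowRequest) (string, error) {
	return "distributed:" + req.RunID, nil
}
func (fakeRunner) Retry(_ context.Context, req SubWorkflowRequest) (string, error) {
	return "retried:" + req.RunID, nil
}
func (fakeRunner) Cancel(context.Context, SubWorkflowRequest) error { return nil }

// execute delegates to the injected runner when it accepts the request
// and otherwise falls back to a local path.
func execute(ctx context.Context, req SubWorkflowRequest) (string, error) {
	if r, ok := runnerFromContext(ctx); ok && r.ShouldRun(ctx, req) {
		return r.Run(ctx, req)
	}
	return "local:" + req.RunID, nil
}

func demo() (string, string) {
	ctx := WithSubWorkflowRunner(context.Background(), fakeRunner{})
	a, _ := execute(ctx, SubWorkflowRequest{RunID: "r1"})
	b, _ := execute(ctx, SubWorkflowRequest{RunID: "r2", ForceLocal: true})
	return a, b
}

func main() {
	a, b := demo()
	fmt.Println(a)
	fmt.Println(b)
}
```

In this sketch the executor only depends on the interface, so a test can inject a fake while production code injects the coordinator-backed implementation.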

Written for commit 91c727d. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor
    • Reworked sub-workflow execution to a new runner-based pattern, improving distributed sub-workflow start/retry/cancel behavior, workspace handling, and cleanup.
  • Tests
    • Added and updated tests validating runner logic, dispatch/retry flows, cancellation, and executor integration to ensure correctness across local and distributed runs.

coderabbitai Bot commented Jun 1, 2026

Walkthrough

This PR replaces dispatcher-based distributed sub-workflow execution with a SubWorkflowRunner interface and implementation, refactors SubDAGExecutor to use injected runners, updates agent/context/engine/service wiring to provide SubWorkflowRunnerFactory, and adjusts tests accordingly.

Changes

Distributed Sub-Workflow Execution Refactor

| Layer | File(s) | Summary |
| --- | --- | --- |
| SubWorkflowRunner interface and context injection | internal/runtime/executor/subworkflow.go | Introduces SubWorkflowRunner interface with ShouldRun, Run, Retry, Cancel methods, request/workspace types, and context injection helpers. |
| Subflow runner implementation and tests | internal/subflow/runner.go, internal/subflow/runner_test.go | Implements subflow.Runner (coordinator-backed dispatch, polling, cancellation, workspace handling) and tests validating gating, dispatch/retry/cancel behavior and output mapping. |
| Agent-level runner wiring | internal/runtime/agent/agent.go | Replaces DispatcherFactory with SubWorkflowRunnerFactory in Agent and Options, creates runner via factory, injects into run context, and updates cleanup to call runner Cleanup when available. |
| Executor refactored to use injected runner | internal/runtime/executor/dag_runner.go, internal/runtime/executor/dag_runner_test.go | SubDAGExecutor now constructs SubWorkflowRequest and uses injected SubWorkflowRunner for distributed Execute/Retry/Stop; removes coordinator task-building/polling subsystem and updates tests to assert runner usage and distributed-run tracking. |
| Command/engine/service/test wiring | internal/cmd/context.go, internal/cmd/dry.go, internal/cmd/restart.go, internal/cmd/retry.go, internal/cmd/start.go, internal/engine/engine.go, internal/engine/run.go, internal/service/worker/remote_handler.go, internal/test/helper.go | Entry points and test helper now provide SubWorkflowRunnerFactory (which builds a coordinator dispatcher and wraps it with subflow.New() using configured execution mode) to agent options instead of DispatcherFactory. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • dagucloud/dagu#1986: Touches executor task construction and coordinator execution paths; related to executor changes.
  • dagucloud/dagu#2235: Refactors command/agent factory wiring; related to dispatcher vs runner factory changes.
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 37.14% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title clearly and specifically describes the main refactoring: moving subworkflow dispatch behind a runner abstraction. |
| Description check | ✅ Passed | The description includes a summary, changes list, testing commands, and migration guidance, closely matching the template's required sections. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


cubic-dev-ai Bot left a comment

1 issue found across 15 files

Comment thread internal/runtime/executor/dag_runner.go

coderabbitai Bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (1)
internal/runtime/executor/dag_runner.go (1)

582-661: ⚡ Quick win

Release e.mu before calling cancellation/stop operations in SubDAGExecutor.Stop (internal/runtime/executor/dag_runner.go, lines 582-661)

The “deadlock because Cancel waits for in-flight Run/Retry” scenario isn’t supported by the cancel flow—Cancel just dispatches an abort request and returns. Still, Stop holds e.mu while calling subWorkflowRunner.Cancel(...), dagCtx.DB.RequestChildCancel(...), and process.Stop(...), which blocks other goroutines that need e.mu for cleanup (e.g., clearDistributedCancel and local process-map updates), increasing latency/lock contention.

Snapshot distributedRuns/distributedCancels/processes under e.mu, unlock, then perform the external cancellation/stop calls outside the mutex.
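
The snapshot-then-release pattern suggested here can be sketched as below. It is a simplified illustration under stated assumptions: the `executor` type, its single `distributedRuns` map, and the `cancel` callback are hypothetical stand-ins, not the fields or calls of the real SubDAGExecutor.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// executor is a hypothetical stand-in for SubDAGExecutor.
type executor struct {
	mu              sync.Mutex
	distributedRuns map[string]bool
}

// stop copies the tracked run IDs while holding the mutex, releases it,
// and only then performs the (potentially slow) external cancel calls.
func (e *executor) stop(cancel func(runID string) error) error {
	e.mu.Lock()
	ids := make([]string, 0, len(e.distributedRuns))
	for id := range e.distributedRuns {
		ids = append(ids, id)
	}
	e.mu.Unlock()
	sort.Strings(ids) // deterministic order for the example

	var errs []error
	for _, id := range ids {
		if err := cancel(id); err != nil {
			errs = append(errs, fmt.Errorf("cancel %s: %w", id, err))
		}
	}
	return errors.Join(errs...)
}

// clear needs the same mutex; it models cleanup running concurrently
// with, or re-entrantly from, a cancel call.
func (e *executor) clear(runID string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	delete(e.distributedRuns, runID)
}

func demo() (int, error) {
	e := &executor{distributedRuns: map[string]bool{"a": true, "b": true}}
	err := e.stop(func(id string) error {
		e.clear(id) // would deadlock if stop still held e.mu here
		if id == "b" {
			return errors.New("unreachable worker")
		}
		return nil
	})
	return len(e.distributedRuns), err
}

func main() {
	remaining, err := demo()
	fmt.Println(remaining, err)
}
```

The trade-off is that the snapshot can go stale: a run that finishes after the copy may still receive a cancel call, so the cancel path has to tolerate already-completed runs.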

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/runtime/executor/dag_runner.go` around lines 582 - 661,
SubDAGExecutor.Stop currently holds e.mu while calling external
cancellers/stoppers (subWorkflowRunner.Cancel, dagCtx.DB.RequestChildCancel,
process.Stop), which increases lock contention and can block other goroutines
like clearDistributedCancel; fix by under e.mu snapshotting the maps/slices you
need (distributedRuns, distributedCancels, processes and any runIDs), then
release e.mu and iterate over those snapshots to call subWorkflowRunner.Cancel,
dagCtx.DB.RequestChildCancel and process.Stop, collecting errors as before; keep
the e.cancelOnce/close(e.killed) protected as appropriate and preserve the
existing logging and error aggregation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/runtime/agent/agent.go`:
- Around line 557-560: After creating subWorkflowRunner via
a.createSubWorkflowRunner(ctx), immediately defer its cleanup to avoid resource
leaks (e.g., defer subWorkflowRunner.Close() or the runner's proper cleanup
method) so that any early returns (including the a.dryRun(ctx) path and other
init failures) will still release resources; place the defer directly after the
successful creation of subWorkflowRunner (near the createSubWorkflowRunner call)
rather than waiting until after a.runner.Run(...).

In `@internal/runtime/executor/dag_runner.go`:
- Around line 384-386: The derived contexts created by Execute/Retry (runCtx,
cancel := context.WithCancel(ctx)) are tracked via trackDistributedRun but
clearDistributedCancel only deletes the map entry and never calls cancel,
leaking the derived context; modify clearDistributedCancel (or the defer after
trackDistributedRun) to invoke the stored cancel function before removing it so
that successful/completed runs also cancel their derived context (ensure Stop()
behavior remains unchanged and handle nil-checks when retrieving the cancel from
the internal map).

In `@internal/subflow/runner.go`:
- Around line 143-150: The Retry method currently forwards requests without
ensuring req.StepName is present, allowing buildRetryTask to emit empty-step
retry tasks; update Runner.Retry to reject empty retry step names before
dispatching (e.g., add a check after validate(req.SubWorkflowRequest) that
returns a clear bad-request error if req.StepName == ""), so dispatchRetry and
buildRetryTask are never called with an empty StepName; reference the Retry,
dispatchRetry and buildRetryTask symbols when adding the guard and error
message.
- Around line 108-120: ShouldRun currently returns true for distributed routing
even when Run will later fail in Run.validate; update Runner.ShouldRun to mirror
the same prerequisites checked in Run.validate by also returning false if
req.RunID is empty or req.RootDAGRun is zero. In other words, inside ShouldRun
(method on Runner) keep the existing nil/dispatcher/DAG checks and the
ForceLocal/WorkerSelector/defaultMode logic but add checks for req.RunID == ""
and req.RootDAGRun == 0 so requests that would fail validation in Run (see
Run.validate) are not routed as distributed.
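
The two runner.go findings above (gating in ShouldRun and the empty-step guard in Retry) can be sketched together. This is an illustrative sketch only: the `request`, `retryRequest`, and `rootRef` types and the `errBadRequest` sentinel are hypothetical, and the dispatcher, DAG, WorkerSelector, and default-mode checks mentioned in the review are deliberately left out.

```go
package main

import (
	"errors"
	"fmt"
)

// rootRef is a hypothetical stand-in for the root DAG run reference.
type rootRef struct{ Name, ID string }

func (r rootRef) isZero() bool { return r == rootRef{} }

// request and retryRequest are simplified, hypothetical request types.
type request struct {
	RunID      string
	Root       rootRef
	ForceLocal bool
}

type retryRequest struct {
	request
	StepName string
}

var errBadRequest = errors.New("bad request")

// validate holds the prerequisites shared by Run and Retry.
func validate(req request) error {
	if req.RunID == "" {
		return fmt.Errorf("%w: run ID is required", errBadRequest)
	}
	if req.Root.isZero() {
		return fmt.Errorf("%w: root DAG run is required", errBadRequest)
	}
	return nil
}

// shouldRun reuses validate so a request that Run would reject is never
// routed to the distributed path in the first place.
func shouldRun(req request) bool {
	if req.ForceLocal {
		return false
	}
	return validate(req) == nil
}

// retry rejects an empty step name before anything is dispatched.
func retry(req retryRequest) error {
	if err := validate(req.request); err != nil {
		return err
	}
	if req.StepName == "" {
		return fmt.Errorf("%w: retry step name is required", errBadRequest)
	}
	return nil // a real runner would build and dispatch the retry task here
}

func demo() (okRoute, badRoute, retryRejected bool) {
	good := request{RunID: "run-1", Root: rootRef{Name: "parent", ID: "root-1"}}
	okRoute = shouldRun(good)
	badRoute = shouldRun(request{Root: good.Root}) // missing RunID
	retryRejected = errors.Is(retry(retryRequest{request: good}), errBadRequest)
	return
}

func main() {
	fmt.Println(demo())
}
```

Sharing one validation function between the gate and the execution path is one way to keep the two from drifting apart; the PR may have solved it differently.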

📥 Commits

Reviewing files that changed from the base of the PR and between 90e344e and 2285c50.

📒 Files selected for processing (15)
  • internal/cmd/context.go
  • internal/cmd/dry.go
  • internal/cmd/restart.go
  • internal/cmd/retry.go
  • internal/cmd/start.go
  • internal/engine/engine.go
  • internal/engine/run.go
  • internal/runtime/agent/agent.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/runtime/executor/subworkflow.go
  • internal/service/worker/remote_handler.go
  • internal/subflow/runner.go
  • internal/subflow/runner_test.go
  • internal/test/helper.go

Comment thread internal/runtime/agent/agent.go
Comment thread internal/runtime/executor/dag_runner.go
Comment thread internal/subflow/runner.go
Comment thread internal/subflow/runner.go

yohamta0 (Collaborator, Author) commented Jun 1, 2026

@coderabbitai review

coderabbitai Bot commented Jun 1, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/runtime/executor/dag_runner.go`:
- Around line 478-486: clearDistributedCancel currently only removes the cancel
func from distributedCancels leaving the run marked active in distributedRuns so
subsequent Stop() calls still try to cancel finished child workflows; modify
clearDistributedCancel to also remove the runID from distributedRuns under the
same mutex (e.mu.Lock()/Unlock()), i.e., inside the critical section fetch
cancel := e.distributedCancels[runID], delete(e.distributedCancels, runID),
delete(e.distributedRuns, runID), then unlock and call cancel() if non-nil so
the run is no longer considered active.
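
A minimal sketch of the cleanup described in this comment, combined with the earlier finding that the derived context should be cancelled on completion. The `executor` type, the map value types, and the `track` helper are assumptions for illustration; only the field and function names `distributedRuns`, `distributedCancels`, and `clearDistributedCancel` come from the review text.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// executor is a hypothetical stand-in for SubDAGExecutor.
type executor struct {
	mu                 sync.Mutex
	distributedRuns    map[string]struct{}
	distributedCancels map[string]context.CancelFunc
}

// track derives a cancellable context and records the run as active.
func (e *executor) track(ctx context.Context, runID string) context.Context {
	runCtx, cancel := context.WithCancel(ctx)
	e.mu.Lock()
	e.distributedRuns[runID] = struct{}{}
	e.distributedCancels[runID] = cancel
	e.mu.Unlock()
	return runCtx
}

// clearDistributedCancel removes both tracking entries inside one critical
// section, then calls the stored cancel function outside the lock so the
// derived context is released even when the run completed normally.
func (e *executor) clearDistributedCancel(runID string) {
	e.mu.Lock()
	cancel := e.distributedCancels[runID]
	delete(e.distributedCancels, runID)
	delete(e.distributedRuns, runID)
	e.mu.Unlock()

	if cancel != nil {
		cancel()
	}
}

func demo() (active int, cancelled bool) {
	e := &executor{
		distributedRuns:    map[string]struct{}{},
		distributedCancels: map[string]context.CancelFunc{},
	}
	runCtx := e.track(context.Background(), "run-1")
	e.clearDistributedCancel("run-1")
	e.clearDistributedCancel("run-1") // second call is a harmless no-op
	return len(e.distributedRuns), runCtx.Err() == context.Canceled
}

func main() {
	fmt.Println(demo())
}
```

Deleting both entries under the same lock means a later Stop() cannot observe a run that has a tracking entry but no cancel function.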

📥 Commits

Reviewing files that changed from the base of the PR and between 2285c50 and 4b50665.

📒 Files selected for processing (4)
  • internal/runtime/agent/agent.go
  • internal/runtime/executor/dag_runner.go
  • internal/subflow/runner.go
  • internal/subflow/runner_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/subflow/runner.go
  • internal/subflow/runner_test.go

Comment thread internal/runtime/executor/dag_runner.go

yohamta0 (Collaborator, Author) commented Jun 1, 2026

@coderabbitai review

coderabbitai Bot commented Jun 1, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

coderabbitai Bot left a comment

🧹 Nitpick comments (1)
internal/runtime/executor/dag_runner_test.go (1)

779-807: 💤 Low value

Add a compile-time interface assertion for mockSubWorkflowRunner.

mockDatabase already guards its contract with var _ exec1.Database = (*mockDatabase)(nil) (Line 772). Mirroring that for the new mock catches signature drift against SubWorkflowRunner at compile time rather than at call sites.

♻️ Proposed addition
+var _ SubWorkflowRunner = (*mockSubWorkflowRunner)(nil)
+
 type mockSubWorkflowRunner struct {
 	shouldRun     bool
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/runtime/executor/dag_runner_test.go` around lines 779 - 807, Add a
compile-time interface assertion to ensure mockSubWorkflowRunner implements
SubWorkflowRunner by adding a declaration like var _ SubWorkflowRunner =
(*mockSubWorkflowRunner)(nil) (placed after the mockSubWorkflowRunner type
definition); this will cause a compile-time error if method signatures drift and
matches the pattern used for mockDatabase's assertion.
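
For readers unfamiliar with the idiom, here is a self-contained sketch of a compile-time interface assertion. The one-method `SubWorkflowRunner` interface below is a hypothetical reduction for the example, not the real interface from the PR.

```go
package main

import "fmt"

// SubWorkflowRunner is reduced to one hypothetical method for brevity.
type SubWorkflowRunner interface {
	ShouldRun() bool
}

type mockSubWorkflowRunner struct {
	shouldRun bool
}

func (m *mockSubWorkflowRunner) ShouldRun() bool { return m.shouldRun }

// The blank-identifier declaration costs nothing at run time. If a method
// signature on the mock drifts from the interface, the package stops
// compiling at this line instead of at some distant call site.
var _ SubWorkflowRunner = (*mockSubWorkflowRunner)(nil)

func demo() bool {
	var r SubWorkflowRunner = &mockSubWorkflowRunner{shouldRun: true}
	return r.ShouldRun()
}

func main() {
	fmt.Println(demo())
}
```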

📥 Commits

Reviewing files that changed from the base of the PR and between 4b50665 and 91c727d.

📒 Files selected for processing (2)
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/dag_runner_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/runtime/executor/dag_runner.go

yohamta0 merged commit a4344a2 into main Jun 1, 2026
11 checks passed
yohamta0 deleted the refactor-subworkflow-runner branch June 1, 2026 11:48