refactor: move subworkflow dispatch behind runner #2244
📝 Walkthrough
This PR replaces dispatcher-based distributed sub-workflow execution with a SubWorkflowRunner interface and implementation, refactors SubDAGExecutor to use injected runners, updates agent/context/engine/service wiring to provide SubWorkflowRunnerFactory, and adjusts tests accordingly.
Changes
Distributed Sub-Workflow Execution Refactor
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
1 issue found across 15 files
Actionable comments posted: 4
🧹 Nitpick comments (1)
internal/runtime/executor/dag_runner.go (1)
582-661: ⚡ Quick win
Release `e.mu` before calling cancellation/stop operations in `SubDAGExecutor.Stop` (internal/runtime/executor/dag_runner.go, lines 582-661)
The "deadlock because `Cancel` waits for in-flight `Run`/`Retry`" scenario isn't supported by the cancel flow: `Cancel` just dispatches an abort request and returns. Still, `Stop` holds `e.mu` while calling `subWorkflowRunner.Cancel(...)`, `dagCtx.DB.RequestChildCancel(...)`, and `process.Stop(...)`, which blocks other goroutines that need `e.mu` for cleanup (e.g., `clearDistributedCancel` and local process-map updates), increasing latency and lock contention.
Snapshot `distributedRuns`/`distributedCancels`/`processes` under `e.mu`, unlock, then perform the external cancellation/stop calls outside the mutex.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/executor/dag_runner.go` around lines 582 - 661, SubDAGExecutor.Stop currently holds e.mu while calling external cancellers/stoppers (subWorkflowRunner.Cancel, dagCtx.DB.RequestChildCancel, process.Stop), which increases lock contention and can block other goroutines like clearDistributedCancel; fix by under e.mu snapshotting the maps/slices you need (distributedRuns, distributedCancels, processes and any runIDs), then release e.mu and iterate over those snapshots to call subWorkflowRunner.Cancel, dagCtx.DB.RequestChildCancel and process.Stop, collecting errors as before; keep the e.cancelOnce/close(e.killed) protected as appropriate and preserve the existing logging and error aggregation.
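The snapshot-then-unlock pattern this comment asks for might look roughly like the sketch below. It is only an illustration: `executor`, `cancelRun`, and `clear` are hypothetical stand-ins rather than the real `SubDAGExecutor` members, and only `mu` and `distributedRuns` borrow names from the review.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// executor is a hypothetical stand-in for SubDAGExecutor; only the locking
// pattern is meant to carry over.
type executor struct {
	mu              sync.Mutex
	distributedRuns map[string]bool
	// cancelRun stands in for an external call such as subWorkflowRunner.Cancel.
	cancelRun func(runID string) error
}

// Stop copies the run IDs while holding mu, releases the lock, and only then
// performs the external cancellation calls.
func (e *executor) Stop() error {
	e.mu.Lock()
	runIDs := make([]string, 0, len(e.distributedRuns))
	for id := range e.distributedRuns {
		runIDs = append(runIDs, id)
	}
	e.mu.Unlock()

	var errs []error
	for _, id := range runIDs {
		if err := e.cancelRun(id); err != nil {
			errs = append(errs, fmt.Errorf("cancel %s: %w", id, err))
		}
	}
	return errors.Join(errs...) // nil when no call failed
}

// clear removes a finished run; it needs mu, like the cleanup paths in the review.
func (e *executor) clear(runID string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	delete(e.distributedRuns, runID)
}

func main() {
	e := &executor{distributedRuns: map[string]bool{"run-1": true, "run-2": true}}
	// The callback takes mu itself; it would deadlock if Stop still held the lock.
	e.cancelRun = func(id string) error {
		e.clear(id)
		return nil
	}
	err := e.Stop()
	fmt.Println(err, len(e.distributedRuns))
}
```

Because the cancel callback here re-acquires `mu`, the example also demonstrates why the external calls have to happen after the unlock.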
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/runtime/agent/agent.go`:
- Around line 557-560: After creating subWorkflowRunner via
a.createSubWorkflowRunner(ctx), immediately defer its cleanup to avoid resource
leaks (e.g., defer subWorkflowRunner.Close() or the runner's proper cleanup
method) so that any early returns (including the a.dryRun(ctx) path and other
init failures) will still release resources; place the defer directly after the
successful creation of subWorkflowRunner (near the createSubWorkflowRunner call)
rather than waiting until after a.runner.Run(...).
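As a rough illustration of the deferred-cleanup suggestion above, the sketch below registers cleanup immediately after creation. The `runner` type and its `Close` method are hypothetical; the review itself only guesses at the name of the cleanup method.

```go
package main

import (
	"errors"
	"fmt"
)

// runner is a hypothetical stand-in for the sub-workflow runner; Close is an
// assumed cleanup method, not a confirmed part of the real API.
type runner struct{ closed bool }

func (r *runner) Close() { r.closed = true }

// run registers cleanup right after creation so that every early return,
// including the dry-run path, still releases the runner.
func run(newRunner func() *runner, dryRun bool) error {
	r := newRunner()
	defer r.Close()

	if dryRun {
		return nil // early return: the deferred Close still runs
	}
	return errors.New("init failed") // failure path: the deferred Close still runs
}

func main() {
	for _, dry := range []bool{true, false} {
		r := &runner{}
		err := run(func() *runner { return r }, dry)
		fmt.Println(dry, err, r.closed)
	}
}
```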
In `@internal/runtime/executor/dag_runner.go`:
- Around line 384-386: The derived contexts created by Execute/Retry (runCtx,
cancel := context.WithCancel(ctx)) are tracked via trackDistributedRun but
clearDistributedCancel only deletes the map entry and never calls cancel,
leaking the derived context; modify clearDistributedCancel (or the defer after
trackDistributedRun) to invoke the stored cancel function before removing it so
that successful/completed runs also cancel their derived context (ensure Stop()
behavior remains unchanged and handle nil-checks when retrieving the cancel from
the internal map).
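One way to read the "defer after `trackDistributedRun`" option is sketched below: the derived context is cancelled on every return path, including success. The `execute` and `demo` functions and the `work` callback are hypothetical and stand in for the real Execute/Retry flow.

```go
package main

import (
	"context"
	"fmt"
)

// execute derives a cancellable context for one run and releases it on every
// return path. The work callback is a hypothetical stand-in for the
// distributed run.
func execute(ctx context.Context, work func(context.Context) error) error {
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel() // calling a CancelFunc more than once is safe
	return work(runCtx)
}

// demo returns the run's error and the derived context's error after execute
// has returned.
func demo() (runErr, ctxErrAfter error) {
	var seen context.Context
	runErr = execute(context.Background(), func(runCtx context.Context) error {
		seen = runCtx
		return runCtx.Err() // nil while the run is in flight
	})
	return runErr, seen.Err()
}

func main() {
	runErr, after := demo()
	fmt.Println(runErr, after)
}
```

Since a second call to a `CancelFunc` is a no-op, a later `Stop()` that also invokes the stored cancel function would not conflict with this defer.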
In `@internal/subflow/runner.go`:
- Around line 143-150: The Retry method currently forwards requests without
ensuring req.StepName is present, allowing buildRetryTask to emit empty-step
retry tasks; update Runner.Retry to reject empty retry step names before
dispatching (e.g., add a check after validate(req.SubWorkflowRequest) that
returns a clear bad-request error if req.StepName == ""), so dispatchRetry and
buildRetryTask are never called with an empty StepName; reference the Retry,
dispatchRetry and buildRetryTask symbols when adding the guard and error
message.
- Around line 108-120: ShouldRun currently returns true for distributed routing
even when Run will later fail in Run.validate; update Runner.ShouldRun to mirror
the same prerequisites checked in Run.validate by also returning false if
req.RunID is empty or req.RootDAGRun is zero. In other words, inside ShouldRun
(method on Runner) keep the existing nil/dispatcher/DAG checks and the
ForceLocal/WorkerSelector/defaultMode logic but add checks for req.RunID == ""
and req.RootDAGRun == 0 so requests that would fail validation in Run (see
Run.validate) are not routed as distributed.
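The two `runner.go` findings above (reject empty retry step names, and keep `ShouldRun` consistent with validation) can be sketched together. Everything below is a simplified assumption: the `request`, `retryRequest`, and `runRef` types, the `errBadRequest` sentinel, and the free functions are hypothetical, and the real `ShouldRun` also checks the dispatcher, DAG, `WorkerSelector`, and default mode.

```go
package main

import (
	"errors"
	"fmt"
)

// runRef is a hypothetical stand-in for the root DAG run reference.
type runRef struct{ Name, ID string }

func (r runRef) zero() bool { return r == (runRef{}) }

// request and retryRequest are simplified, assumed shapes of the real types.
type request struct {
	RunID      string
	RootDAGRun runRef
	ForceLocal bool
}

type retryRequest struct {
	request
	StepName string
}

var errBadRequest = errors.New("bad request")

// validate holds the prerequisites shared by run and retry.
func validate(req request) error {
	if req.RunID == "" {
		return fmt.Errorf("%w: run ID is required", errBadRequest)
	}
	if req.RootDAGRun.zero() {
		return fmt.Errorf("%w: root DAG run is required", errBadRequest)
	}
	return nil
}

// shouldRun reports false for any request that validate would reject, so such
// requests are never routed as distributed.
func shouldRun(req request) bool {
	if req.ForceLocal {
		return false
	}
	return validate(req) == nil
}

// retry rejects an empty step name before anything is dispatched.
func retry(req retryRequest) error {
	if err := validate(req.request); err != nil {
		return err
	}
	if req.StepName == "" {
		return fmt.Errorf("%w: retry step name is required", errBadRequest)
	}
	return nil // dispatch would happen here
}

func main() {
	ok := request{RunID: "r1", RootDAGRun: runRef{Name: "root", ID: "1"}}
	fmt.Println(shouldRun(ok), shouldRun(request{RunID: "r1"}))
	fmt.Println(retry(retryRequest{request: ok}))
}
```

Reusing `validate` inside `shouldRun` is one design choice for keeping the two checks from drifting apart; the review only asks that the prerequisites match.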
📒 Files selected for processing (15)
- internal/cmd/context.go
- internal/cmd/dry.go
- internal/cmd/restart.go
- internal/cmd/retry.go
- internal/cmd/start.go
- internal/engine/engine.go
- internal/engine/run.go
- internal/runtime/agent/agent.go
- internal/runtime/executor/dag_runner.go
- internal/runtime/executor/dag_runner_test.go
- internal/runtime/executor/subworkflow.go
- internal/service/worker/remote_handler.go
- internal/subflow/runner.go
- internal/subflow/runner_test.go
- internal/test/helper.go
@coderabbitai review
✅ Actions performed: Review triggered.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/runtime/executor/dag_runner.go`:
- Around line 478-486: clearDistributedCancel currently only removes the cancel
func from distributedCancels leaving the run marked active in distributedRuns so
subsequent Stop() calls still try to cancel finished child workflows; modify
clearDistributedCancel to also remove the runID from distributedRuns under the
same mutex (e.mu.Lock()/Unlock()), i.e., inside the critical section fetch
cancel := e.distributedCancels[runID], delete(e.distributedCancels, runID),
delete(e.distributedRuns, runID), then unlock and call cancel() if non-nil so
the run is no longer considered active.
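A minimal sketch of the combined cleanup described above, assuming `distributedRuns` is a set keyed by run ID and `distributedCancels` maps run IDs to cancel functions. The `executor` type, `newExecutor`, `track`, and `activeCount` are hypothetical helpers added for the illustration.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// executor is a hypothetical stand-in for SubDAGExecutor's tracking state.
type executor struct {
	mu                 sync.Mutex
	distributedRuns    map[string]bool
	distributedCancels map[string]context.CancelFunc
}

func newExecutor() *executor {
	return &executor{
		distributedRuns:    map[string]bool{},
		distributedCancels: map[string]context.CancelFunc{},
	}
}

// track marks a run active and returns its derived context.
func (e *executor) track(runID string) context.Context {
	runCtx, cancel := context.WithCancel(context.Background())
	e.mu.Lock()
	defer e.mu.Unlock()
	e.distributedRuns[runID] = true
	e.distributedCancels[runID] = cancel
	return runCtx
}

// clearDistributedCancel drops both tracking entries under one lock, then
// calls the stored cancel func outside the critical section.
func (e *executor) clearDistributedCancel(runID string) {
	e.mu.Lock()
	cancel := e.distributedCancels[runID]
	delete(e.distributedCancels, runID)
	delete(e.distributedRuns, runID)
	e.mu.Unlock()
	if cancel != nil {
		cancel()
	}
}

// activeCount reports how many runs a later Stop would still try to cancel.
func (e *executor) activeCount() int {
	e.mu.Lock()
	defer e.mu.Unlock()
	return len(e.distributedRuns)
}

func main() {
	e := newExecutor()
	done := e.track("run-1")
	e.track("run-2")
	e.clearDistributedCancel("run-1")
	fmt.Println(e.activeCount(), done.Err())
}
```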
📒 Files selected for processing (4)
- internal/runtime/agent/agent.go
- internal/runtime/executor/dag_runner.go
- internal/subflow/runner.go
- internal/subflow/runner_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- internal/subflow/runner.go
- internal/subflow/runner_test.go
@coderabbitai review
✅ Actions performed: Review triggered.
🧹 Nitpick comments (1)
internal/runtime/executor/dag_runner_test.go (1)
779-807: 💤 Low value
Add a compile-time interface assertion for `mockSubWorkflowRunner`.
`mockDatabase` already guards its contract with `var _ exec1.Database = (*mockDatabase)(nil)` (Line 772). Mirroring that for the new mock catches signature drift against `SubWorkflowRunner` at compile time rather than at call sites.
♻️ Proposed addition
    +var _ SubWorkflowRunner = (*mockSubWorkflowRunner)(nil)
    +
     type mockSubWorkflowRunner struct {
     	shouldRun bool
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@internal/runtime/executor/dag_runner_test.go` around lines 779 - 807, Add a compile-time interface assertion to ensure mockSubWorkflowRunner implements SubWorkflowRunner by adding a declaration like var _ SubWorkflowRunner = (*mockSubWorkflowRunner)(nil) (placed after the mockSubWorkflowRunner type definition); this will cause a compile-time error if method signatures drift and matches the pattern used for mockDatabase's assertion.
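For readers unfamiliar with the idiom, here is a self-contained illustration of a compile-time interface assertion. The one-method `Runner` interface and `mockRunner` type are hypothetical and much smaller than the real `SubWorkflowRunner` and its mock.

```go
package main

import "fmt"

// Runner is a hypothetical, trimmed-down interface used only for this example.
type Runner interface {
	ShouldRun(runID string) bool
}

type mockRunner struct{ shouldRun bool }

func (m *mockRunner) ShouldRun(string) bool { return m.shouldRun }

// Compile-time check: the build fails here if *mockRunner stops satisfying
// Runner. The blank identifier means no variable is kept at run time.
var _ Runner = (*mockRunner)(nil)

func main() {
	var r Runner = &mockRunner{shouldRun: true}
	fmt.Println(r.ShouldRun("run-1"))
}
```

Changing the signature of `ShouldRun` on either side makes the `var _` line fail to compile, which points at the mock rather than at whichever call site happens to use it first.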
📒 Files selected for processing (2)
- internal/runtime/executor/dag_runner.go
- internal/runtime/executor/dag_runner_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/runtime/executor/dag_runner.go
Summary
Testing
Summary by cubic
Moved child workflow dispatch behind `runtime/executor.SubWorkflowRunner` with a coordinator-backed runner in `internal/subflow`. This decouples runtime from the coordinator, centralizes subworkflow handling, and fixes cleanup of completed distributed subflow runs.
Refactors
- […] `internal/runtime/executor/subworkflow.go` (`SubWorkflowRunner` interface, request types for retry/cancel/workspace, and `WithSubWorkflowRunner`).
- […] `internal/subflow` with ShouldRun, Run, Retry, Cancel, Cleanup, polling/logging, output extraction, agent snapshot, and workspace bundle upload; added `internal/subflow/runner_test.go`.
- […] `SubWorkflowRequest` and delegates distributed paths to the injected runner; improved cancellation by tracking active waits, calling runner.Cancel, and clearing distributed run/cancel tracking on completion; removed embedded coordinator logic.
- Replaced `DispatcherFactory` with `SubWorkflowRunnerFactory` in `agent.Options`; wired through `internal/cmd/*`, `internal/engine`, `internal/service/worker` to build `subflow.New` over a coordinator runtime dispatcher; updated tests.
Migration
- Replace `DispatcherFactory` with `SubWorkflowRunnerFactory` when constructing the agent.
- […] `coordinator.NewRuntimeDispatcher`) and wrapping it with `subflow.New(dispatcher, defaultExecMode)`.
- […] `runtime/executor.SubWorkflowRunner` and inject via `runtime/executor.WithSubWorkflowRunner`.
Written for commit 91c727d. Summary will update on new commits.
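The delegation described in this summary, where the executor hands distributed paths to an injected runner and otherwise runs locally, could be sketched as below. This is not the real API: the two-method `subWorkflowRunner` interface, the `request` and `executor` types, and `fakeRunner` are hypothetical and far smaller than the actual `SubWorkflowRunner` surface.

```go
package main

import "fmt"

// request is a hypothetical, simplified sub-workflow request.
type request struct {
	RunID      string
	ForceLocal bool
}

// subWorkflowRunner is a hypothetical two-method slice of the real interface.
type subWorkflowRunner interface {
	ShouldRun(req request) bool
	Run(req request) (string, error)
}

// executor holds the injected runner; a nil runner means local-only execution.
type executor struct{ runner subWorkflowRunner }

// execute delegates to the runner when it claims the request and otherwise
// falls back to the local path.
func (e *executor) execute(req request) (string, error) {
	if e.runner != nil && e.runner.ShouldRun(req) {
		return e.runner.Run(req)
	}
	return "local:" + req.RunID, nil
}

// fakeRunner stands in for a coordinator-backed runner.
type fakeRunner struct{}

func (fakeRunner) ShouldRun(req request) bool { return !req.ForceLocal }

func (fakeRunner) Run(req request) (string, error) {
	return "distributed:" + req.RunID, nil
}

func main() {
	e := &executor{runner: fakeRunner{}}
	for _, req := range []request{{RunID: "a"}, {RunID: "b", ForceLocal: true}} {
		out, err := e.execute(req)
		fmt.Println(out, err)
	}
}
```

Keeping the executor dependent only on an interface is what lets tests substitute a mock runner, as the review's `mockSubWorkflowRunner` does.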
Summary by CodeRabbit