fix(dag): propagate runtime params in distributed queue dispatch#2238
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThe distributed execution path of ChangesTask Parameter Propagation
Estimated code review effort🎯 2 (Simple) | ⏱️ ~8 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
No issues found across 1 file
Tip: cubic could auto-approve low-risk PRs like this, if it thinks it's safe to merge. Learn more
Re-trigger cubic
|
Amazing, thank you very much for catching this bug! |
Problem
When
DAGU_DEFAULT_EXECUTION_MODE=distributed, DAG parameters (e.g.,content_hash) passed via the enqueue API were silently lost. The DAG would run on workers with no runtime params, causing failures for DAGs that require them.Root Cause
In
DAGExecutor.ExecuteDAG(), the distributed execution path (whenshouldUseDistributedExecution(dag)is true) built task options withWithWorkerSelector,WithPreviousStatus, andWithBaseConfig— but never calledWithTaskParams(). This meanttask.Paramswas empty when dispatched to the coordinator.Meanwhile, the local execution path correctly extracted params from
previousStatus.ParamsListviaspec.QuoteRuntimeParams()and injected them into the subprocess env.Fix
Added a
WithTaskParams(previousStatus.Params)call in the distributed branch ofExecuteDAG(), beforeCreateTask(). This mirrors how the API'sdispatchStartToCoordinatorcorrectly passes params, ensuring the worker'sloadDAG()receives them viaspec.WithParams(task.Params).Tests
All related tests pass:
internal/service/scheduler,internal/service/worker,internal/dispatch.Summary by cubic
Fixes dropped DAG runtime params in distributed queue dispatch. We now append
executor.WithTaskParams(previousStatus.Params)to task options beforeCreateTask(), so workers receive params (e.g., content_hash) whenDAGU_DEFAULT_EXECUTION_MODE=distributed.Written for commit dfe32db. Summary will update on new commits.
Summary by CodeRabbit