Skip to content

fix(dag): propagate runtime params in distributed queue dispatch#2238

Merged
yohamta0 merged 1 commit into
dagucloud:mainfrom
mingfang:fix/distributed-params-in-queue-dispatch
Jun 1, 2026
Merged

fix(dag): propagate runtime params in distributed queue dispatch#2238
yohamta0 merged 1 commit into
dagucloud:mainfrom
mingfang:fix/distributed-params-in-queue-dispatch

Conversation

@mingfang

@mingfang mingfang commented May 31, 2026

Copy link
Copy Markdown
Contributor

Problem

When DAGU_DEFAULT_EXECUTION_MODE=distributed, DAG parameters (e.g., content_hash) passed via the enqueue API were silently lost. The DAG would run on workers with no runtime params, causing failures for DAGs that require them.

Root Cause

In DAGExecutor.ExecuteDAG(), the distributed execution path (when shouldUseDistributedExecution(dag) is true) built task options with WithWorkerSelector, WithPreviousStatus, and WithBaseConfig — but never called WithTaskParams(). This meant task.Params was empty when dispatched to the coordinator.

Meanwhile, the local execution path correctly extracted params from previousStatus.ParamsList via spec.QuoteRuntimeParams() and injected them into the subprocess env.

Fix

Added a WithTaskParams(previousStatus.Params) call in the distributed branch of ExecuteDAG(), before CreateTask(). This mirrors how the API's dispatchStartToCoordinator correctly passes params, ensuring the worker's loadDAG() receives them via spec.WithParams(task.Params).

Tests

All related tests pass: internal/service/scheduler, internal/service/worker, internal/dispatch.


Summary by cubic

Fixes dropped DAG runtime params in distributed queue dispatch. We now append executor.WithTaskParams(previousStatus.Params) to task options before CreateTask(), so workers receive params (e.g., content_hash) when DAGU_DEFAULT_EXECUTION_MODE=distributed.

Written for commit dfe32db. Summary will update on new commits.

Review in cubic

Summary by CodeRabbit

  • Bug Fixes
    • Fixed task parameter propagation in distributed execution scenarios, ensuring parameters are correctly preserved and passed through execution workflows when applicable.

@coderabbitai

coderabbitai Bot commented May 31, 2026

Copy link
Copy Markdown

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0a6d8b21-6b38-43cc-ba1c-e8b64485f8e9

📥 Commits

Reviewing files that changed from the base of the PR and between 683ee5b and dfe32db.

📒 Files selected for processing (1)
  • internal/service/scheduler/dag_executor.go

📝 Walkthrough

Walkthrough

The distributed execution path of DAGExecutor.ExecuteDAG now conditionally includes task parameters from previousStatus.Params when available. The change appends executor.WithTaskParams(previousStatus.Params) to distributed task options when previousStatus is non-nil and contains non-empty parameter data.

Changes

Task Parameter Propagation

Layer / File(s) Summary
Task parameter forwarding in distributed execution
internal/service/scheduler/dag_executor.go
When previousStatus is provided with non-empty Params, the conditional logic now passes those parameters to the distributed task via WithTaskParams before coordinator dispatch.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~8 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: adding runtime parameter propagation to the distributed execution path.
Description check ✅ Passed The description covers all required template sections with comprehensive detail: Problem, Root Cause, Fix explanation, and test confirmation. The fix reasoning is well-documented.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issues found across 1 file

Tip: cubic could auto-approve low-risk PRs like this, if it thinks it's safe to merge. Learn more

Re-trigger cubic

@yohamta0

yohamta0 commented Jun 1, 2026

Copy link
Copy Markdown
Collaborator

Amazing, thank you very much for catching this bug!

@yohamta0 yohamta0 merged commit 5779d09 into dagucloud:main Jun 1, 2026
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants