Examples
Quick reference for all Dagu features. Each example is minimal and copy-paste ready.
Basic Workflows
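Sequential Steps
A minimal sketch to start with: under the default chain mode, each step runs after the previous one, so no explicit depends is needed.
steps:
  - name: hello
    command: echo "hello"
  - name: world
    command: echo "world" # runs after hello (chain mode default)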
Parallel Execution (Iterator)
steps:
- call: processor
parallel:
items: [A, B, C]
maxConcurrent: 2
params: "ITEM=${ITEM}"
---
name: processor
steps:
- command: echo "Processing ${ITEM}"Multiple Commands per Step
steps:
- name: build-and-test
command:
- npm install
- npm run build
- npm test
env:
- NODE_ENV: production
workingDir: /app
Share step config (env, workingDir, retryPolicy, etc.) across commands instead of duplicating across steps.
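For contrast, a sketch of the same work written as separate steps, where the shared configuration has to be repeated on each one:
steps:
  - name: install
    command: npm install
    env:
      - NODE_ENV: production
    workingDir: /app
  - name: build
    command: npm run build
    env:
      - NODE_ENV: production
    workingDir: /app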
Execution Mode: Chain vs Graph
# Default (chain): steps run in order
type: chain
steps:
- command: echo "step 1"
- command: echo "step 2" # Automatically depends on previous
# Graph mode: only explicit dependencies
---
type: graph
steps:
- name: a
command: echo A
depends: [] # Explicitly independent
- name: b
command: echo B
depends: []
Control Flow & Conditions
Conditional Execution
steps:
- command: echo "Deploying application"
preconditions:
- condition: "${ENV}"
expected: "production"Complex Preconditions
steps:
- name: conditional-task
command: echo "Processing task"
preconditions:
- command: test -f /data/input.csv
- command: test -s /data/input.csv # File exists and is not empty
- condition: "${ENVIRONMENT}"
expected: "production"
- condition: "`date '+%d'`"
expected: "re:0[1-9]" # First 9 days of month
- condition: "`df -h /data | awk 'NR==2 {print $5}' | sed 's/%//'`"
expected: "re:^[0-7][0-9]$" # Less than 80% disk usageRepeat Until Condition
Looking for iteration over a list? See Parallel Execution.
steps:
- command: curl -f http://service/health
repeatPolicy:
repeat: true
intervalSec: 10
exitCode: [1] # Repeat while exit code is 1
Repeat Until Command Succeeds
steps:
- command: curl -f http://service:8080/health
repeatPolicy:
repeat: until # Repeat UNTIL service is healthy
exitCode: [0] # Exit code 0 means success
intervalSec: 10 # Wait 10 seconds between attempts
limit: 30 # Maximum 5 minutes
Repeat Until Output Match
steps:
- command: echo "COMPLETED" # Simulates job status check
output: JOB_STATUS
repeatPolicy:
repeat: until # Repeat UNTIL job completes
condition: "${JOB_STATUS}"
expected: "COMPLETED"
intervalSec: 30
limit: 120 # Maximum 1 hour (120 attempts)
Repeat Steps
steps:
- command: echo "heartbeat" # Sends heartbeat signal
repeatPolicy:
repeat: while # Repeat indefinitely while successful
intervalSec: 60
Repeat Steps Until Success
steps:
- command: echo "Checking status"
repeatPolicy:
repeat: until # Repeat until exit code 0
exitCode: [0]
intervalSec: 30
limit: 20 # Maximum 10 minutes
DAG-Level Preconditions
preconditions:
- condition: "`date +%u`"
expected: "re:[1-5]" # Weekdays only
steps:
- command: echo "Run on business days"Negated Preconditions
steps:
# Run only when NOT in production
- command: echo "Running dev task"
preconditions:
- condition: "${ENVIRONMENT}"
expected: "production"
negate: true
# Run only on weekends
- command: echo "Weekend maintenance"
preconditions:
- condition: "`date +%u`"
expected: "re:[1-5]" # Weekdays
negate: true # Invert: run on weekends
Continue On: Exit Codes and Output
steps:
- command: exit 3 # This will exit with code 3
continueOn:
exitCode: [0, 3] # Treat 0 and 3 as non-fatal
output:
- command: "WARNING"
- command: "re:^INFO:.*" # Regex match
markSuccess: true # Mark as success when matched
- command: echo "Continue regardless"Nested Workflows
steps:
- call: etl.yaml
params: "ENV=prod DATE=today"
- call: analyze.yaml
Multiple DAGs in One File
steps:
- call: data-processor
params: "TYPE=daily"
---
name: data-processor
params:
- TYPE: "batch"
steps:
- command: echo "Extracting ${TYPE} data"
- command: echo "Transforming data"Dispatch to Specific Workers
steps:
- command: python prepare_dataset.py
- call: train-model
- call: evaluate-model
---
name: train-model
workerSelector:
gpu: "true"
cuda: "11.8"
memory: "64G"
steps:
- command: python train.py --gpu
---
name: evaluate-model
workerSelector:
gpu: "true"
steps:
- command: python evaluate.py
Mixed Local and Worker Steps
steps:
# Runs on any available worker (local or remote)
- command: wget https://data.example.com/dataset.tar.gz
# Must run on specific worker type
- call: process-on-gpu
# Runs locally (no selector)
- command: echo "Processing complete"
---
name: process-on-gpu
workerSelector:
gpu: "true"
gpu-model: "nvidia-a100"
steps:
- command: python gpu_process.py
Parallel Distributed Tasks
steps:
- command: python split_data.py --chunks=10
output: CHUNKS
- call: chunk-processor
parallel:
items: ${CHUNKS}
maxConcurrent: 5
params: "CHUNK=${ITEM}"
- command: python merge_results.py
---
name: chunk-processor
workerSelector:
memory: "16G"
cpu-cores: "8"
params:
- CHUNK: ""
steps:
- command: python process_chunk.py ${CHUNK}
Error Handling & Reliability
Continue on Failure
steps:
# Optional task that may fail
- command: exit 1 # This will fail
continueOn:
failure: true
# This step always runs
- command: echo "This must succeed"Continue on Skipped
steps:
# Optional step that may be skipped
- command: echo "Enabling feature"
preconditions:
- condition: "${FEATURE_FLAG}"
expected: "enabled"
continueOn:
skipped: true
# This step always runs
- command: echo "Processing main task"Retry on Failure
steps:
- command: curl https://api.example.com
retryPolicy:
limit: 3
intervalSec: 30
Smart Retry Policies
steps:
- command: curl -f https://api.example.com/data
retryPolicy:
limit: 5
intervalSec: 30
exitCodes: [429, 503, 504] # Rate limit, service unavailable
Retry with Exponential Backoff
steps:
- command: curl https://api.example.com/data
retryPolicy:
limit: 5
intervalSec: 2
backoff: true # 2x multiplier
maxIntervalSec: 60 # Cap at 60s
# Intervals: 2s, 4s, 8s, 16s, 32s → 60s
Repeat with Backoff
Looking for iteration over a list? See Parallel Execution.
steps:
- command: nc -z localhost 8080
repeatPolicy:
repeat: while
exitCode: [1] # While connection fails
intervalSec: 1
backoff: 2.0
maxIntervalSec: 30
limit: 20
# Check intervals: 1s, 2s, 4s, 8s, 16s, 30s...
Lifecycle Handlers
steps:
- command: echo "Processing main task"
handlerOn:
success:
echo "SUCCESS - Workflow completed"
failure:
echo "FAILURE - Cleaning up failed workflow"
exit:
echo "EXIT - Always cleanup"Data & Variables
Environment Variables
env:
- SOME_DIR: ${HOME}/batch
- SOME_FILE: ${SOME_DIR}/some_file
- LOG_LEVEL: debug
- API_KEY: ${SECRET_API_KEY}
steps:
- workingDir: ${SOME_DIR}
command: python main.py ${SOME_FILE}
Dotenv Files
# Specify single dotenv file
dotenv: .env
# Load multiple files (all files loaded, later override earlier)
dotenv:
- .env.defaults
- .env.local
- .env.production
steps:
- command: echo "Database: ${DATABASE_URL}"Secrets from Providers
secrets:
- name: API_TOKEN
provider: env
key: PROD_API_TOKEN
- name: DB_PASSWORD
provider: file
key: secrets/db-password
steps:
- command: ./sync.sh
env:
- AUTH_HEADER: "Bearer ${API_TOKEN}"
- STRICT_MODE: "1"Positional Parameters
params: param1 param2 # Default values for $1 and $2
steps:
- command: python main.py $1 $2
Named Parameters
params:
- FOO: 1 # Default value for ${FOO}
- BAR: "`echo 2`" # Command substitution in defaults
- ENVIRONMENT: dev
steps:
- command: python main.py ${FOO} ${BAR} --env=${ENVIRONMENT}
Output Variables
steps:
- command: echo `date +%Y%m%d`
output: TODAY
- command: echo "Today's date is ${TODAY}"Parallel Outputs Aggregation
steps:
- call: worker
parallel:
items: [east, west, eu]
params: "REGION=${ITEM}"
output: RESULTS
- command: |
echo "Total: ${RESULTS.summary.total}"
echo "First region: ${RESULTS.results[0].params}"
echo "First output: ${RESULTS.outputs[0].value}"
---
name: worker
params:
- REGION: ""
steps:
- command: echo ${REGION}
output: value
Special Variables
steps:
- command: |
echo "DAG: ${DAG_NAME}"
echo "Run: ${DAG_RUN_ID}"
echo "Step: ${DAG_RUN_STEP_NAME}"
echo "Log: ${DAG_RUN_LOG_FILE}"Output Size Limits
# Set maximum output size to 5MB for all steps
maxOutputSize: 5242880 # 5MB in bytes
steps:
- command: "cat large-file.txt"
output: CONTENT # Will fail if file exceeds 5MB
Control output size limits to prevent memory issues.
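If a step's result can legitimately exceed the limit, a sketch of one workaround is to redirect it to a file (see Redirect Output to Files below) instead of capturing it into an output variable:
steps:
  - command: cat large-file.txt
    stdout: /tmp/large-file-copy.txt # illustrative path; nothing is captured into a variable, so the capture limit should not apply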
Redirect Output to Files
steps:
- command: "echo hello"
stdout: "/tmp/hello"
- command: "echo error message >&2"
stderr: "/tmp/error.txt"JSON Path References
steps:
- call: sub_workflow
output: SUB_RESULT
- command: echo "Result: ${SUB_RESULT.outputs.finalValue}"Step ID References
type: graph
steps:
- id: extract
command: python extract.py
output: DATA
- command: |
echo "Exit code: ${extract.exitCode}"
echo "Stdout path: ${extract.stdout}"
depends: extract
Command Substitution
env:
TODAY: "`date '+%Y%m%d'`"
steps:
- command: echo hello, today is ${TODAY}
Scripts & Code
Shell Scripts
steps:
- script: |
#!/bin/bash
cd /tmp
echo "hello world" > hello
cat hello
ls -la
Run shell script with default shell.
Shebang Script
steps:
- script: |
#!/usr/bin/env python3
import platform
print(platform.python_version())
Runs with the interpreter declared in the shebang.
Python Scripts
steps:
- command: python
script: |
import os
import datetime
print(f"Current directory: {os.getcwd()}")
print(f"Current time: {datetime.datetime.now()}")Execute script with specific interpreter.
Multi-Step Scripts
steps:
- script: |
#!/bin/bash
set -e
echo "Starting process..."
echo "Preparing environment"
echo "Running main task..."
echo "Running main process"
echo "Cleaning up..."
echo "Cleaning up"Working Directory
workingDir: /tmp
steps:
- command: pwd # Outputs: /tmp
- command: mkdir -p data
- workingDir: /tmp/data
command: pwd # Outputs: /tmp/data
Shell Selection
shell: ["/bin/bash", "-e"] # Default shell for all steps
steps:
- command: echo hello world | xargs echo
- shell: /bin/zsh # Override for a single step
command: echo "from zsh"Reproducible Env with Nix Shell
Note: Requires nix-shell to be installed separately. Not included in Dagu binary or container.
steps:
- shell: nix-shell
shellPackages: [python3, curl, jq]
command: |
python3 --version
curl --version
jq --version
Step Types & Integrations
Container Workflow
# DAG-level container for all steps
container:
image: python:3.11
env:
- PYTHONPATH=/app
volumes:
- ./src:/app
steps:
- command: pip install -r requirements.txt
- command: pytest tests/
- command: python setup.py build
Keep Container Running
# Use keepContainer at DAG level
container:
image: postgres:16
keepContainer: true
env:
- POSTGRES_PASSWORD=secret
ports:
- "5432:5432"
steps:
- command: postgres -D /var/lib/postgresql/data
- command: pg_isready -U postgres -h localhost
retryPolicy:
limit: 10
intervalSec: 2
Step-Level Container
steps:
- name: build
container:
image: node:18
volumes:
- ./src:/app
workingDir: /app
command: npm run build
Exec Into Existing Container
# Run commands in an already-running container
container: my-app-container
steps:
- command: php artisan migrate
- command: php artisan cache:clear
Exec Mode with Overrides
# Override user and working directory
container:
exec: my-app-container
user: root
workingDir: /var/www
env:
- APP_DEBUG=true
steps:
- command: composer install
- command: chown -R www-data:www-data storage
Mixed Exec and Image Mode
steps:
# Exec into app container
- name: maintenance-mode
container: my-app
command: php artisan down
# Run migration in fresh container
- name: migrate
container:
image: my-app:latest
command: php artisan migrate
# Exec back into app container
- name: restart
container: my-app
command: php artisan up
GitHub Actions (Experimental)
secrets:
- name: GITHUB_TOKEN
provider: env
key: GITHUB_TOKEN
workingDir: /tmp/workspace
steps:
- command: actions/checkout@v4
type: gha
params:
repository: dagu-org/dagu
ref: main
token: "${GITHUB_TOKEN}"Remote Commands via SSH
# Configure SSH once for all steps
ssh:
user: deploy
host: production.example.com
key: ~/.ssh/deploy_key
steps:
- command: curl -f localhost:8080/health
- command: systemctl restart myapp
Container Volumes: Relative Paths
workingDir: /app/project
container:
image: python:3.11
volumes:
- ./data:/data # Resolves to /app/project/data:/data
- .:/workspace # Resolves to /app/project:/workspace
steps:
- command: python process.py
HTTP Requests
steps:
- command: POST https://api.example.com/webhook
type: http
config:
headers:
Content-Type: application/json
body: '{"status": "started"}'JSON Processing
steps:
# Fetch sample users from a public mock API
- command: GET https://reqres.in/api/users
type: http
config:
silent: true
output: API_RESPONSE
# Extract user emails from the JSON response
- command: '.data[] | .email'
type: jq
script: ${API_RESPONSE}
Archive Extraction
workingDir: /tmp/data
steps:
- type: archive
config:
source: dataset.tar.zst
destination: ./dataset
command: extract
Container Startup & Readiness
container:
image: alpine:latest
startup: command # keepalive | entrypoint | command
command: ["sh", "-c", "my-daemon"]
waitFor: healthy # running | healthy
logPattern: "Ready" # Optional regex to wait for
restartPolicy: unless-stopped
steps:
- command: echo "Service is ready"Private Registry Auth
registryAuths:
ghcr.io:
username: ${GITHUB_USER}
password: ${GITHUB_TOKEN}
container:
image: ghcr.io/myorg/private-app:latest
steps:
- command: ./app
Multi-Container Workflow
steps:
- name: build
container:
image: node:24
volumes:
- ./src:/app
workingDir: /app
command: npm run build
- name: test
container:
image: node:24
volumes:
- ./src:/app
workingDir: /app
command: npm test
- name: deploy
container:
image: python:3.11
env:
- AWS_DEFAULT_REGION=us-east-1
command: python deploy.py
SSH: Advanced Options
ssh:
user: deploy
host: app.example.com
port: 2222
key: ~/.ssh/deploy_key
strictHostKey: true
knownHostFile: ~/.ssh/known_hosts
steps:
- command: systemctl status myapp
Mail
smtp:
host: smtp.gmail.com
port: "587"
username: "${SMTP_USER}"
password: "${SMTP_PASS}"
steps:
- type: mail
config:
to: team@example.com
from: noreply@example.com
subject: "Weekly Report"
message: "Attached."
attachments:
- command: report.txt
Chat / LLM Request
steps:
- type: chat
llm:
provider: openai
model: gpt-4o
messages:
- role: user
content: "What is 2+2?"
output: ANSWER
Chat with DAG-Level Config
llm:
provider: openai
model: gpt-4o
system: "You are a helpful assistant."
steps:
- type: chat
messages:
- role: user
content: "Explain ${TOPIC} briefly."Steps inherit LLM config from DAG level.
Multi-turn Conversation
steps:
- type: chat
llm:
provider: openai
model: gpt-4o
messages:
- role: user
content: "What is 2+2?"
- type: chat
llm:
provider: openai
model: gpt-4o
messages:
- role: user
content: "Now multiply that by 3."Steps inherit conversation history from previous steps.
Extended Thinking Mode
steps:
- type: chat
llm:
provider: anthropic
model: claude-sonnet-4-20250514
thinking:
enabled: true
effort: high
messages:
- role: user
content: "Analyze this complex problem..."Enable deeper reasoning for complex tasks.
Scheduling & Automation
Basic Scheduling
schedule: "5 4 * * *" # Run at 04:05 daily
steps:
- command: echo "Running scheduled job"Skip Redundant Runs
schedule: "0 */4 * * *" # Every 4 hours
skipIfSuccessful: true # Skip if already succeeded
steps:
- command: echo "Extracting data"
- command: echo "Transforming data"
- command: echo "Loading data"Queue Management
queue: "batch" # Assign to named queue
maxActiveRuns: 2 # Max concurrent runs
steps:
- command: echo "Processing data"Multiple Schedules
schedule:
- "0 9 * * MON-FRI" # Weekdays 9 AM
- "0 14 * * SAT,SUN" # Weekends 2 PM
steps:
- command: echo "Run on multiple times"Timezone
schedule: "CRON_TZ=America/New_York 0 9 * * *"
steps:
- command: echo "9AM New York"Start/Stop/Restart Windows
schedule:
start: "0 8 * * *" # Start 8 AM
restart: "0 12 * * *" # Restart noon
stop: "0 18 * * *" # Stop 6 PM
restartWaitSec: 60
steps:
- command: echo "Long-running service"Global Queue Configuration
# Global queue config in ~/.config/dagu/config.yaml
queues:
enabled: true
config:
- name: "critical"
maxConcurrency: 5
- name: "batch"
maxConcurrency: 1
# DAG file
queue: "critical"
maxActiveRuns: 3
steps:
- command: echo "Processing critical task"Configure queues globally and per-DAG.
Email Notifications
mailOn:
failure: true
success: true
smtp:
host: smtp.gmail.com
port: "587"
username: "${SMTP_USER}"
password: "${SMTP_PASS}"
steps:
- command: echo "Running critical job"
mailOnError: true
Operations & Production
History Retention
histRetentionDays: 30 # Keep 30 days of history
schedule: "0 0 * * *" # Daily at midnight
steps:
- command: echo "Archiving old data"
- command: rm -rf /tmp/archive/*
Control how long execution history is retained.
Output Size Management
maxOutputSize: 10485760 # 10MB max output per step
steps:
- command: echo "Analyzing logs"
stdout: /logs/analysis.out
- command: tail -n 1000 /logs/analysis.out
Custom Log Directory
logDir: /data/etl/logs/${DAG_NAME}
histRetentionDays: 90
steps:
- command: echo "Extracting data"
stdout: extract.log
stderr: extract.err
- command: echo "Transforming data"
stdout: transform.log
Organize logs in custom directories with retention.
Timeout & Cleanup
timeoutSec: 7200 # 2 hour timeout
maxCleanUpTimeSec: 600 # 10 min cleanup window
steps:
- command: sleep 5 && echo "Processing data"
signalOnStop: SIGTERM
handlerOn:
exit:
command: echo "Cleaning up resources"Production Monitoring
histRetentionDays: 365 # Keep 1 year for compliance
maxOutputSize: 5242880 # 5MB output limit
maxActiveRuns: 1 # No overlapping runs
mailOn:
failure: true
errorMail:
from: alerts@company.com
to: oncall@company.com
prefix: "[CRITICAL]"
attachLogs: true
infoMail:
from: notifications@company.com
to: team@company.com
prefix: "[SUCCESS]"
handlerOn:
failure:
command: |
curl -X POST https://metrics.company.com/alerts \
-H "Content-Type: application/json" \
-d '{"service": "critical-service", "status": "failed"}'
steps:
- command: echo "Checking health"
retryPolicy:
limit: 3
intervalSec: 30
Distributed Tracing
otel:
enabled: true
endpoint: "otel-collector:4317"
resource:
service.name: "dagu-${DAG_NAME}"
deployment.environment: "${ENV}"
steps:
- command: echo "Fetching data"
- command: python process.py
- call: pipelines/transform
Enable OpenTelemetry tracing for observability.
Execution Control
Execution Control
type: graph
maxActiveSteps: 5 # Max 5 parallel steps
maxActiveRuns: 2 # Max 2 concurrent DAG runs
delaySec: 10 # 10 second initial delay
skipIfSuccessful: true # Skip if already succeeded
steps:
- name: validate
command: echo "Validating configuration"
- name: process-batch-1
command: echo "Processing batch 1"
depends: validate
- name: process-batch-2
command: echo "Processing batch 2"
depends: validate
- name: process-batch-3
command: echo "Processing batch 3"
depends: validate
Queuing
queue: compute-queue # Assign to specific queue
steps:
- command: echo "Preparing data"
- command: echo "Running intensive computation"
- command: echo "Storing results"Limit History Retention
histRetentionDays: 60 # Keep 60 days history
steps:
- command: echo "Running periodic maintenance"Lock Down Run Inputs
runConfig:
disableParamEdit: true # Prevent editing params at start
disableRunIdEdit: true # Prevent custom run IDs
params:
- ENVIRONMENT: production
- VERSION: 1.0.0
Complete DAG Configuration
description: Daily ETL pipeline for analytics
schedule: "0 2 * * *"
skipIfSuccessful: true
group: DataPipelines
tags: daily,critical
queue: etl-queue
maxActiveRuns: 1
maxOutputSize: 5242880 # 5MB
histRetentionDays: 90 # Keep history for 90 days
env:
- LOG_LEVEL: info
- DATA_DIR: /data/analytics
params:
- DATE: "`date '+%Y-%m-%d'`"
- ENVIRONMENT: production
mailOn:
failure: true
smtp:
host: smtp.company.com
port: "587"
handlerOn:
success:
command: echo "ETL completed successfully"
failure:
command: echo "Cleaning up after failure"
exit:
command: echo "Final cleanup"
steps:
- name: validate-environment
command: echo "Validating environment: ${ENVIRONMENT}"