[DSIP-11][Feature][python] Use the pydolphinscheduler to dynamically generate workflows from the configuration file #10995
Description
Search before asking
- I had searched in the issues and found no similar feature requirement.
Description
Dynamically generate workflows from YAML configuration files, aka workflows-as-code.
Today we can already use pydolphinscheduler to build workflows easily. Workflows-as-code would be even easier to use if we supported defining workflows in configuration files, which would make workflows clearer, shareable, and reviewable.
We could then keep the project in a git repository, which enables richer operations such as CI/CD.
We can create a workflow by defining the following fields; all the field definitions are derived from PydolphinScheduler.
A YAML file defines a workflow:

```yaml
# Define the process
Process:
  # process (workflow) name
  name: prepare_datas
  # Parameters for process creation
  param:
    project: /data/project
  # Whether to run the workflow after the creation is complete
  run: True

# Define the tasks under the process
Tasks:
  - # task type: Shell, Python, SubProcess, Spark, etc.
    TaskType: Shell
    # Upstream task list
    dependencies: [xxxx]
    # Parameters for task creation
    params:
      name: task1
      other parameters: ...
  - TaskType: Python
    params:
      name: task2
  - ...
```

Here is a simple example showing how to use YAML to manage workflows (one YAML file defines one workflow):
```yaml
# Define the process
Process:
  name: prepare_datas
  param:
    project: /data/project

# Define the tasks under the process
Tasks:
  - TaskType: Shell
    params:
      name: download_data
      command: |
        export PYTHONPATH=${project}
        source ${project}/env/bin/activate
        data_path=${project}/data/daily
        python -m dmsa.data.download ${data_path}
      local_params:
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}" }
  - TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals
      command: |
        export PYTHONPATH=${project}
        source ${project}/env/bin/activate
        data_path=${project}/data/daily
        python -m dmsa.data_processing.calc_signals \
          --data_path ${data_path} \
          --name_file ${project}/feature_signal.txt
      local_params:
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}" }
  - TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features
      command: |
        export PYTHONPATH=${project}
        source ${project}/env/bin/activate
        data_path=${project}/data/daily
        python -m dmsa.data_processing.calc_features \
          --data_path $data_path \
          --name_file ${project}/feature_signal.txt
      local_params:
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}" }
```
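As a sketch of how a loader might consume such a file: assuming the YAML has already been parsed into a dict (e.g. with PyYAML), basic validation of task names and dependency references could look like this. The function name and validation rules below are illustrative assumptions, not part of this proposal:

```python
# Illustrative validation of a parsed workflow configuration.
# "config" mirrors the example above after YAML parsing.

def validate_workflow(config):
    """Check the Process/Tasks structure; return (process name, task list)."""
    process = config["Process"]
    tasks = config.get("Tasks", [])

    names = set()
    for task in tasks:
        name = task["params"]["name"]
        if name in names:
            raise ValueError(f"duplicate task name: {name}")
        names.add(name)

    # Every upstream dependency must be defined in the same file.
    for task in tasks:
        for dep in task.get("dependencies", []):
            if dep not in names:
                raise ValueError(f"unknown dependency: {dep}")
    return process["name"], tasks


config = {
    "Process": {"name": "prepare_datas", "param": {"project": "/data/project"}},
    "Tasks": [
        {"TaskType": "Shell", "params": {"name": "download_data"}},
        {"TaskType": "Shell", "dependencies": ["download_data"],
         "params": {"name": "calc_signals"}},
        {"TaskType": "Shell", "dependencies": ["download_data"],
         "params": {"name": "calc_features"}},
    ],
}
process_name, tasks = validate_workflow(config)
print(process_name)  # prepare_datas
```

Validating up front means a typo in a dependency name fails fast at load time rather than surfacing after the workflow has been submitted.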
Alternatively, we can use YAML's native features for easier definitions, for example anchors (&) and aliases (*):
```yaml
# User-defined parameters. It is recommended to define all parameters here
Params:
  process_name: &process_name prepare_datas
  project: &project "/data/project"

# Variable definitions reused in the configuration below
Variable:
  local_params: &local_params { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}" }

# Define the process
Process:
  name: *process_name
  param:
    project: *project

# Define the tasks under the process
Tasks:
  - TaskType: Shell
    params:
      name: download_data
      command: |
        export PYTHONPATH=${project}
        source ${project}/env/bin/activate
        data_path=${project}/data/daily
        python -m dmsa.data.download ${data_path}
      local_params:
        - *local_params
  - TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals
      command: |
        export PYTHONPATH=${project}
        source ${project}/env/bin/activate
        data_path=${project}/data/daily
        python -m dmsa.data_processing.calc_signals \
          --data_path ${data_path} \
          --name_file ${project}/feature_signal.txt
      local_params:
        - *local_params
  - TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features
      command: |
        export PYTHONPATH=${project}
        source ${project}/env/bin/activate
        data_path=${project}/data/daily
        python -m dmsa.data_processing.calc_features \
          --data_path $data_path \
          --name_file ${project}/feature_signal.txt
      local_params:
        - *local_params
```
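The dependencies fields above form a directed acyclic graph, so a loader can derive a valid submission order mechanically. A minimal sketch using Kahn's algorithm, with the graph transcribed from the example (the function itself is illustrative, not part of this proposal):

```python
from collections import deque

# Task dependency graph transcribed from the example above:
# calc_signals and calc_features both depend on download_data.
deps = {
    "download_data": [],
    "calc_signals": ["download_data"],
    "calc_features": ["download_data"],
}

def execution_order(deps):
    """Kahn's algorithm: order tasks so every dependency runs first."""
    remaining = {task: set(up) for task, up in deps.items()}
    ready = deque(task for task, up in remaining.items() if not up)
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for other, up in remaining.items():
            if task in up:
                up.remove(task)
                if not up:
                    ready.append(other)
    if len(order) != len(deps):
        raise ValueError("dependency cycle detected")
    return order

order = execution_order(deps)
print(order)  # ['download_data', 'calc_signals', 'calc_features']
```

The same traversal also detects dependency cycles, which a configuration-file loader would want to report as an error at load time.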
A richer approach is to combine this with DolphinScheduler features, and we can add some magic methods to make it easier to use. For example, $Env{xxxx} could read an environment variable, and $File(xxxx) could read the contents of a file:
```yaml
# User-defined parameters. It is recommended to define all parameters here
Params:
  process_name: &process_name prepare_datas
  project: &project $Env{STOCK_PROJECT}

# Variable definitions reused in the configuration below
Variable:
  local_params: &local_params { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}" }

# Define the process
Process:
  name: *process_name
  param:
    project: *project

# Define the tasks under the process
Tasks:
  - TaskType: Shell
    params:
      name: download_data
      command: $File("download_data.sh")
      local_params:
        - *local_params
  - TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals
      command: $File("calc_signals")
      local_params:
        - *local_params
  - TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features
      command: $File("calc_features")
      local_params:
        - *local_params
```
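A sketch of how these magic methods could be implemented with the standard library; the regex patterns and the read_file hook are assumptions for illustration, not a committed design:

```python
import os
import re

# Assumed syntax: $Env{NAME} expands an environment variable,
# $File("path") expands to the contents of a file.
ENV_PATTERN = re.compile(r"\$Env\{(\w+)\}")
FILE_PATTERN = re.compile(r'\$File\("?([^")]+)"?\)')

def expand_magic(value, read_file=lambda path: open(path).read()):
    """Expand $Env{...} and $File(...) occurrences in a config value."""
    value = ENV_PATTERN.sub(lambda m: os.environ[m.group(1)], value)
    value = FILE_PATTERN.sub(lambda m: read_file(m.group(1)), value)
    return value

os.environ["STOCK_PROJECT"] = "/data/project"
print(expand_magic("$Env{STOCK_PROJECT}"))  # /data/project
# read_file is injected here so the example runs without a real file on disk
print(expand_magic('$File("download_data.sh")',
                   read_file=lambda p: f"# contents of {p}"))
```

Expansion would run after YAML parsing but before the workflow is built, so task commands can live in ordinary script files next to the configuration.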
Once the configuration file is defined, we can use the PydolphinScheduler CLI to load the workflow.
Use case
No response
Related issues
No response
Are you willing to submit a PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct