
[DSIP-11][Feature][python] Use the pydolphinscheduler to dynamically generate workflows from the configuration file #10995

@jieguangzhou


Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

Dynamically generate workflows from YAML configuration files, aka workflows-as-code.

Today we can already use pydolphinscheduler to build workflows in Python easily. Workflows-as-code would be even easier to use if we supported defining workflows via configuration files. This would make workflows clearer, shareable, and reviewable.

We could then manage the project in a Git repository, which enables richer operations such as CI/CD.

We can create a workflow by defining the following fields; all field definitions are derived from PydolphinScheduler.

Each YAML file defines one workflow:

# Define the process
Process:
  # Process (workflow) name
  name: prepare_datas

  # Parameters for process creation
  param:
      project: /data/project

  # Whether to run the workflow after the creation is complete
  run: True

# Define the tasks under the process
Tasks:
  -
    # Task type: Shell, Python, SubProcess, Spark, etc.
    TaskType: Shell

    # Upstream Task List
    dependencies: [xxxx]

    # Parameters for task creation
    params:
      name: task1
      other parameters: ...

  -  
    TaskType: Python
    params:
      name: task2
  - ...
  - ... 
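
Given a parsed configuration in this shape, the loader would need to create tasks in an order consistent with their `dependencies` lists. A minimal sketch of that dependency resolution, assuming the config has already been parsed into plain Python structures (the `config` dict and the `topo_order` helper below are illustrative only, not the proposed implementation):

```python
# Hypothetical sketch: resolve a task-creation order from a parsed config.
# The `config` dict mirrors the YAML schema above.

def topo_order(tasks):
    """Return task names ordered so every task follows its dependencies."""
    deps = {t["params"]["name"]: set(t.get("dependencies", [])) for t in tasks}
    ordered = []
    while deps:
        # A task is ready once all of its upstream tasks are already ordered.
        ready = [name for name, d in deps.items() if d <= set(ordered)]
        if not ready:
            raise ValueError("cycle or unknown dependency in task graph")
        for name in sorted(ready):
            ordered.append(name)
            del deps[name]
    return ordered

config = {
    "Process": {"name": "prepare_datas", "param": {"project": "/data/project"}},
    "Tasks": [
        {"TaskType": "Shell", "params": {"name": "task1"}},
        {"TaskType": "Python", "dependencies": ["task1"], "params": {"name": "task2"}},
    ],
}

print(topo_order(config["Tasks"]))  # ['task1', 'task2']
```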

Here is a simple example showing how to use YAML to manage workflows (each YAML file defines one workflow):

# Define the process
Process:
  name: prepare_datas
  param:
      project: /data/project

# Define the tasks under the process
Tasks:
  - 
    TaskType: Shell
    params:
      name: download_data
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data.download ${data_path}
      local_params: 
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_signals \
                      --data_path ${data_path} \
                      --name_file ${project}/feature_signal.txt 
      local_params: 
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}


  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_features \
                    --data_path ${data_path} \
                    --name_file ${project}/feature_signal.txt
      local_params:
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}
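
To turn each `Tasks` entry into a task object, the loader could dispatch on the `TaskType` string through a registry of task classes. A sketch of that mechanism, where the `Shell` and `Python` classes are stand-in stubs rather than the real pydolphinscheduler task classes:

```python
# Hypothetical sketch of TaskType dispatch. The stub classes below only
# illustrate the registry pattern; they are not pydolphinscheduler's API.

class Shell:
    def __init__(self, name, command, local_params=None):
        self.name, self.command = name, command
        self.local_params = local_params or []

class Python:
    def __init__(self, name, **params):
        self.name, self.params = name, params

# Map the TaskType string from the YAML onto a task class.
TASK_REGISTRY = {"Shell": Shell, "Python": Python}

def build_task(task_conf):
    """Instantiate a task object from one entry of the Tasks list."""
    task_cls = TASK_REGISTRY[task_conf["TaskType"]]
    return task_cls(**task_conf["params"])

task = build_task({
    "TaskType": "Shell",
    "params": {"name": "download_data", "command": "python -m dmsa.data.download"},
})
print(task.name)  # download_data
```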

Alternatively, we can use YAML's native features for easier definition, for example anchors (&) and aliases (*):

# User-defined parameters; it is suggested to define all parameters here
Params:
  process_name: &process_name prepare_datas
  project: &project "/data/project"

# Variable definitions in the YAML file, reused by the configuration below
Variables:
  local_params: &local_params { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

# Define the process
Process:
  name: *process_name
  param:
      project: *project

# Define the tasks under the process
Tasks:
  - 
    TaskType: Shell
    params:
      name: download_data
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data.download ${data_path}
      local_params: 
        - *local_params 

  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_signals \
                      --data_path ${data_path} \
                      --name_file ${project}/feature_signal.txt 
      local_params: 
        - *local_params 


  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_features \
                    --data_path ${data_path} \
                    --name_file ${project}/feature_signal.txt
      local_params:
        - *local_params 

A richer approach is to combine this with DolphinScheduler features and add some magic syntax to make it easier to use. For example, we could read environment variables with $Env{xxxx} and read the contents of files with $File{xxxx}:

# User-defined parameters; it is suggested to define all parameters here
Params:
  process_name: &process_name prepare_datas
  project: &project $Env{STOCK_PROJECT}

# Variable definitions in the YAML file, reused by the configuration below
Variables:
  local_params: &local_params { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

# Define the process
Process:
  name: *process_name
  param:
      project: *project

# Define the tasks under the process
Tasks:
  - 
    TaskType: Shell
    params:
      name: download_data
      command: $File{download_data.sh}
      local_params: 
        - *local_params 

  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals 
      command: $File{calc_signals}
      local_params: 
        - *local_params 


  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features 
      command: $File{calc_features}
      local_params:
        - *local_params 
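
The magic syntax above could be implemented as a simple substitution pass over the config values before they are handed to the workflow builder. A minimal sketch, assuming the $Env{...}/$File{...} forms shown above (the regexes and the `expand_magic` helper are illustrative, not the proposed implementation):

```python
# Hypothetical sketch: expand $Env{NAME} from environment variables and
# $File{path} from file contents in a configuration value.
import os
import re
from pathlib import Path

ENV_RE = re.compile(r"\$Env\{([^}]+)\}")
FILE_RE = re.compile(r"\$File\{([^}]+)\}")

def expand_magic(value, base_dir="."):
    """Resolve $Env{...} and $File{...} placeholders in a config string."""
    value = ENV_RE.sub(lambda m: os.environ[m.group(1)], value)
    value = FILE_RE.sub(lambda m: Path(base_dir, m.group(1)).read_text(), value)
    return value

os.environ["STOCK_PROJECT"] = "/data/project"
print(expand_magic("$Env{STOCK_PROJECT}"))  # /data/project
```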

Once the configuration file is defined, we can use the PydolphinScheduler CLI to load the workflow.

Use case

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
