[Feature]Add Python task "task variable / result transfer" implementation#3659
[Feature]Add Python task "task variable / result transfer" implementation#3659EricJoy2048 merged 26 commits intoapache:devfrom guyinyou:dev
Conversation
Signed-off-by: 古崟佑
|
please change title to english . thx |
Signed-off-by: 古崟佑
Signed-off-by: 1941815847Cy4 <1941815847cy4@kuaishou.com>
Signed-off-by: 古崟佑
|
e2e failure, Please rebase the dev branch code again |
|
I suggest create an issue first and then relation to this pr ,so that you can discuss this feature with others. |
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Codecov Report
@@ Coverage Diff @@
## dev #3659 +/- ##
============================================
+ Coverage 39.14% 39.78% +0.63%
- Complexity 2829 2900 +71
============================================
Files 456 459 +3
Lines 21617 21762 +145
Branches 2622 2647 +25
============================================
+ Hits 8462 8658 +196
+ Misses 12365 12289 -76
- Partials 790 815 +25
Continue to review full report at Codecov.
|
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
...cheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/VarPoolUtilsTest.java
Show resolved
Hide resolved
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
...cheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/VarPoolUtilsTest.java
Show resolved
Hide resolved
...er/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
Outdated
Show resolved
Hide resolved
...hinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
Outdated
Show resolved
Hide resolved
...cheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/VarPoolUtilsTest.java
Show resolved
Hide resolved
Signed-off-by: 古崟佑
...r-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
Outdated
Show resolved
Hide resolved
Signed-off-by: 古崟佑
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
Show resolved
Hide resolved
|
+1 |
|
+1 |
EricJoy2048
left a comment
There was a problem hiding this comment.
VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool());
} catch (ParseException e) {
logger.error("parse {} exception", processInstance.getVarPool(), e);
}
When the processInstance.getVarPool() convert to map exception , we do nothing?
Should this task be marked as failed?
...-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
Show resolved
Hide resolved
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
Show resolved
Hide resolved
|
Kudos, SonarCloud Quality Gate passed!
|
| throw new RuntimeException(); | ||
| } | ||
| TaskNode taskNodeObject = dag.getNode(taskNode); | ||
| VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue); |
There was a problem hiding this comment.
there is bug will cause the origin param be replace by the task param,so the task will get nothing when use taskExecutionContext.getTaskParams()
|
why I use 1.3.8 release version can not do it ? |

Content of PR:
Added VarPool:
Introduction: VarPool is similar to a global variable whose value can be dynamically modified. Variables are passed between different nodes in the task flow.
Difference: Global variables are constant attributes of the task flow, and currently there is no interface that can be dynamically modified.
Precautions:
Need to add the field var_pool in the t_ds_task_instance and t_ds_process_instance tables in the DB, the type is longtext.
How to use:(Currently only supports Python nodes)
Value: Just declare the variable name at LocalParams of the node.
Modification: ${setShareVar(variable name, value)} in the script.
Example:
———————————————————————————————————————————————————
PR的内容:
新增VarPool:
作用:VarPool类似于一个值可动态修改的全局变量,在任务流下不同节点之间传递变量。
区别:全局变量为任务流的常量属性,目前没有接口可以动态修改。
注意事项:
需要在DB中的t_ds_task_instance和t_ds_process_instance表中增加字段var_pool,类型为longtext。
使用:(目前仅支持Python节点)
取值:只需在节点的LocalParams处申明变量名。
修改:在脚本中 ${setShareVar(变量名,值)}。
示例:
————————————————————————————————————————————————————




Node a1:
————————————————————————————————————————————————————
Node a2:
————————————————————————————————————————————————————
result:
————————————————————————————————————————————————————
Process: