Skip to content

[Feature]Add Python task "task variable / result transfer" implementation#3659

Merged
EricJoy2048 merged 26 commits intoapache:devfrom
guyinyou:dev
Sep 25, 2020
Merged

[Feature]Add Python task "task variable / result transfer" implementation#3659
EricJoy2048 merged 26 commits intoapache:devfrom
guyinyou:dev

Conversation

@guyinyou
Copy link
Copy Markdown
Contributor

@guyinyou guyinyou commented Sep 3, 2020

Content of PR:
Added VarPool:
Introduction: VarPool is similar to a global variable whose value can be dynamically modified. Variables are passed between different nodes in the task flow.
Difference: Global variables are constant attributes of the task flow, and currently there is no interface that can be dynamically modified.

Precautions:
Need to add the field var_pool in the t_ds_task_instance and t_ds_process_instance tables in the DB, the type is longtext.

How to use:(Currently only supports Python nodes)
Value: Just declare the variable name at LocalParams of the node.
Modification: ${setShareVar(variable name, value)} in the script.

Example:

Node a1:
	Script:
		value = 600 + 60
		${setShareVar(${shareVar1},value+6)}
		
Node a2:
	Custom parameters:
		shareVar1:"default"
	Script:
		print("shareVar1:",${shareVar1})
	operation result:
		"shareVar1:666"

———————————————————————————————————————————————————
PR的内容:
新增VarPool:
作用:VarPool类似于一个值可动态修改的全局变量,在任务流下不同节点之间传递变量。
区别:全局变量为任务流的常量属性,目前没有接口可以动态修改。

注意事项:
需要在DB中的t_ds_task_instance和t_ds_process_instance表中增加字段var_pool,类型为longtext。

使用:(目前仅支持Python节点)
取值:只需在节点的LocalParams处申明变量名。
修改:在脚本中 ${setShareVar(变量名,值)}。

示例:

节点a1:
	脚本:
		value = 600 + 60
		${setShareVar(${shareVar1},value+6)}

节点a2:
	自定义参数:
		shareVar1:"default"
	脚本:
		print("shareVar1:",${shareVar1})
	运行结果:
		shareVar1:666

————————————————————————————————————————————————————
Node a1:
0
————————————————————————————————————————————————————
Node a2:
1
————————————————————————————————————————————————————
result:
2
————————————————————————————————————————————————————
Process:
Process

@qiaozhanwei
Copy link
Copy Markdown
Contributor

please change title to english . thx

1941815847Cy4 added 3 commits September 7, 2020 21:44
Signed-off-by: 古崟佑
Signed-off-by: 1941815847Cy4 <1941815847cy4@kuaishou.com>
Signed-off-by: 古崟佑
@guyinyou guyinyou changed the title 增加Python Task的“任务变量/结果传递”实现 [Feature]Add Python task "task variable / result transfer" implementation Sep 9, 2020
Signed-off-by: 古崟佑
@xingchun-chen
Copy link
Copy Markdown
Contributor

e2e failure, Please rebase the dev branch code again

@simon824
Copy link
Copy Markdown
Member

I suggest create an issue first and then relation to this pr ,so that you can discuss this feature with others.

1941815847Cy4 added 3 commits September 11, 2020 01:09
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Sep 11, 2020

Codecov Report

Merging #3659 into dev will increase coverage by 0.63%.
The diff coverage is 12.68%.

Impacted file tree graph

@@             Coverage Diff              @@
##                dev    #3659      +/-   ##
============================================
+ Coverage     39.14%   39.78%   +0.63%     
- Complexity     2829     2900      +71     
============================================
  Files           456      459       +3     
  Lines         21617    21762     +145     
  Branches       2622     2647      +25     
============================================
+ Hits           8462     8658     +196     
+ Misses        12365    12289      -76     
- Partials        790      815      +25     
Impacted Files Coverage Δ Complexity Δ
...pache/dolphinscheduler/common/task/TaskParams.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...he/dolphinscheduler/common/utils/VarPoolUtils.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...ler/remote/command/TaskExecuteResponseCommand.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...er/master/processor/queue/TaskResponseService.java 32.69% <0.00%> (-24.18%) 5.00 <0.00> (ø)
...heduler/server/master/runner/MasterExecThread.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...lphinscheduler/service/process/ProcessService.java 0.00% <0.00%> (ø) 0.00 <0.00> (ø)
...cheduler/server/worker/task/python/PythonTask.java 10.81% <8.33%> (+10.81%) 1.00 <1.00> (+1.00)
...er/server/worker/task/AbstractCommandExecutor.java 4.61% <14.28%> (+0.40%) 2.00 <1.00> (ø)
...phinscheduler/server/worker/task/AbstractTask.java 11.53% <33.33%> (+0.87%) 4.00 <1.00> (+1.00)
...rver/master/processor/queue/TaskResponseEvent.java 50.90% <75.00%> (-7.92%) 10.00 <1.00> (-4.00)
... and 46 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 69201ac...d4318d1. Read the comment docs.

1941815847Cy4 added 9 commits September 11, 2020 17:06
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
1941815847Cy4 added 3 commits September 14, 2020 18:56
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
fix
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
fix
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
Signed-off-by: 古崟佑
@simon824
Copy link
Copy Markdown
Member

+1

@davidzollo davidzollo requested a review from lenboo September 21, 2020 14:58
@zixi0825
Copy link
Copy Markdown
Member

+1

Copy link
Copy Markdown
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VarPoolUtils.convertVarPoolToMap(propToValue, processInstance.getVarPool());
} catch (ParseException e) {
logger.error("parse {} exception", processInstance.getVarPool(), e);

}
When the processInstance.getVarPool() convert to map exception , we do nothing?
Should this task be marked as failed?

fix
Signed-off-by: 古崟佑
@sonarqubecloud
Copy link
Copy Markdown

Kudos, SonarCloud Quality Gate passed!

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities (and Security Hotspot 0 Security Hotspots to review)
Code Smell A 9 Code Smells

66.2% 66.2% Coverage
0.0% 0.0% Duplication

Copy link
Copy Markdown
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

throw new RuntimeException();
}
TaskNode taskNodeObject = dag.getNode(taskNode);
VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is bug will cause the origin param be replace by the task param,so the task will get nothing when use taskExecutionContext.getTaskParams()

@luojieio
Copy link
Copy Markdown

why I use 1.3.8 release version can not do it ?

@luojieio
Copy link
Copy Markdown

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants