1515# specific language governing permissions and limitations
1616# under the License.
1717
18- """DolphinScheduler ObjectJsonBase, TaskParams and Task object."""
18+ """DolphinScheduler Task and TaskRelation object."""
1919
2020import logging
2121from typing import Dict , List , Optional , Sequence , Set , Tuple , Union
3232 ProcessDefinitionContext ,
3333)
3434from pydolphinscheduler .java_gateway import launch_gateway
35- from pydolphinscheduler .utils .string import class_name2camel , snake2camel
3635
3736
38- class ObjectJsonBase :
39- """Task base class, define `__str__` and `to_dict` function would be use in other task related class."""
40-
41- DEFAULT_ATTR = {}
42-
43- def __int__ (self , * args , ** kwargs ):
44- pass
45-
46- def __str__ (self ) -> str :
47- content = []
48- for attribute , value in self .__dict__ .items ():
49- content .append (f'"{ snake2camel (attribute )} ": { value } ' )
50- content = "," .join (content )
51- return f'"{ class_name2camel (type (self ).__name__ )} ":{{{ content } }}'
52-
53- # TODO check how Redash do
54- # TODO DRY
55- def to_dict (self ) -> Dict :
56- """Get object key attribute dict which determine by attribute `DEFAULT_ATTR`."""
57- content = {snake2camel (attr ): value for attr , value in self .__dict__ .items ()}
58- content .update (self .DEFAULT_ATTR )
59- return content
60-
61-
62- class TaskParams (ObjectJsonBase ):
63- """TaskParams object, describe the key parameter of a single task."""
64-
65- DEFAULT_CONDITION_RESULT = {"successNode" : ["" ], "failedNode" : ["" ]}
66-
67- def __init__ (
68- self ,
69- local_params : Optional [List ] = None ,
70- resource_list : Optional [List ] = None ,
71- dependence : Optional [Dict ] = None ,
72- wait_start_timeout : Optional [Dict ] = None ,
73- condition_result : Optional [Dict ] = None ,
74- * args ,
75- ** kwargs ,
76- ):
77- super ().__init__ (* args , ** kwargs )
78- self .local_params = local_params or []
79- self .resource_list = resource_list or []
80- self .dependence = dependence or {}
81- self .wait_start_timeout = wait_start_timeout or {}
82- # TODO need better way to handle it, this code just for POC
83- self .condition_result = condition_result or self .DEFAULT_CONDITION_RESULT
84-
85-
86- class TaskRelation (ObjectJsonBase ):
37+ class TaskRelation (Base ):
8738 """TaskRelation object, describe the relation of exactly two tasks."""
8839
89- DEFAULT_ATTR = {
40+ _DEFINE_ATTR = {
41+ "pre_task_code" ,
42+ "post_task_code" ,
43+ }
44+
45+ _DEFAULT_ATTR = {
9046 "name" : "" ,
9147 "preTaskVersion" : 1 ,
9248 "postTaskVersion" : 1 ,
@@ -98,8 +54,9 @@ def __init__(
9854 self ,
9955 pre_task_code : int ,
10056 post_task_code : int ,
57+ name : Optional [str ] = None ,
10158 ):
102- super ().__init__ ()
59+ super ().__init__ (name )
10360 self .pre_task_code = pre_task_code
10461 self .post_task_code = post_task_code
10562
@@ -110,19 +67,32 @@ def __hash__(self):
11067class Task (Base ):
11168 """Task object, parent class for all exactly task type."""
11269
113- DEFAULT_DEPS_ATTR = {
114- "name" : "" ,
115- "preTaskVersion" : 1 ,
116- "postTaskVersion" : 1 ,
117- "conditionType" : 0 ,
118- "conditionParams" : {},
70+ _DEFINE_ATTR = {
71+ "name" ,
72+ "code" ,
73+ "version" ,
74+ "task_type" ,
75+ "task_params" ,
76+ "description" ,
77+ "flag" ,
78+ "task_priority" ,
79+ "worker_group" ,
80+ "delay_time" ,
81+ "fail_retry_times" ,
82+ "fail_retry_interval" ,
83+ "timeout_flag" ,
84+ "timeout_notify_strategy" ,
85+ "timeout" ,
11986 }
12087
88+ _task_custom_attr : set = set ()
89+
90+ DEFAULT_CONDITION_RESULT = {"successNode" : ["" ], "failedNode" : ["" ]}
91+
12192 def __init__ (
12293 self ,
12394 name : str ,
12495 task_type : str ,
125- task_params : TaskParams ,
12696 description : Optional [str ] = None ,
12797 flag : Optional [str ] = TaskFlag .YES ,
12898 task_priority : Optional [str ] = TaskPriority .MEDIUM ,
@@ -134,11 +104,15 @@ def __init__(
134104 timeout_notify_strategy : Optional = None ,
135105 timeout : Optional [int ] = 0 ,
136106 process_definition : Optional [ProcessDefinition ] = None ,
107+ local_params : Optional [List ] = None ,
108+ resource_list : Optional [List ] = None ,
109+ dependence : Optional [Dict ] = None ,
110+ wait_start_timeout : Optional [Dict ] = None ,
111+ condition_result : Optional [Dict ] = None ,
137112 ):
138113
139114 super ().__init__ (name , description )
140115 self .task_type = task_type
141- self .task_params = task_params
142116 self .flag = flag
143117 self .task_priority = task_priority
144118 self .worker_group = worker_group
@@ -169,6 +143,13 @@ def __init__(
169143 self .code ,
170144 )
171145
146+ # Attribute for task param
147+ self .local_params = local_params or []
148+ self .resource_list = resource_list or []
149+ self .dependence = dependence or {}
150+ self .wait_start_timeout = wait_start_timeout or {}
151+ self .condition_result = condition_result or self .DEFAULT_CONDITION_RESULT
152+
172153 @property
173154 def process_definition (self ) -> Optional [ProcessDefinition ]:
174155 """Get attribute process_definition."""
@@ -179,6 +160,22 @@ def process_definition(self, process_definition: Optional[ProcessDefinition]):
179160 """Set attribute process_definition."""
180161 self ._process_definition = process_definition
181162
163+ @property
164+ def task_params (self ) -> Optional [Dict ]:
165+ """Get task parameter object.
166+
167+ Will get result to combine _task_custom_attr and custom_attr.
168+ """
169+ custom_attr = {
170+ "local_params" ,
171+ "resource_list" ,
172+ "dependence" ,
173+ "wait_start_timeout" ,
174+ "condition_result" ,
175+ }
176+ custom_attr |= self ._task_custom_attr
177+ return self .get_define_custom (custom_attr = custom_attr )
178+
182179 def __hash__ (self ):
183180 return hash (self .code )
184181
@@ -259,16 +256,3 @@ def gen_code_and_version(self) -> Tuple:
259256 # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
260257 # gateway_result_checker(result)
261258 return result .get ("code" ), result .get ("version" )
262-
263- def to_dict (self , camel_attr = True ) -> Dict :
264- """Task `to_dict` function which will return key attribute for Task object."""
265- content = {}
266- for attr , value in self .__dict__ .items ():
267- # Don't publish private variables
268- if attr .startswith ("_" ):
269- continue
270- elif isinstance (value , TaskParams ):
271- content [snake2camel (attr )] = value .to_dict ()
272- else :
273- content [snake2camel (attr )] = value
274- return content
0 commit comments