1515# specific language governing permissions and limitations
1616# under the License.
1717
18- from typing import Optional , List , Dict , Set , Union , Sequence
18+ from typing import Optional , List , Dict , Set , Union , Sequence , Tuple
1919
2020from pydolphinscheduler .constants import TaskPriority , ProcessDefinitionDefault , TaskFlag , TaskTimeoutFlag , \
2121 DefaultTaskCodeNum , JavaGatewayDefault
@@ -99,9 +99,6 @@ def __hash__(self):
9999
100100
101101class Task (Base ):
102- DEFAULT_INS_ATTR = {
103- "version" : 1 ,
104- }
105102
106103 DEFAULT_DEPS_ATTR = {
107104 "name" : "" ,
@@ -130,7 +127,6 @@ def __init__(
130127 ):
131128
132129 super ().__init__ (name , description )
133- self .code : int = self .gen_code ()
134130 self .task_type = task_type
135131 self .task_params = task_params
136132 self .flag = flag
@@ -147,6 +143,11 @@ def __init__(
147143 self ._upstream_task_codes : Set [int ] = set ()
148144 self ._downstream_task_codes : Set [int ] = set ()
149145 self ._task_relation : Set [TaskRelation ] = set ()
146+ # move attribute code and version after _process_definition and process_definition declare
147+ self .code , self .version = self .gen_code_and_version ()
148+ # Add task to process definition, maybe we could put into property process_definition latter
149+ if self .process_definition is not None and self .code not in self .process_definition .tasks :
150+ self .process_definition .add_task (self )
150151
151152 @property
152153 def process_definition (self ) -> Optional [ProcessDefinition ]:
@@ -157,8 +158,6 @@ def process_definition(self) -> Optional[ProcessDefinition]:
157158
158159 @process_definition .setter
159160 def process_definition (self , process_definition : Optional [ProcessDefinition ]):
160- if process_definition is not None and self .code not in process_definition .tasks :
161- process_definition .add_task (self )
162161 self ._process_definition = process_definition
163162
164163 def __hash__ (self ):
@@ -217,12 +216,13 @@ def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
217216 self ._set_deps (tasks , upstream = False )
218217
219218 # TODO code should better generate in bulk mode when :ref: processDefinition run submit or start
220- @ staticmethod
221- def gen_code () -> int :
219+ def gen_code_and_version ( self ) -> Tuple :
220+ # TODO get code from specific project process definition and task name
222221 gateway = launch_gateway ()
223- result = gateway .entry_point .genTaskCodeList (DefaultTaskCodeNum .DEFAULT )
224- gateway_result_checker (result )
225- return result [JavaGatewayDefault .RESULT_DATA ][0 ]
222+ result = gateway .entry_point .getCodeAndVersion (self .process_definition ._project , self .name )
223+ # result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
224+ # gateway_result_checker(result)
225+ return result .get ("code" ), result .get ("version" )
226226
227227 def to_dict (self , camel_attr = True ) -> Dict :
228228 content = {}
@@ -234,5 +234,4 @@ def to_dict(self, camel_attr=True) -> Dict:
234234 content [snake2camel (attr )] = value .to_dict ()
235235 else :
236236 content [snake2camel (attr )] = value
237- content .update (self .DEFAULT_INS_ATTR )
238237 return content
0 commit comments