2222from pydolphinscheduler .constants import TaskType
2323from pydolphinscheduler .core .process_definition import ProcessDefinitionContext
2424from pydolphinscheduler .core .task import Task , TaskParams
25+ from pydolphinscheduler .exceptions import PyDSProcessDefinitionNotAssignException
2526from pydolphinscheduler .java_gateway import launch_gateway
2627
2728
@@ -38,7 +39,16 @@ class SubProcess(Task):
3839
3940 def __init__ (self , name : str , process_definition_name : str , * args , ** kwargs ):
4041 self ._process_definition_name = process_definition_name
41- self ._process_definition = {}
42+ self ._process_definition_info = {}
43+ # TODO: Optimize the way of obtaining process_definition
44+ self .process_definition = kwargs .get (
45+ "process_definition" , ProcessDefinitionContext .get ()
46+ )
47+ if not self .process_definition :
48+ raise PyDSProcessDefinitionNotAssignException (
49+ "ProcessDefinition must be provider when SubProcess initialization."
50+ )
51+
4252 task_params = SubProcessTaskParams (
4353 process_definition_code = self .get_process_definition_code (),
4454 )
@@ -52,14 +62,15 @@ def get_process_definition_code(self) -> str:
5262
5363 def get_process_definition_info (self , process_definition_name : str ) -> Dict :
5464 """Get process definition info from java gateway, contains process definition id, name, code."""
55- if self ._process_definition :
56- return self ._process_definition
65+ if self ._process_definition_info :
66+ return self ._process_definition_info
5767 else :
5868 gateway = launch_gateway ()
59- process_definition = ProcessDefinitionContext .get ()
60- self ._process_definition = gateway .entry_point .getProcessDefinitionInfo (
61- process_definition .user .name ,
62- process_definition .project .name ,
63- process_definition_name ,
69+ self ._process_definition_info = (
70+ gateway .entry_point .getProcessDefinitionInfo (
71+ self .process_definition .user .name ,
72+ self .process_definition .project .name ,
73+ process_definition_name ,
74+ )
6475 )
65- return self ._process_definition
76+ return self ._process_definition_info
0 commit comments