Skip to content

Commit e54457f

Browse files
committed
change process_definition usage of Subprocess task
1 parent b514cc1 commit e54457f

3 files changed

Lines changed: 37 additions & 15 deletions

File tree

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,7 @@ class PyDSJavaGatewayException(PyDSBaseException):
4040
"""Exception for pydolphinscheduler Java gateway error."""
4141

4242
pass
43+
44+
45+
class PyDSProcessDefinitionNotAssignException(PyDSBaseException):
46+
"""Exception for pydolphinscheduler process definition not assign error."""

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sub_process.py

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from pydolphinscheduler.constants import TaskType
2323
from pydolphinscheduler.core.process_definition import ProcessDefinitionContext
2424
from pydolphinscheduler.core.task import Task, TaskParams
25+
from pydolphinscheduler.exceptions import PyDSProcessDefinitionNotAssignException
2526
from pydolphinscheduler.java_gateway import launch_gateway
2627

2728

@@ -38,7 +39,16 @@ class SubProcess(Task):
3839

3940
def __init__(self, name: str, process_definition_name: str, *args, **kwargs):
4041
self._process_definition_name = process_definition_name
41-
self._process_definition = {}
42+
self._process_definition_info = {}
43+
# TODO: Optimize the way of obtaining process_definition
44+
self.process_definition = kwargs.get(
45+
"process_definition", ProcessDefinitionContext.get()
46+
)
47+
if not self.process_definition:
48+
raise PyDSProcessDefinitionNotAssignException(
49+
"ProcessDefinition must be provider when SubProcess initialization."
50+
)
51+
4252
task_params = SubProcessTaskParams(
4353
process_definition_code=self.get_process_definition_code(),
4454
)
@@ -52,14 +62,15 @@ def get_process_definition_code(self) -> str:
5262

5363
def get_process_definition_info(self, process_definition_name: str) -> Dict:
5464
"""Get process definition info from java gateway, contains process definition id, name, code."""
55-
if self._process_definition:
56-
return self._process_definition
65+
if self._process_definition_info:
66+
return self._process_definition_info
5767
else:
5868
gateway = launch_gateway()
59-
process_definition = ProcessDefinitionContext.get()
60-
self._process_definition = gateway.entry_point.getProcessDefinitionInfo(
61-
process_definition.user.name,
62-
process_definition.project.name,
63-
process_definition_name,
69+
self._process_definition_info = (
70+
gateway.entry_point.getProcessDefinitionInfo(
71+
self.process_definition.user.name,
72+
self.process_definition.project.name,
73+
process_definition_name,
74+
)
6475
)
65-
return self._process_definition
76+
return self._process_definition_info

dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_sub_process.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@
2222

2323
import pytest
2424

25+
from pydolphinscheduler.core.process_definition import ProcessDefinition
2526
from pydolphinscheduler.tasks.sub_process import SubProcess, SubProcessTaskParams
2627

27-
PROCESS_DEFINITION_NAME = "test-process-definition"
28-
PROCESS_DEFINITION_CODE = "3643589832320"
28+
TEST_SUB_PROCESS_DEFINITION_NAME = "sub-test-process-definition"
29+
TEST_SUB_PROCESS_DEFINITION_CODE = "3643589832320"
30+
TEST_PROCESS_DEFINITION_NAME = "simple-test-process-definition"
2931

3032

3133
@pytest.mark.parametrize(
@@ -50,7 +52,11 @@ def test_sub_process_task_params_attr_setter(name, value):
5052
@patch(
5153
"pydolphinscheduler.tasks.sub_process.SubProcess.get_process_definition_info",
5254
return_value=(
53-
{"id": 1, "name": PROCESS_DEFINITION_NAME, "code": PROCESS_DEFINITION_CODE}
55+
{
56+
"id": 1,
57+
"name": TEST_SUB_PROCESS_DEFINITION_NAME,
58+
"code": TEST_SUB_PROCESS_DEFINITION_CODE,
59+
}
5460
),
5561
)
5662
def test_sub_process_to_dict(mock_process_definition):
@@ -68,7 +74,7 @@ def test_sub_process_to_dict(mock_process_definition):
6874
"taskParams": {
6975
"resourceList": [],
7076
"localParams": [],
71-
"processDefinitionCode": PROCESS_DEFINITION_CODE,
77+
"processDefinitionCode": TEST_SUB_PROCESS_DEFINITION_CODE,
7278
"dependence": {},
7379
"conditionResult": {"successNode": [""], "failedNode": [""]},
7480
"waitStartTimeout": {},
@@ -86,5 +92,6 @@ def test_sub_process_to_dict(mock_process_definition):
8692
"pydolphinscheduler.core.task.Task.gen_code_and_version",
8793
return_value=(code, version),
8894
):
89-
sub_process = SubProcess(name, PROCESS_DEFINITION_NAME)
90-
assert sub_process.to_dict() == expect
95+
with ProcessDefinition(TEST_PROCESS_DEFINITION_NAME):
96+
sub_process = SubProcess(name, TEST_SUB_PROCESS_DEFINITION_NAME)
97+
assert sub_process.to_dict() == expect

0 commit comments

Comments
 (0)