Skip to content

Commit 58cc0fe

Browse files
committed
[python] Refactor how object definitions are obtained for communication with the gateway
* Rename class Base method `to_dict` to `get_define` for clarity * Remove class TaskParams and its subclasses to make the code easier to understand and to avoid cyclic dependencies between task implementations * Remove class ObjectJsonBase from Task to reduce complexity fix: #7271
1 parent deefc8e commit 58cc0fe

15 files changed

Lines changed: 405 additions & 350 deletions

File tree

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/base.py

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@
2626
class Base:
2727
"""DolphinScheduler Base object."""
2828

29+
# Object key attribute, to test whether object equals and so on.
2930
_KEY_ATTR: set = {"name", "description"}
3031

31-
_TO_DICT_ATTR: set = set()
32+
# Object defines attribute, use when needs to communicate with Java gateway server.
33+
_DEFINE_ATTR: set = set()
3234

33-
DEFAULT_ATTR: Dict = {}
35+
# Object default attribute, will add those attribute to `_DEFINE_ATTR` when init assign missing.
36+
_DEFAULT_ATTR: Dict = {}
3437

3538
def __init__(self, name: str, description: Optional[str] = None):
3639
self.name = name
@@ -44,28 +47,28 @@ def __eq__(self, other):
4447
getattr(self, a, None) == getattr(other, a, None) for a in self._KEY_ATTR
4548
)
4649

47-
# TODO check how Redash do
48-
# TODO DRY
49-
def to_dict(self, camel_attr=True) -> Dict:
50-
"""Get object key attribute dict.
51-
52-
use attribute `self._TO_DICT_ATTR` to determine which attributes should including to
53-
children `to_dict` function.
54-
"""
55-
# content = {}
56-
# for attr, value in self.__dict__.items():
57-
# # Don't publish private variables
58-
# if attr.startswith("_"):
59-
# continue
60-
# else:
61-
# content[snake2camel(attr)] = value
62-
# content.update(self.DEFAULT_ATTR)
63-
# return content
50+
def get_define_custom(
51+
self, camel_attr: bool = True, custom_attr: set = None
52+
) -> Dict:
53+
"""Get object definition attribute by given attr set."""
6454
content = {}
65-
for attr in self._TO_DICT_ATTR:
55+
for attr in custom_attr:
6656
val = getattr(self, attr, None)
6757
if camel_attr:
6858
content[attr2camel(attr)] = val
6959
else:
7060
content[attr] = val
7161
return content
62+
63+
def get_define(self, camel_attr: bool = True) -> Dict:
64+
"""Get object definition attribute communicate to Java gateway server.
65+
66+
use attribute `self._DEFINE_ATTR` to determine which attributes should including when
67+
object tries to communicate with Java gateway server.
68+
"""
69+
content = self.get_define_custom(camel_attr, self._DEFINE_ATTR)
70+
update_default = {
71+
k: self._DEFAULT_ATTR.get(k) for k in self._DEFAULT_ATTR if k not in content
72+
}
73+
content.update(update_default)
74+
return content

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class ProcessDefinition(Base):
6868
"param",
6969
}
7070

71-
_TO_DICT_ATTR = {
71+
_DEFINE_ATTR = {
7272
"name",
7373
"description",
7474
"_project",
@@ -195,7 +195,7 @@ def task_definition_json(self) -> List[Dict]:
195195
if not self.tasks:
196196
return [self.tasks]
197197
else:
198-
return [task.to_dict() for task in self.tasks.values()]
198+
return [task.get_define() for task in self.tasks.values()]
199199

200200
@property
201201
def task_relation_json(self) -> List[Dict]:
@@ -204,7 +204,7 @@ def task_relation_json(self) -> List[Dict]:
204204
return [self.tasks]
205205
else:
206206
self._handle_root_relation()
207-
return [tr.to_dict() for tr in self._task_relations]
207+
return [tr.get_define() for tr in self._task_relations]
208208

209209
@property
210210
def schedule_json(self) -> Optional[Dict]:

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

Lines changed: 58 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
"""DolphinScheduler ObjectJsonBase, TaskParams and Task object."""
18+
"""DolphinScheduler Task and TaskRelation object."""
1919

2020
import logging
2121
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
@@ -32,61 +32,17 @@
3232
ProcessDefinitionContext,
3333
)
3434
from pydolphinscheduler.java_gateway import launch_gateway
35-
from pydolphinscheduler.utils.string import class_name2camel, snake2camel
3635

3736

38-
class ObjectJsonBase:
39-
"""Task base class, define `__str__` and `to_dict` function would be use in other task related class."""
40-
41-
DEFAULT_ATTR = {}
42-
43-
def __int__(self, *args, **kwargs):
44-
pass
45-
46-
def __str__(self) -> str:
47-
content = []
48-
for attribute, value in self.__dict__.items():
49-
content.append(f'"{snake2camel(attribute)}": {value}')
50-
content = ",".join(content)
51-
return f'"{class_name2camel(type(self).__name__)}":{{{content}}}'
52-
53-
# TODO check how Redash do
54-
# TODO DRY
55-
def to_dict(self) -> Dict:
56-
"""Get object key attribute dict which determine by attribute `DEFAULT_ATTR`."""
57-
content = {snake2camel(attr): value for attr, value in self.__dict__.items()}
58-
content.update(self.DEFAULT_ATTR)
59-
return content
60-
61-
62-
class TaskParams(ObjectJsonBase):
63-
"""TaskParams object, describe the key parameter of a single task."""
64-
65-
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
66-
67-
def __init__(
68-
self,
69-
local_params: Optional[List] = None,
70-
resource_list: Optional[List] = None,
71-
dependence: Optional[Dict] = None,
72-
wait_start_timeout: Optional[Dict] = None,
73-
condition_result: Optional[Dict] = None,
74-
*args,
75-
**kwargs,
76-
):
77-
super().__init__(*args, **kwargs)
78-
self.local_params = local_params or []
79-
self.resource_list = resource_list or []
80-
self.dependence = dependence or {}
81-
self.wait_start_timeout = wait_start_timeout or {}
82-
# TODO need better way to handle it, this code just for POC
83-
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
84-
85-
86-
class TaskRelation(ObjectJsonBase):
37+
class TaskRelation(Base):
8738
"""TaskRelation object, describe the relation of exactly two tasks."""
8839

89-
DEFAULT_ATTR = {
40+
_DEFINE_ATTR = {
41+
"pre_task_code",
42+
"post_task_code",
43+
}
44+
45+
_DEFAULT_ATTR = {
9046
"name": "",
9147
"preTaskVersion": 1,
9248
"postTaskVersion": 1,
@@ -98,8 +54,9 @@ def __init__(
9854
self,
9955
pre_task_code: int,
10056
post_task_code: int,
57+
name: Optional[str] = None,
10158
):
102-
super().__init__()
59+
super().__init__(name)
10360
self.pre_task_code = pre_task_code
10461
self.post_task_code = post_task_code
10562

@@ -110,19 +67,32 @@ def __hash__(self):
11067
class Task(Base):
11168
"""Task object, parent class for all exactly task type."""
11269

113-
DEFAULT_DEPS_ATTR = {
114-
"name": "",
115-
"preTaskVersion": 1,
116-
"postTaskVersion": 1,
117-
"conditionType": 0,
118-
"conditionParams": {},
70+
_DEFINE_ATTR = {
71+
"name",
72+
"code",
73+
"version",
74+
"task_type",
75+
"task_params",
76+
"description",
77+
"flag",
78+
"task_priority",
79+
"worker_group",
80+
"delay_time",
81+
"fail_retry_times",
82+
"fail_retry_interval",
83+
"timeout_flag",
84+
"timeout_notify_strategy",
85+
"timeout",
11986
}
12087

88+
_task_custom_attr: set = set()
89+
90+
DEFAULT_CONDITION_RESULT = {"successNode": [""], "failedNode": [""]}
91+
12192
def __init__(
12293
self,
12394
name: str,
12495
task_type: str,
125-
task_params: TaskParams,
12696
description: Optional[str] = None,
12797
flag: Optional[str] = TaskFlag.YES,
12898
task_priority: Optional[str] = TaskPriority.MEDIUM,
@@ -134,11 +104,15 @@ def __init__(
134104
timeout_notify_strategy: Optional = None,
135105
timeout: Optional[int] = 0,
136106
process_definition: Optional[ProcessDefinition] = None,
107+
local_params: Optional[List] = None,
108+
resource_list: Optional[List] = None,
109+
dependence: Optional[Dict] = None,
110+
wait_start_timeout: Optional[Dict] = None,
111+
condition_result: Optional[Dict] = None,
137112
):
138113

139114
super().__init__(name, description)
140115
self.task_type = task_type
141-
self.task_params = task_params
142116
self.flag = flag
143117
self.task_priority = task_priority
144118
self.worker_group = worker_group
@@ -169,6 +143,13 @@ def __init__(
169143
self.code,
170144
)
171145

146+
# Attribute for task param
147+
self.local_params = local_params or []
148+
self.resource_list = resource_list or []
149+
self.dependence = dependence or {}
150+
self.wait_start_timeout = wait_start_timeout or {}
151+
self.condition_result = condition_result or self.DEFAULT_CONDITION_RESULT
152+
172153
@property
173154
def process_definition(self) -> Optional[ProcessDefinition]:
174155
"""Get attribute process_definition."""
@@ -179,6 +160,22 @@ def process_definition(self, process_definition: Optional[ProcessDefinition]):
179160
"""Set attribute process_definition."""
180161
self._process_definition = process_definition
181162

163+
@property
164+
def task_params(self) -> Optional[Dict]:
165+
"""Get task parameter object.
166+
167+
Will get result to combine _task_custom_attr and custom_attr.
168+
"""
169+
custom_attr = {
170+
"local_params",
171+
"resource_list",
172+
"dependence",
173+
"wait_start_timeout",
174+
"condition_result",
175+
}
176+
custom_attr |= self._task_custom_attr
177+
return self.get_define_custom(custom_attr=custom_attr)
178+
182179
def __hash__(self):
183180
return hash(self.code)
184181

@@ -259,16 +256,3 @@ def gen_code_and_version(self) -> Tuple:
259256
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
260257
# gateway_result_checker(result)
261258
return result.get("code"), result.get("version")
262-
263-
def to_dict(self, camel_attr=True) -> Dict:
264-
"""Task `to_dict` function which will return key attribute for Task object."""
265-
content = {}
266-
for attr, value in self.__dict__.items():
267-
# Don't publish private variables
268-
if attr.startswith("_"):
269-
continue
270-
elif isinstance(value, TaskParams):
271-
content[snake2camel(attr)] = value.to_dict()
272-
else:
273-
content[snake2camel(attr)] = value
274-
return content

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/http.py

Lines changed: 15 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from typing import Optional
2121

2222
from pydolphinscheduler.constants import TaskType
23-
from pydolphinscheduler.core.task import Task, TaskParams
23+
from pydolphinscheduler.core.task import Task
2424
from pydolphinscheduler.exceptions import PyDSParamException
2525

2626

@@ -50,11 +50,22 @@ class HttpCheckCondition:
5050
BODY_NOT_CONTAINS = "BODY_NOT_CONTAINS"
5151

5252

53-
class HttpTaskParams(TaskParams):
54-
"""Parameter only for Http task types."""
53+
class Http(Task):
54+
"""Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
55+
56+
_task_custom_attr = {
57+
"url",
58+
"http_method",
59+
"http_params",
60+
"http_check_condition",
61+
"condition",
62+
"connect_timeout",
63+
"socket_timeout",
64+
}
5565

5666
def __init__(
5767
self,
68+
name: str,
5869
url: str,
5970
http_method: Optional[str] = HttpMethod.GET,
6071
http_params: Optional[str] = None,
@@ -65,7 +76,7 @@ def __init__(
6576
*args,
6677
**kwargs
6778
):
68-
super().__init__(*args, **kwargs)
79+
super().__init__(name, TaskType.HTTP, *args, **kwargs)
6980
self.url = url
7081
if not hasattr(HttpMethod, http_method):
7182
raise PyDSParamException(
@@ -88,31 +99,3 @@ def __init__(
8899
self.condition = condition
89100
self.connect_timeout = connect_timeout
90101
self.socket_timeout = socket_timeout
91-
92-
93-
class Http(Task):
94-
"""Task HTTP object, declare behavior for HTTP task to dolphinscheduler."""
95-
96-
def __init__(
97-
self,
98-
name: str,
99-
url: str,
100-
http_method: Optional[str] = HttpMethod.GET,
101-
http_params: Optional[str] = None,
102-
http_check_condition: Optional[str] = HttpCheckCondition.STATUS_CODE_DEFAULT,
103-
condition: Optional[str] = None,
104-
connect_timeout: Optional[int] = 60000,
105-
socket_timeout: Optional[int] = 60000,
106-
*args,
107-
**kwargs
108-
):
109-
task_params = HttpTaskParams(
110-
url=url,
111-
http_method=http_method,
112-
http_params=http_params,
113-
http_check_condition=http_check_condition,
114-
condition=condition,
115-
connect_timeout=connect_timeout,
116-
socket_timeout=socket_timeout,
117-
)
118-
super().__init__(name, TaskType.HTTP, task_params, *args, **kwargs)

0 commit comments

Comments
 (0)