Skip to content

Commit 1948030

Browse files
authored
[python] Add task base database and procedure (#7279)
We add a new task procedure, and add parent class database for both sql task and procedure task fix: #6929
1 parent 12b46df commit 1948030

7 files changed

Lines changed: 381 additions & 41 deletions

File tree

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ class TaskType(str):
7272
PYTHON = "PYTHON"
7373
SQL = "SQL"
7474
SUB_PROCESS = "SUB_PROCESS"
75+
PROCEDURE = "PROCEDURE"
7576

7677

7778
class DefaultTaskCodeNum(str):
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Task database base task."""
19+
20+
from typing import Dict
21+
22+
from pydolphinscheduler.core.task import Task
23+
from pydolphinscheduler.java_gateway import launch_gateway
24+
25+
26+
class Database(Task):
27+
"""Base task to handle database, declare behavior for the base handler of database.
28+
29+
It is a parent class for all database tasks of dolphinscheduler. And it should run sql like
30+
jobs in multiple sql-like engines, such as:
31+
- ClickHouse
32+
- DB2
33+
- HIVE
34+
- MySQL
35+
- Oracle
36+
- Postgresql
37+
- Presto
38+
- SQLServer
39+
You provide datasource_name, which contains connection information; it decides which
40+
database type and database instance would run this sql.
41+
"""
42+
43+
_task_custom_attr = {"sql"}
44+
45+
def __init__(
46+
self, task_type: str, name: str, datasource_name: str, sql: str, *args, **kwargs
47+
):
48+
super().__init__(name, task_type, *args, **kwargs)
49+
self.datasource_name = datasource_name
50+
self.sql = sql
51+
self._datasource = {}
52+
53+
def get_datasource_type(self) -> str:
54+
"""Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
55+
return self.get_datasource_info(self.datasource_name).get("type")
56+
57+
def get_datasource_id(self) -> str:
58+
"""Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
59+
return self.get_datasource_info(self.datasource_name).get("id")
60+
61+
def get_datasource_info(self, name) -> Dict:
62+
"""Get datasource info from java gateway, contains datasource id, type, name."""
63+
if self._datasource:
64+
return self._datasource
65+
else:
66+
gateway = launch_gateway()
67+
self._datasource = gateway.entry_point.getDatasourceInfo(name)
68+
return self._datasource
69+
70+
@property
71+
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
72+
"""Override Task.task_params for sql task.
73+
74+
Database tasks have some special attributes for task_params, and it is odd if we
75+
directly set them as python properties, so we override Task.task_params here.
76+
"""
77+
params = super().task_params
78+
custom_params = {
79+
"type": self.get_datasource_type(),
80+
"datasource": self.get_datasource_id(),
81+
}
82+
params.update(custom_params)
83+
return params
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Task procedure."""
19+
20+
from pydolphinscheduler.constants import TaskType
21+
from pydolphinscheduler.tasks.database import Database
22+
23+
24+
class Procedure(Database):
25+
"""Task Procedure object, declare behavior for Procedure task to dolphinscheduler.
26+
27+
It should run database procedure jobs in multiple sql-like engines, such as:
28+
- ClickHouse
29+
- DB2
30+
- HIVE
31+
- MySQL
32+
- Oracle
33+
- Postgresql
34+
- Presto
35+
- SQLServer
36+
You provide datasource_name, which contains connection information; it decides which
37+
database type and database instance would run this sql.
38+
"""
39+
40+
def __init__(self, name: str, datasource_name: str, sql: str, *args, **kwargs):
41+
super().__init__(
42+
TaskType.PROCEDURE, name, datasource_name, sql, *args, **kwargs
43+
)

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/sql.py

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
"""Task sql."""
1919

2020
import re
21-
from typing import Dict, Optional
21+
from typing import Optional
2222

2323
from pydolphinscheduler.constants import TaskType
24-
from pydolphinscheduler.core.task import Task
25-
from pydolphinscheduler.java_gateway import launch_gateway
24+
from pydolphinscheduler.tasks.database import Database
2625

2726

2827
class SqlType:
@@ -32,7 +31,7 @@ class SqlType:
3231
NOT_SELECT = 1
3332

3433

35-
class Sql(Task):
34+
class Sql(Database):
3635
"""Task SQL object, declare behavior for SQL task to dolphinscheduler.
3736
3837
It should run sql jobs in multiple sql-like engines, such as:
@@ -67,30 +66,10 @@ def __init__(
6766
*args,
6867
**kwargs
6968
):
70-
super().__init__(name, TaskType.SQL, *args, **kwargs)
71-
self.datasource_name = datasource_name
72-
self.sql = sql
69+
super().__init__(TaskType.SQL, name, datasource_name, sql, *args, **kwargs)
7370
self.pre_statements = pre_statements or []
7471
self.post_statements = post_statements or []
7572
self.display_rows = display_rows
76-
self._datasource = {}
77-
78-
def get_datasource_type(self) -> str:
79-
"""Get datasource type from java gateway, a wrapper for :func:`get_datasource_info`."""
80-
return self.get_datasource_info(self.datasource_name).get("type")
81-
82-
def get_datasource_id(self) -> str:
83-
"""Get datasource id from java gateway, a wrapper for :func:`get_datasource_info`."""
84-
return self.get_datasource_info(self.datasource_name).get("id")
85-
86-
def get_datasource_info(self, name) -> Dict:
87-
"""Get datasource info from java gateway, contains datasource id, type, name."""
88-
if self._datasource:
89-
return self._datasource
90-
else:
91-
gateway = launch_gateway()
92-
self._datasource = gateway.entry_point.getDatasourceInfo(name)
93-
return self._datasource
9473

9574
@property
9675
def sql_type(self) -> int:
@@ -103,18 +82,3 @@ def sql_type(self) -> int:
10382
return SqlType.NOT_SELECT
10483
else:
10584
return SqlType.SELECT
106-
107-
@property
108-
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:
109-
"""Override Task.task_params for sql task.
110-
111-
Sql task have some specials attribute for task_params, and is odd if we
112-
directly set as python property, so we Override Task.task_params here.
113-
"""
114-
params = super().task_params
115-
custom_params = {
116-
"type": self.get_datasource_type(),
117-
"datasource": self.get_datasource_id(),
118-
}
119-
params.update(custom_params)
120-
return params
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Test Task Database."""
19+
20+
21+
from unittest.mock import patch
22+
23+
import pytest
24+
25+
from pydolphinscheduler.tasks.database import Database
26+
27+
TEST_DATABASE_TASK_TYPE = "SQL"
28+
TEST_DATABASE_SQL = "select 1"
29+
TEST_DATABASE_DATASOURCE_NAME = "test_datasource"
30+
31+
32+
@patch(
33+
"pydolphinscheduler.core.task.Task.gen_code_and_version",
34+
return_value=(123, 1),
35+
)
36+
@patch(
37+
"pydolphinscheduler.tasks.database.Database.get_datasource_info",
38+
return_value=({"id": 1, "type": "mock_type"}),
39+
)
40+
def test_get_datasource_detail(mock_datasource, mock_code_version):
41+
"""Test :func:`get_datasource_type` and :func:`get_datasource_id` can return expect value."""
42+
name = "test_get_database_detail"
43+
task = Database(
44+
TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
45+
)
46+
assert 1 == task.get_datasource_id()
47+
assert "mock_type" == task.get_datasource_type()
48+
49+
50+
@pytest.mark.parametrize(
51+
"attr, expect",
52+
[
53+
(
54+
{
55+
"task_type": TEST_DATABASE_TASK_TYPE,
56+
"name": "test-task-params",
57+
"datasource_name": TEST_DATABASE_DATASOURCE_NAME,
58+
"sql": TEST_DATABASE_SQL,
59+
},
60+
{
61+
"type": "MYSQL",
62+
"datasource": 1,
63+
"sql": TEST_DATABASE_SQL,
64+
"localParams": [],
65+
"resourceList": [],
66+
"dependence": {},
67+
"waitStartTimeout": {},
68+
"conditionResult": {"successNode": [""], "failedNode": [""]},
69+
},
70+
)
71+
],
72+
)
73+
@patch(
74+
"pydolphinscheduler.core.task.Task.gen_code_and_version",
75+
return_value=(123, 1),
76+
)
77+
@patch(
78+
"pydolphinscheduler.tasks.database.Database.get_datasource_info",
79+
return_value=({"id": 1, "type": "MYSQL"}),
80+
)
81+
def test_property_task_params(mock_datasource, mock_code_version, attr, expect):
82+
"""Test task database task property."""
83+
task = Database(**attr)
84+
assert expect == task.task_params
85+
86+
87+
@patch(
88+
"pydolphinscheduler.core.task.Task.gen_code_and_version",
89+
return_value=(123, 1),
90+
)
91+
@patch(
92+
"pydolphinscheduler.tasks.database.Database.get_datasource_info",
93+
return_value=({"id": 1, "type": "MYSQL"}),
94+
)
95+
def test_database_get_define(mock_datasource, mock_code_version):
96+
"""Test task database function get_define."""
97+
name = "test_database_get_define"
98+
expect = {
99+
"code": 123,
100+
"name": name,
101+
"version": 1,
102+
"description": None,
103+
"delayTime": 0,
104+
"taskType": TEST_DATABASE_TASK_TYPE,
105+
"taskParams": {
106+
"type": "MYSQL",
107+
"datasource": 1,
108+
"sql": TEST_DATABASE_SQL,
109+
"localParams": [],
110+
"resourceList": [],
111+
"dependence": {},
112+
"conditionResult": {"successNode": [""], "failedNode": [""]},
113+
"waitStartTimeout": {},
114+
},
115+
"flag": "YES",
116+
"taskPriority": "MEDIUM",
117+
"workerGroup": "default",
118+
"failRetryTimes": 0,
119+
"failRetryInterval": 1,
120+
"timeoutFlag": "CLOSE",
121+
"timeoutNotifyStrategy": None,
122+
"timeout": 0,
123+
}
124+
task = Database(
125+
TEST_DATABASE_TASK_TYPE, name, TEST_DATABASE_DATASOURCE_NAME, TEST_DATABASE_SQL
126+
)
127+
assert task.get_define() == expect

0 commit comments

Comments
 (0)