Skip to content

Commit 2b20b8e

Browse files
committed
add workflow as code task type mr
1 parent 8ef72fc commit 2b20b8e

4 files changed

Lines changed: 160 additions & 0 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""A example workflow for task mr."""
19+
20+
from pydolphinscheduler.core.engine import ProgramType
21+
from pydolphinscheduler.core.process_definition import ProcessDefinition
22+
from pydolphinscheduler.tasks.map_reduce import MR
23+
24+
with ProcessDefinition(name="task_map_reduce_example", tenant="tenant_exists") as pd:
25+
task = MR(
26+
name="task_mr",
27+
main_class="wordcount",
28+
main_package="hadoop-mapreduce-examples-3.3.1.jar",
29+
program_type=ProgramType.JAVA,
30+
main_args="/dolphinscheduler/tenant_exists/resources/file.txt /output/ds",
31+
)
32+
pd.run()

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class TaskType(str):
7979
SWITCH = "SWITCH"
8080
FLINK = "FLINK"
8181
SPARK = "SPARK"
82+
MR = "MR"
8283

8384

8485
class DefaultTaskCodeNum(str):
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Task MR."""
19+
20+
from typing import Optional
21+
22+
from pydolphinscheduler.constants import TaskType
23+
from pydolphinscheduler.core.engine import Engine, ProgramType
24+
25+
26+
class MR(Engine):
27+
"""Task mr object, declare behavior for mr task to dolphinscheduler."""
28+
29+
_task_custom_attr = {
30+
"app_name",
31+
"main_args",
32+
"others",
33+
}
34+
35+
def __init__(
36+
self,
37+
name: str,
38+
main_class: str,
39+
main_package: str,
40+
program_type: Optional[ProgramType] = ProgramType.SCALA,
41+
app_name: Optional[str] = None,
42+
main_args: Optional[str] = None,
43+
others: Optional[str] = None,
44+
*args,
45+
**kwargs
46+
):
47+
super().__init__(
48+
name, TaskType.MR, main_class, main_package, program_type, *args, **kwargs
49+
)
50+
self.app_name = app_name
51+
self.main_args = main_args
52+
self.others = others
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Test Task MR."""
19+
20+
from unittest.mock import patch
21+
22+
from pydolphinscheduler.tasks.map_reduce import MR, ProgramType
23+
24+
25+
@patch(
26+
"pydolphinscheduler.core.engine.Engine.get_resource_info",
27+
return_value=({"id": 1, "name": "test"}),
28+
)
29+
def test_mr_get_define(mock_resource):
30+
"""Test task mr function get_define."""
31+
code = 123
32+
version = 1
33+
name = "test_mr_get_define"
34+
main_class = "org.apache.mr.test_main_class"
35+
main_package = "test_main_package"
36+
program_type = ProgramType.JAVA
37+
main_args = "/dolphinscheduler/resources/file.txt /output/ds"
38+
39+
expect = {
40+
"code": code,
41+
"name": name,
42+
"version": 1,
43+
"description": None,
44+
"delayTime": 0,
45+
"taskType": "MR",
46+
"taskParams": {
47+
"mainClass": main_class,
48+
"mainJar": {
49+
"id": 1,
50+
},
51+
"programType": program_type,
52+
"appName": None,
53+
"mainArgs": main_args,
54+
"others": None,
55+
"localParams": [],
56+
"resourceList": [],
57+
"dependence": {},
58+
"conditionResult": {"successNode": [""], "failedNode": [""]},
59+
"waitStartTimeout": {},
60+
},
61+
"flag": "YES",
62+
"taskPriority": "MEDIUM",
63+
"workerGroup": "default",
64+
"failRetryTimes": 0,
65+
"failRetryInterval": 1,
66+
"timeoutFlag": "CLOSE",
67+
"timeoutNotifyStrategy": None,
68+
"timeout": 0,
69+
}
70+
with patch(
71+
"pydolphinscheduler.core.task.Task.gen_code_and_version",
72+
return_value=(code, version),
73+
):
74+
task = MR(name, main_class, main_package, program_type, main_args=main_args)
75+
assert task.get_define() == expect

0 commit comments

Comments
 (0)