Skip to content

Commit e76cf77

Browse files
authored
[python] Add parameter schedule for process definition (#6664)
* [python] Add parameter schedule for process definition * Rebase and fix some code style * May schedule work on both string and datetime * Fix flaky test * Add comment about freeze time * Add edge test for schedule_json with None schedule * Fix test function name * Fix rebase error
1 parent 089f73e commit e76cf77

9 files changed

Lines changed: 485 additions & 42 deletions

File tree

dolphinscheduler-python/pydolphinscheduler/examples/tutorial.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@
3434
from pydolphinscheduler.tasks.shell import Shell
3535

3636

37-
with ProcessDefinition(name="tutorial", tenant="tenant_exists") as pd:
37+
with ProcessDefinition(
38+
name="tutorial",
39+
schedule="0 0 0 * * ? *",
40+
start_time="2021-01-01",
41+
tenant="tenant_exists",
42+
) as pd:
3843
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
3944
task_child_one = Shell(name="task_child_one", command="echo 'child one'")
4045
task_child_two = Shell(name="task_child_two", command="echo 'child two'")

dolphinscheduler-python/pydolphinscheduler/requirements_dev.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
# testing
1919
pytest~=6.2.5
20+
freezegun
2021
# code linting and formatting
2122
flake8
2223
flake8-docstrings

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/constants.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class ProcessDefinitionDefault:
3838
USER_STATE: int = 1
3939
QUEUE: str = "queuePythonGateway"
4040
WORKER_GROUP: str = "default"
41+
TIME_ZONE: str = "Asia/Shanghai"
4142

4243

4344
class TaskPriority(str):
@@ -85,3 +86,26 @@ class JavaGatewayDefault(str):
8586
RESULT_STATUS_SUCCESS = "SUCCESS"
8687

8788
RESULT_DATA = "data"
89+
90+
91+
class Delimiter(str):
    """Separator characters used to recognise date and time string layouts."""

    # NOTE: names are historical -- BAR is the hyphen, DASH is the forward slash.
    BAR: str = "-"
    DASH: str = "/"
    # Separator between hour, minute and second.
    COLON: str = ":"
97+
98+
99+
class Time(str):
    """Format strings (strftime/strptime style) and lengths for dates and times."""

    # Hyphen-separated date such as 2021-01-01, which is 10 characters long.
    FMT_STD_DATE: str = "%Y-%m-%d"
    LEN_STD_DATE: int = 10

    # Slash-separated date such as 2021/01/01.
    FMT_DASH_DATE: str = "%Y/%m/%d"

    # Compact date such as 20210101, which is 8 characters long.
    FMT_SHORT_DATE: str = "%Y%m%d"
    LEN_SHORT_DATE: int = 8

    # Time of day, with and without colon separators.
    FMT_STD_TIME: str = "%H:%M:%S"
    FMT_NO_COLON_TIME: str = "%H%M%S"

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/process_definition.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
"""Module process definition, core class for workflow define."""
1919

2020
import json
21-
from typing import Optional, List, Dict, Set
21+
from datetime import datetime
22+
from typing import Optional, List, Dict, Set, Any
2223

2324
from pydolphinscheduler.constants import (
2425
ProcessDefinitionReleaseState,
@@ -27,6 +28,7 @@
2728
from pydolphinscheduler.core.base import Base
2829
from pydolphinscheduler.java_gateway import launch_gateway
2930
from pydolphinscheduler.side import Tenant, Project, User
31+
from pydolphinscheduler.utils.date import conv_from_str, conv_to_schedule, MAX_DATETIME
3032

3133

3234
class ProcessDefinitionContext:
@@ -83,6 +85,10 @@ def __init__(
8385
self,
8486
name: str,
8587
description: Optional[str] = None,
88+
schedule: Optional[str] = None,
89+
start_time: Optional[str] = None,
90+
end_time: Optional[str] = None,
91+
timezone: Optional[str] = ProcessDefinitionDefault.TIME_ZONE,
8692
user: Optional[str] = ProcessDefinitionDefault.USER,
8793
project: Optional[str] = ProcessDefinitionDefault.PROJECT,
8894
tenant: Optional[str] = ProcessDefinitionDefault.TENANT,
@@ -93,6 +99,10 @@ def __init__(
9399
param: Optional[List] = None,
94100
):
95101
super().__init__(name, description)
102+
self.schedule = schedule
103+
self._start_time = start_time
104+
self._end_time = end_time
105+
self.timezone = timezone
96106
self._user = user
97107
self._project = project
98108
self._tenant = tenant
@@ -149,6 +159,35 @@ def user(self) -> User:
149159
ProcessDefinitionDefault.USER_STATE,
150160
)
151161

162+
@staticmethod
def _parse_datetime(val: Any) -> Any:
    """Normalize a start/end time value to a ``datetime`` (or ``None``).

    :param val: ``None``, a ``datetime`` object, or a date string that
        ``conv_from_str`` is able to parse.
    :return: ``val`` unchanged when it is ``None`` or already a ``datetime``,
        otherwise the result of ``conv_from_str(val)``.
    :raises ValueError: when ``val`` is none of the supported types.
    """
    if val is None or isinstance(val, datetime):
        return val
    if isinstance(val, str):
        return conv_from_str(val)
    # Exceptions do not interpolate %-style arguments the way logging calls do,
    # so the message is built here instead of passing the type as a second
    # argument (which would leave a literal "%s" in the error text).
    raise ValueError(f"Do not support value type {type(val)} for now")
170+
171+
@property
def start_time(self) -> Any:
    """Return the stored start time after passing it through ``_parse_datetime``."""
    raw_start = self._start_time
    return self._parse_datetime(raw_start)


@start_time.setter
def start_time(self, val) -> None:
    """Store the raw start time; it is only parsed when the property is read."""
    self._start_time = val
180+
181+
@property
def end_time(self) -> Any:
    """Return the stored end time after passing it through ``_parse_datetime``."""
    raw_end = self._end_time
    return self._parse_datetime(raw_end)


@end_time.setter
def end_time(self, val) -> None:
    """Store the raw end time; it is only parsed when the property is read."""
    self._end_time = val
190+
152191
@property
153192
def task_definition_json(self) -> List[Dict]:
154193
"""Return all tasks definition in list of dict."""
@@ -166,6 +205,25 @@ def task_relation_json(self) -> List[Dict]:
166205
self._handle_root_relation()
167206
return [tr.to_dict() for tr in self._task_relations]
168207

208+
@property
def schedule_json(self) -> Optional[Dict]:
    """Build the schedule parameter object requested by the java gateway interface.

    Returns ``None`` when no crontab expression is set on ``self.schedule``.
    Otherwise returns a dict with the keys ``startTime``, ``endTime``,
    ``crontab`` and ``timezoneId``. A missing start time falls back to the
    current time, and a missing end time falls back to ``MAX_DATETIME``.
    """
    if not self.schedule:
        return None

    # Both bounds are rendered through conv_to_schedule before being sent.
    begin = conv_to_schedule(self.start_time or datetime.now())
    finish = conv_to_schedule(self.end_time or MAX_DATETIME)
    return {
        "startTime": begin,
        "endTime": finish,
        "crontab": self.schedule,
        "timezoneId": self.timezone,
    }
226+
169227
# TODO init DAG's tasks are in the same location with default {x: 0, y: 0}
170228
@property
171229
def task_location(self) -> List[Dict]:
@@ -274,6 +332,7 @@ def submit(self) -> int:
274332
self.name,
275333
str(self.description) if self.description else "",
276334
str(self.param) if self.param else None,
335+
json.dumps(self.schedule_json) if self.schedule_json else None,
277336
json.dumps(self.task_location),
278337
self.timeout,
279338
self.worker_group,
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Date util function collections."""
19+
20+
from datetime import datetime
21+
from pydolphinscheduler.constants import Delimiter, Time
22+
23+
# Lengths of the supported "<date> <time>" strings: 15 characters for
# "%Y%m%d %H%M%S", 19 for "%Y-%m-%d %H:%M:%S" and "%Y/%m/%d %H:%M:%S".
LEN_SUPPORT_DATETIME = (15, 19)

# Combined formats: a date format and a time format joined by a single space.
FMT_SHORT = " ".join((Time.FMT_SHORT_DATE, Time.FMT_NO_COLON_TIME))
FMT_DASH = " ".join((Time.FMT_DASH_DATE, Time.FMT_STD_TIME))
FMT_STD = " ".join((Time.FMT_STD_DATE, Time.FMT_STD_TIME))

# Far-future datetime: the last second of the year 9999.
MAX_DATETIME = datetime(9999, 12, 31, 23, 59, 59)
33+
34+
35+
def conv_to_schedule(src: datetime) -> str:
    """Render the given datetime as a schedule date string in ``FMT_STD`` layout."""
    schedule_str = datetime.strftime(src, FMT_STD)
    return schedule_str
38+
39+
40+
def conv_from_str(src: str) -> datetime:
    """Convert given string to datetime.

    The layout is picked from the length of ``src`` and the delimiters it
    contains. Supported formats are:

    - %Y-%m-%d
    - %Y/%m/%d
    - %Y%m%d
    - %Y-%m-%d %H:%M:%S
    - %Y/%m/%d %H:%M:%S
    - %Y%m%d %H%M%S

    :param src: date or datetime string in one of the formats above.
    :return: the parsed ``datetime``.
    :raises NotImplementedError: when the length/delimiter combination matches
        none of the supported formats.
    :raises ValueError: propagated from ``datetime.strptime`` when ``src`` has a
        supported shape but does not actually parse with the selected format.
    """
    len_ = len(src)
    has_bar = Delimiter.BAR in src
    has_dash = Delimiter.DASH in src
    has_colon = Delimiter.COLON in src

    # Pick the format first; fmt stays None for every unsupported shape so the
    # error is raised from a single place.
    fmt = None
    if len_ == Time.LEN_SHORT_DATE:
        fmt = Time.FMT_SHORT_DATE
    elif len_ == Time.LEN_STD_DATE:
        if has_bar:
            fmt = Time.FMT_STD_DATE
        elif has_dash:
            fmt = Time.FMT_DASH_DATE
    elif len_ in LEN_SUPPORT_DATETIME:
        if has_bar and has_colon:
            fmt = FMT_STD
        elif has_dash and has_colon:
            fmt = FMT_DASH
        elif not (has_bar or has_dash or has_colon):
            fmt = FMT_SHORT

    if fmt is None:
        # Exceptions do not apply %-style arguments, so interpolate the value
        # into the message directly. The wording is kept unchanged so callers
        # matching on the message text keep working.
        raise NotImplementedError(f"{src} could not be convert to datetime for now.")
    return datetime.strptime(src, fmt)

0 commit comments

Comments
 (0)