Skip to content

Commit 37bd41a

Browse files
committed
Fix workflow error when adding or update task
1 parent 5763068 commit 37bd41a

2 files changed

Lines changed: 38 additions & 13 deletions

File tree

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/core/task.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
from typing import Optional, List, Dict, Set, Union, Sequence
18+
from typing import Optional, List, Dict, Set, Union, Sequence, Tuple
1919

2020
from pydolphinscheduler.constants import TaskPriority, ProcessDefinitionDefault, TaskFlag, TaskTimeoutFlag, \
2121
DefaultTaskCodeNum, JavaGatewayDefault
@@ -99,9 +99,6 @@ def __hash__(self):
9999

100100

101101
class Task(Base):
102-
DEFAULT_INS_ATTR = {
103-
"version": 1,
104-
}
105102

106103
DEFAULT_DEPS_ATTR = {
107104
"name": "",
@@ -130,7 +127,6 @@ def __init__(
130127
):
131128

132129
super().__init__(name, description)
133-
self.code: int = self.gen_code()
134130
self.task_type = task_type
135131
self.task_params = task_params
136132
self.flag = flag
@@ -147,6 +143,11 @@ def __init__(
147143
self._upstream_task_codes: Set[int] = set()
148144
self._downstream_task_codes: Set[int] = set()
149145
self._task_relation: Set[TaskRelation] = set()
146+
# move attribute code and version after _process_definition and process_definition declare
147+
self.code, self.version = self.gen_code_and_version()
148+
# Add task to process definition, maybe we could put into property process_definition latter
149+
if self.process_definition is not None and self.code not in self.process_definition.tasks:
150+
self.process_definition.add_task(self)
150151

151152
@property
152153
def process_definition(self) -> Optional[ProcessDefinition]:
@@ -157,8 +158,6 @@ def process_definition(self) -> Optional[ProcessDefinition]:
157158

158159
@process_definition.setter
159160
def process_definition(self, process_definition: Optional[ProcessDefinition]):
160-
if process_definition is not None and self.code not in process_definition.tasks:
161-
process_definition.add_task(self)
162161
self._process_definition = process_definition
163162

164163
def __hash__(self):
@@ -217,12 +216,13 @@ def set_downstream(self, tasks: Union["Task", Sequence["Task"]]) -> None:
217216
self._set_deps(tasks, upstream=False)
218217

219218
# TODO code should better generate in bulk mode when :ref: processDefinition run submit or start
220-
@staticmethod
221-
def gen_code() -> int:
219+
def gen_code_and_version(self) -> Tuple:
220+
# TODO get code from specific project process definition and task name
222221
gateway = launch_gateway()
223-
result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
224-
gateway_result_checker(result)
225-
return result[JavaGatewayDefault.RESULT_DATA][0]
222+
result = gateway.entry_point.getCodeAndVersion(self.process_definition._project, self.name)
223+
# result = gateway.entry_point.genTaskCodeList(DefaultTaskCodeNum.DEFAULT)
224+
# gateway_result_checker(result)
225+
return result.get("code"), result.get("version")
226226

227227
def to_dict(self, camel_attr=True) -> Dict:
228228
content = {}
@@ -234,5 +234,4 @@ def to_dict(self, camel_attr=True) -> Dict:
234234
content[snake2camel(attr)] = value.to_dict()
235235
else:
236236
content[snake2camel(attr)] = value
237-
content.update(self.DEFAULT_INS_ATTR)
238237
return content

dolphinscheduler-python/src/main/java/org/apache/dolphinscheduler/server/PythonGatewayServer.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,16 @@
3434
import org.apache.dolphinscheduler.common.enums.TaskDependType;
3535
import org.apache.dolphinscheduler.common.enums.UserType;
3636
import org.apache.dolphinscheduler.common.enums.WarningType;
37+
import org.apache.dolphinscheduler.common.utils.SnowFlakeUtils;
3738
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
3839
import org.apache.dolphinscheduler.dao.entity.Project;
3940
import org.apache.dolphinscheduler.dao.entity.Queue;
41+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
4042
import org.apache.dolphinscheduler.dao.entity.Tenant;
4143
import org.apache.dolphinscheduler.dao.entity.User;
4244
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
4345
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
46+
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
4447
import org.springframework.beans.factory.annotation.Autowired;
4548
import org.springframework.boot.SpringApplication;
4649
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
@@ -91,6 +94,9 @@ public class PythonGatewayServer extends SpringBootServletInitializer {
9194
@Autowired
9295
private ProjectMapper projectMapper;
9396

97+
@Autowired
98+
private TaskDefinitionMapper taskDefinitionMapper;
99+
94100
// TODO replace this user to build in admin user if we make sure build in one could not be change
95101
private final User dummyAdminUser = new User() {
96102
{
@@ -117,6 +123,26 @@ public Map<String, Object> genTaskCodeList(Integer genNum) {
117123
return taskDefinitionService.genTaskCodeList(dummyAdminUser, genNum);
118124
}
119125

126+
public Map<String, Long> getCodeAndVersion(String projectName, String taskName) throws SnowFlakeUtils.SnowFlakeException {
127+
Project project = projectMapper.queryByName(projectName);
128+
Map<String, Long> result = new HashMap<>();
129+
// project do not exists, mean task not exists too, so we should directly return init value
130+
if (project == null) {
131+
result.put("code", SnowFlakeUtils.getInstance().nextId());
132+
result.put("version", 0L);
133+
return result;
134+
}
135+
TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), taskName);
136+
if (taskDefinition == null) {
137+
result.put("code", SnowFlakeUtils.getInstance().nextId());
138+
result.put("version", 0L);
139+
} else {
140+
result.put("code", taskDefinition.getCode());
141+
result.put("version", (long) taskDefinition.getVersion());
142+
}
143+
return result;
144+
}
145+
120146
/**
121147
* create or update process definition.
122148
* If process definition do not exists in Project=`projectCode` would create a new one

0 commit comments

Comments
 (0)