Skip to content

Commit e599ffc

Browse files
authored
Merge 958b56c into 88cd37f
2 parents 88cd37f + 958b56c commit e599ffc

4 files changed

Lines changed: 136 additions & 6 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessTaskRelationController.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,18 @@ public Result moveTaskProcessRelation(@ApiIgnore @RequestAttribute(value = Const
127127
@RequestParam(name = "processDefinitionCode", required = true) long processDefinitionCode,
128128
@RequestParam(name = "targetProcessDefinitionCode", required = true) long targetProcessDefinitionCode,
129129
@RequestParam(name = "taskCode", required = true) long taskCode) {
130-
return returnDataList(processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
131-
targetProcessDefinitionCode, taskCode));
130+
Map<String, Object> result = new HashMap<>();
131+
if (processDefinitionCode == 0L) {
132+
putMsg(result, DATA_IS_NOT_VALID, "processDefinitionCode");
133+
} else if (targetProcessDefinitionCode == 0L) {
134+
putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
135+
} else if (taskCode == 0L) {
136+
putMsg(result, DATA_IS_NOT_VALID, "taskCode");
137+
} else {
138+
result = processTaskRelationService.moveTaskProcessRelation(loginUser, projectCode, processDefinitionCode,
139+
targetProcessDefinitionCode, taskCode);
140+
}
141+
return returnDataList(result);
132142
}
133143

134144
/**

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,9 +286,10 @@ public enum Status {
286286
QUERY_TASK_PROCESS_RELATION_ERROR(50049, "query process task relation error", "查询工作流任务关系错误"),
287287
TASK_DEFINE_STATE_ONLINE(50050, "task definition {0} is already on line", "任务定义[{0}]已上线"),
288288
TASK_HAS_DOWNSTREAM(50051, "Task [{0}] exists downstream dependence", "任务[{0}]存在下游依赖"),
289-
MAIN_TABLE_USING_VERSION(50052, "the version that the master table is using", "主表正在使用该版本"),
290-
PROJECT_PROCESS_NOT_MATCH(50053, "the project and the process is not match", "项目和工作流不匹配"),
291-
DELETE_EDGE_ERROR(50054, "delete edge error", "删除工作流任务连接线错误"),
289+
TASK_HAS_UPSTREAM(50052, "Task [{0}] exists upstream dependence", "任务[{0}]存在上游依赖"),
290+
MAIN_TABLE_USING_VERSION(50053, "the version that the master table is using", "主表正在使用该版本"),
291+
PROJECT_PROCESS_NOT_MATCH(50054, "the project and the process is not match", "项目和工作流不匹配"),
292+
DELETE_EDGE_ERROR(50055, "delete edge error", "删除工作流任务连接线错误"),
292293
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
293294

294295
/**

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
package org.apache.dolphinscheduler.api.service.impl;
1919

20+
import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID;
21+
2022
import org.apache.dolphinscheduler.api.enums.Status;
2123
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
2224
import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService;
2325
import org.apache.dolphinscheduler.api.service.ProjectService;
2426
import org.apache.dolphinscheduler.common.Constants;
2527
import org.apache.dolphinscheduler.common.enums.TaskType;
28+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2629
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
2730
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation;
2831
import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
@@ -42,6 +45,7 @@
4245

4346
import java.util.ArrayList;
4447
import java.util.Date;
48+
import java.util.HashSet;
4549
import java.util.List;
4650
import java.util.Map;
4751
import java.util.Objects;
@@ -52,6 +56,8 @@
5256
import org.springframework.stereotype.Service;
5357
import org.springframework.transaction.annotation.Transactional;
5458

59+
import com.fasterxml.jackson.databind.node.ArrayNode;
60+
import com.fasterxml.jackson.databind.node.ObjectNode;
5561
import com.google.common.collect.Lists;
5662

5763
/**
@@ -186,7 +192,85 @@ private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinitio
186192
*/
187193
@Override
188194
public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) {
189-
return null;
195+
Project project = projectMapper.queryByCode(projectCode);
196+
//check user access for project
197+
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
198+
if (result.get(Constants.STATUS) != Status.SUCCESS) {
199+
return result;
200+
}
201+
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode);
202+
if (processDefinition == null) {
203+
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode);
204+
return result;
205+
}
206+
if (processDefinition.getProjectCode() != projectCode) {
207+
putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH);
208+
return result;
209+
}
210+
List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L);
211+
if (CollectionUtils.isNotEmpty(downstreamList)) {
212+
Set<Long> postTaskCodes = downstreamList
213+
.stream()
214+
.map(ProcessTaskRelation::getPostTaskCode)
215+
.collect(Collectors.toSet());
216+
putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ","));
217+
return result;
218+
}
219+
List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode);
220+
if (upstreamList.isEmpty()) {
221+
putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode);
222+
return result;
223+
} else {
224+
Set<Long> preTaskCodes = upstreamList
225+
.stream()
226+
.map(ProcessTaskRelation::getPreTaskCode)
227+
.collect(Collectors.toSet());
228+
if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) {
229+
putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ","));
230+
return result;
231+
}
232+
}
233+
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
234+
if (null == taskDefinition) {
235+
putMsg(result, Status.DATA_IS_NULL, "taskDefinition");
236+
return result;
237+
}
238+
ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams());
239+
if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) {
240+
Set<Long> depProcessDefinitionCodes = new HashSet<>();
241+
ObjectNode dependence = (ObjectNode) paramNode.get("dependence");
242+
ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList")));
243+
for (int i = 0; i < dependTaskList.size(); i++) {
244+
ObjectNode dependTask = (ObjectNode) dependTaskList.path(i);
245+
ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList")));
246+
for (int j = 0; j < dependItemList.size(); j++) {
247+
ObjectNode dependItem = (ObjectNode) dependItemList.path(j);
248+
long definitionCode = dependItem.get("definitionCode").asLong();
249+
depProcessDefinitionCodes.add(definitionCode);
250+
}
251+
}
252+
if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) {
253+
putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
254+
return result;
255+
}
256+
}
257+
if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) {
258+
long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong();
259+
if (targetProcessDefinitionCode == subProcessDefinitionCode) {
260+
putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode");
261+
return result;
262+
}
263+
}
264+
Date now = new Date();
265+
ProcessTaskRelation processTaskRelation = upstreamList.get(0);
266+
processTaskRelation.setProcessDefinitionCode(processDefinition.getCode());
267+
processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion());
268+
processTaskRelation.setUpdateTime(now);
269+
int update = processTaskRelationMapper.updateById(processTaskRelation);
270+
if (update == 0) {
271+
putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR);
272+
}
273+
return result;
190274
}
191275

192276
/**

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessTaskRelationServiceTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ private TaskDefinition getTaskDefinition() {
246246
taskDefinition.setProjectCode(1L);
247247
taskDefinition.setCode(1L);
248248
taskDefinition.setVersion(1);
249+
taskDefinition.setTaskType(TaskType.SHELL.getDesc());
249250
return taskDefinition;
250251
}
251252

@@ -280,6 +281,40 @@ public void testCreateProcessTaskRelation() {
280281
processTaskRelationList.add(processTaskRelationLog);
281282
Mockito.when(processTaskRelationMapper.batchInsert(processTaskRelationList)).thenReturn(1);
282283
Mockito.when(processTaskRelationLogMapper.batchInsert(processTaskRelationList)).thenReturn(1);
284+
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
285+
}
286+
287+
@Test
288+
public void testMoveTaskProcessRelation() {
289+
long projectCode = 1L;
290+
long processDefinitionCode = 1L;
291+
long taskCode = 1L;
292+
293+
Project project = getProject(projectCode);
294+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(project);
295+
296+
User loginUser = new User();
297+
loginUser.setId(-1);
298+
loginUser.setUserType(UserType.GENERAL_USER);
299+
300+
Map<String, Object> result = new HashMap<>();
301+
putMsg(result, Status.SUCCESS, projectCode);
302+
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode)).thenReturn(result);
303+
Mockito.when(processDefinitionMapper.queryByCode(processDefinitionCode)).thenReturn(getProcessDefinition());
304+
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L)).thenReturn(Lists.newArrayList());
305+
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(getTaskDefinition());
306+
List<ProcessTaskRelation> processTaskRelationList = Lists.newArrayList();
307+
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
308+
processTaskRelation.setProjectCode(projectCode);
309+
processTaskRelation.setProcessDefinitionCode(processDefinitionCode);
310+
processTaskRelation.setPreTaskCode(0L);
311+
processTaskRelation.setPreTaskVersion(0);
312+
processTaskRelation.setPostTaskCode(taskCode);
313+
processTaskRelation.setPostTaskVersion(1);
314+
processTaskRelationList.add(processTaskRelation);
315+
Mockito.when(processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode)).thenReturn(processTaskRelationList);
316+
Mockito.when(processTaskRelationMapper.updateById(processTaskRelation)).thenReturn(1);
317+
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
283318
}
284319

285320
@Test

0 commit comments

Comments
 (0)