|
17 | 17 |
|
18 | 18 | package org.apache.dolphinscheduler.api.service.impl; |
19 | 19 |
|
| 20 | +import static org.apache.dolphinscheduler.api.enums.Status.DATA_IS_NOT_VALID; |
| 21 | + |
20 | 22 | import org.apache.dolphinscheduler.api.enums.Status; |
21 | 23 | import org.apache.dolphinscheduler.api.exceptions.ServiceException; |
22 | 24 | import org.apache.dolphinscheduler.api.service.ProcessTaskRelationService; |
23 | 25 | import org.apache.dolphinscheduler.api.service.ProjectService; |
24 | 26 | import org.apache.dolphinscheduler.common.Constants; |
25 | 27 | import org.apache.dolphinscheduler.common.enums.TaskType; |
| 28 | +import org.apache.dolphinscheduler.common.utils.JSONUtils; |
26 | 29 | import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; |
27 | 30 | import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; |
28 | 31 | import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; |
|
42 | 45 |
|
43 | 46 | import java.util.ArrayList; |
44 | 47 | import java.util.Date; |
| 48 | +import java.util.HashSet; |
45 | 49 | import java.util.List; |
46 | 50 | import java.util.Map; |
47 | 51 | import java.util.Objects; |
|
52 | 56 | import org.springframework.stereotype.Service; |
53 | 57 | import org.springframework.transaction.annotation.Transactional; |
54 | 58 |
|
| 59 | +import com.fasterxml.jackson.databind.node.ArrayNode; |
| 60 | +import com.fasterxml.jackson.databind.node.ObjectNode; |
55 | 61 | import com.google.common.collect.Lists; |
56 | 62 |
|
57 | 63 | /** |
@@ -186,7 +192,85 @@ private ProcessTaskRelationLog setRelationLog(ProcessDefinition processDefinitio |
186 | 192 | */ |
187 | 193 | @Override |
188 | 194 | public Map<String, Object> moveTaskProcessRelation(User loginUser, long projectCode, long processDefinitionCode, long targetProcessDefinitionCode, long taskCode) { |
189 | | - return null; |
| 195 | + Project project = projectMapper.queryByCode(projectCode); |
| 196 | + //check user access for project |
| 197 | + Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode); |
| 198 | + if (result.get(Constants.STATUS) != Status.SUCCESS) { |
| 199 | + return result; |
| 200 | + } |
| 201 | + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(targetProcessDefinitionCode); |
| 202 | + if (processDefinition == null) { |
| 203 | + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, targetProcessDefinitionCode); |
| 204 | + return result; |
| 205 | + } |
| 206 | + if (processDefinition.getProjectCode() != projectCode) { |
| 207 | + putMsg(result, Status.PROJECT_PROCESS_NOT_MATCH); |
| 208 | + return result; |
| 209 | + } |
| 210 | + List<ProcessTaskRelation> downstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, taskCode, 0L); |
| 211 | + if (CollectionUtils.isNotEmpty(downstreamList)) { |
| 212 | + Set<Long> postTaskCodes = downstreamList |
| 213 | + .stream() |
| 214 | + .map(ProcessTaskRelation::getPostTaskCode) |
| 215 | + .collect(Collectors.toSet()); |
| 216 | + putMsg(result, Status.TASK_HAS_DOWNSTREAM, org.apache.commons.lang.StringUtils.join(postTaskCodes, ",")); |
| 217 | + return result; |
| 218 | + } |
| 219 | + List<ProcessTaskRelation> upstreamList = processTaskRelationMapper.queryByCode(projectCode, processDefinitionCode, 0L, taskCode); |
| 220 | + if (upstreamList.isEmpty()) { |
| 221 | + putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, "taskCode:" + taskCode); |
| 222 | + return result; |
| 223 | + } else { |
| 224 | + Set<Long> preTaskCodes = upstreamList |
| 225 | + .stream() |
| 226 | + .map(ProcessTaskRelation::getPreTaskCode) |
| 227 | + .collect(Collectors.toSet()); |
| 228 | + if (preTaskCodes.size() > 1 || !preTaskCodes.contains(0L)) { |
| 229 | + putMsg(result, Status.TASK_HAS_UPSTREAM, org.apache.commons.lang.StringUtils.join(preTaskCodes, ",")); |
| 230 | + return result; |
| 231 | + } |
| 232 | + } |
| 233 | + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); |
| 234 | + if (null == taskDefinition) { |
| 235 | + putMsg(result, Status.DATA_IS_NULL, "taskDefinition"); |
| 236 | + return result; |
| 237 | + } |
| 238 | + ObjectNode paramNode = JSONUtils.parseObject(taskDefinition.getTaskParams()); |
| 239 | + if (TaskType.DEPENDENT.getDesc().equals(taskDefinition.getTaskType())) { |
| 240 | + Set<Long> depProcessDefinitionCodes = new HashSet<>(); |
| 241 | + ObjectNode dependence = (ObjectNode) paramNode.get("dependence"); |
| 242 | + ArrayNode dependTaskList = JSONUtils.parseArray(JSONUtils.toJsonString(dependence.get("dependTaskList"))); |
| 243 | + for (int i = 0; i < dependTaskList.size(); i++) { |
| 244 | + ObjectNode dependTask = (ObjectNode) dependTaskList.path(i); |
| 245 | + ArrayNode dependItemList = JSONUtils.parseArray(JSONUtils.toJsonString(dependTask.get("dependItemList"))); |
| 246 | + for (int j = 0; j < dependItemList.size(); j++) { |
| 247 | + ObjectNode dependItem = (ObjectNode) dependItemList.path(j); |
| 248 | + long definitionCode = dependItem.get("definitionCode").asLong(); |
| 249 | + depProcessDefinitionCodes.add(definitionCode); |
| 250 | + } |
| 251 | + } |
| 252 | + if (depProcessDefinitionCodes.contains(targetProcessDefinitionCode)) { |
| 253 | + putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); |
| 254 | + return result; |
| 255 | + } |
| 256 | + } |
| 257 | + if (TaskType.SUB_PROCESS.getDesc().equals(taskDefinition.getTaskType())) { |
| 258 | + long subProcessDefinitionCode = paramNode.get("processDefinitionCode").asLong(); |
| 259 | + if (targetProcessDefinitionCode == subProcessDefinitionCode) { |
| 260 | + putMsg(result, DATA_IS_NOT_VALID, "targetProcessDefinitionCode"); |
| 261 | + return result; |
| 262 | + } |
| 263 | + } |
| 264 | + Date now = new Date(); |
| 265 | + ProcessTaskRelation processTaskRelation = upstreamList.get(0); |
| 266 | + processTaskRelation.setProcessDefinitionCode(processDefinition.getCode()); |
| 267 | + processTaskRelation.setProcessDefinitionVersion(processDefinition.getVersion()); |
| 268 | + processTaskRelation.setUpdateTime(now); |
| 269 | + int update = processTaskRelationMapper.updateById(processTaskRelation); |
| 270 | + if (update == 0) { |
| 271 | + putMsg(result, Status.MOVE_PROCESS_TASK_RELATION_ERROR); |
| 272 | + } |
| 273 | + return result; |
190 | 274 | } |
191 | 275 |
|
192 | 276 | /** |
|
0 commit comments