Skip to content

Commit 255e6a4

Browse files
authored
Merge c3c0103 into f69e064
2 parents f69e064 + c3c0103 commit 255e6a4

151 files changed

Lines changed: 1333 additions & 2647 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,14 @@
1919

2020
import org.apache.dolphinscheduler.api.metrics.ApiServerMetrics;
2121
import org.apache.dolphinscheduler.common.CommonConfiguration;
22-
import org.apache.dolphinscheduler.common.enums.PluginType;
2322
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
2423
import org.apache.dolphinscheduler.dao.DaoConfiguration;
2524
import org.apache.dolphinscheduler.dao.PluginDao;
26-
import org.apache.dolphinscheduler.dao.entity.PluginDefine;
2725
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
2826
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
29-
import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory;
3027
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
3128
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
3229
import org.apache.dolphinscheduler.service.ServiceConfiguration;
33-
import org.apache.dolphinscheduler.spi.params.PluginParamsTransfer;
34-
import org.apache.dolphinscheduler.spi.params.base.PluginParams;
35-
36-
import java.util.List;
37-
import java.util.Map;
3830

3931
import lombok.extern.slf4j.Slf4j;
4032

@@ -68,17 +60,7 @@ public static void main(String[] args) {
6860
@EventListener
6961
public void run(ApplicationReadyEvent readyEvent) {
7062
log.info("Received spring application context ready event will load taskPlugin and write to DB");
71-
// install task plugin
72-
TaskPluginManager.loadPlugin();
7363
DataSourceProcessorProvider.initialize();
74-
for (Map.Entry<String, TaskChannelFactory> entry : TaskPluginManager.getTaskChannelFactoryMap().entrySet()) {
75-
String taskPluginName = entry.getKey();
76-
TaskChannelFactory taskChannelFactory = entry.getValue();
77-
List<PluginParams> params = taskChannelFactory.getParams();
78-
String paramsJson = PluginParamsTransfer.transferParamsToJson(params);
79-
80-
PluginDefine pluginDefine = new PluginDefine(taskPluginName, PluginType.TASK.getDesc(), paramsJson);
81-
pluginDao.addOrUpdatePluginDefine(pluginDefine);
82-
}
64+
TaskPluginManager.loadTaskPlugin();
8365
}
8466
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@
8989
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerRequest;
9090
import org.apache.dolphinscheduler.extract.master.transportor.StreamingTaskTriggerResponse;
9191
import org.apache.dolphinscheduler.extract.master.transportor.WorkflowInstanceStateChangeEvent;
92-
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
9392
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
93+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
9494
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
9595
import org.apache.dolphinscheduler.service.command.CommandService;
9696
import org.apache.dolphinscheduler.service.cron.CronUtils;
@@ -361,7 +361,7 @@ public boolean checkSubProcessDefinitionValid(ProcessDefinition processDefinitio
361361
// find out the process definition code
362362
Set<Long> processDefinitionCodeSet = new HashSet<>();
363363
taskDefinitions.stream()
364-
.filter(task -> TaskConstants.TASK_TYPE_SUB_PROCESS.equalsIgnoreCase(task.getTaskType())).forEach(
364+
.filter(task -> TaskTypeUtils.isSubWorkflowTask(task.getTaskType())).forEach(
365365
taskDefinition -> processDefinitionCodeSet.add(Long.valueOf(
366366
JSONUtils.getNodeString(taskDefinition.getTaskParams(),
367367
CMD_PARAM_SUB_PROCESS_DEFINE_CODE))));

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
4040
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LOCAL_PARAMS_LIST;
4141
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE;
42-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SQL;
42+
import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters;
4343

4444
import org.apache.dolphinscheduler.api.dto.DagDataSchedule;
4545
import org.apache.dolphinscheduler.api.dto.treeview.Instance;
@@ -109,12 +109,12 @@
109109
import org.apache.dolphinscheduler.dao.repository.ProcessDefinitionLogDao;
110110
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionLogDao;
111111
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
112-
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
113112
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
114113
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
115114
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
116-
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
117115
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
116+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
117+
import org.apache.dolphinscheduler.plugin.task.sql.SqlTaskChannelFactory;
118118
import org.apache.dolphinscheduler.service.alert.ListenerEventAlertManager;
119119
import org.apache.dolphinscheduler.service.model.TaskNode;
120120
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -421,11 +421,7 @@ private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinition
421421
throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJson);
422422
}
423423
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
424-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
425-
.taskType(taskDefinitionLog.getTaskType())
426-
.taskParams(taskDefinitionLog.getTaskParams())
427-
.dependence(taskDefinitionLog.getDependence())
428-
.build())) {
424+
if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) {
429425
log.error(
430426
"Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}",
431427
taskDefinitionLog.getName(), taskDefinitionLog);
@@ -1386,7 +1382,7 @@ private TaskDefinitionLog buildNormalSqlTaskDefinition(String taskName, DataSour
13861382
sqlParameters.setLocalParams(Collections.emptyList());
13871383
taskDefinition.setTaskParams(JSONUtils.toJsonString(sqlParameters));
13881384
taskDefinition.setCode(CodeGenerateUtils.genCode());
1389-
taskDefinition.setTaskType(TASK_TYPE_SQL);
1385+
taskDefinition.setTaskType(SqlTaskChannelFactory.NAME);
13901386
taskDefinition.setFailRetryTimes(0);
13911387
taskDefinition.setFailRetryInterval(0);
13921388
taskDefinition.setTimeoutFlag(TimeoutFlag.CLOSE);
@@ -1615,13 +1611,7 @@ public Map<String, Object> checkProcessNodeList(String processTaskRelationJson,
16151611

16161612
// check whether the process definition json is normal
16171613
for (TaskNode taskNode : taskNodes) {
1618-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
1619-
.taskType(taskNode.getType())
1620-
.taskParams(taskNode.getTaskParams())
1621-
.dependence(taskNode.getDependence())
1622-
.switchResult(taskNode.getSwitchResult())
1623-
.build())) {
1624-
log.error("Task node {} parameter invalid.", taskNode.getName());
1614+
if (!checkTaskParameters(taskNode.getType(), taskNode.getParams())) {
16251615
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
16261616
return result;
16271617
}
@@ -1891,7 +1881,7 @@ public Map<String, Object> viewTree(User loginUser, long projectCode, long code,
18911881

18921882
long subProcessCode = 0L;
18931883
// if process is sub process, the return sub id, or sub id=0
1894-
if (taskInstance.isSubProcess()) {
1884+
if (TaskTypeUtils.isSubWorkflowTask(taskInstance.getTaskType())) {
18951885
TaskDefinition taskDefinition = taskDefinitionMap.get(taskInstance.getTaskCode());
18961886
subProcessCode = Long.parseLong(JSONUtils.parseObject(
18971887
taskDefinition.getTaskParams()).path(CMD_PARAM_SUB_PROCESS_DEFINE_CODE).asText());

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import static org.apache.dolphinscheduler.common.constants.Constants.LOCAL_PARAMS;
2727
import static org.apache.dolphinscheduler.common.constants.Constants.PROCESS_INSTANCE_STATE;
2828
import static org.apache.dolphinscheduler.common.constants.Constants.TASK_LIST;
29-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
30-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
29+
import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters;
3130

3231
import org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant;
3332
import org.apache.dolphinscheduler.api.dto.DynamicSubWorkflowDto;
@@ -77,11 +76,10 @@
7776
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceMapDao;
7877
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
7978
import org.apache.dolphinscheduler.dao.utils.WorkflowUtils;
80-
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
8179
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
8280
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
83-
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
8481
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
82+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
8583
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
8684
import org.apache.dolphinscheduler.service.model.TaskNode;
8785
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -498,7 +496,7 @@ public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUs
498496
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
499497
}
500498

501-
if (!taskInstance.isDynamic()) {
499+
if (!TaskTypeUtils.isDynamicTask(taskInstance.getTaskType())) {
502500
putMsg(result, Status.TASK_INSTANCE_NOT_DYNAMIC_TASK, taskInstance.getName());
503501
throw new ServiceException(Status.TASK_INSTANCE_NOT_EXISTS, taskId);
504502
}
@@ -548,7 +546,7 @@ public List<DynamicSubWorkflowDto> queryDynamicSubWorkflowInstances(User loginUs
548546
*/
549547
private void addDependResultForTaskList(User loginUser, List<TaskInstance> taskInstanceList) throws IOException {
550548
for (TaskInstance taskInstance : taskInstanceList) {
551-
if (TASK_TYPE_DEPENDENT.equalsIgnoreCase(taskInstance.getTaskType())) {
549+
if (TaskTypeUtils.isDependentTask(taskInstance.getTaskType())) {
552550
log.info("DEPENDENT type task instance need to set dependent result, taskCode:{}, taskInstanceId:{}",
553551
taskInstance.getTaskCode(), taskInstance.getId());
554552
// TODO The result of dependent item should not be obtained from the log, waiting for optimization.
@@ -628,9 +626,7 @@ public Map<String, Object> querySubProcessInstanceByTaskId(User loginUser, long
628626
return result;
629627
}
630628

631-
if (!taskInstance.isSubProcess()) {
632-
log.warn("Task instance is not {} type instance, projectCode:{}, taskInstanceId:{}.",
633-
TASK_TYPE_SUB_PROCESS, projectCode, taskId);
629+
if (!TaskTypeUtils.isSubWorkflowTask(taskInstance.getTaskType())) {
634630
putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE, taskInstance.getName());
635631
return result;
636632
}
@@ -714,11 +710,7 @@ public Map<String, Object> updateProcessInstance(User loginUser, long projectCod
714710
return result;
715711
}
716712
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
717-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
718-
.taskType(taskDefinitionLog.getTaskType())
719-
.taskParams(taskDefinitionLog.getTaskParams())
720-
.dependence(taskDefinitionLog.getDependence())
721-
.build())) {
713+
if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) {
722714
log.error("Task parameters are invalid, taskDefinitionName:{}.", taskDefinitionLog.getName());
723715
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
724716
return result;

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessTaskRelationServiceImpl.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@
1818
package org.apache.dolphinscheduler.api.service.impl;
1919

2020
import static java.util.stream.Collectors.toSet;
21-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_CONDITIONS;
22-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
23-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
2421

2522
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationCreateRequest;
2623
import org.apache.dolphinscheduler.api.dto.taskRelation.TaskRelationFilterRequest;
@@ -46,6 +43,7 @@
4643
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
4744
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
4845
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
46+
import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils;
4947
import org.apache.dolphinscheduler.service.process.ProcessService;
5048

5149
import org.apache.commons.collections4.CollectionUtils;
@@ -354,9 +352,9 @@ public Map<String, Object> deleteTaskProcessRelation(User loginUser, long projec
354352
}
355353
updateProcessDefiniteVersion(loginUser, result, processDefinition);
356354
updateRelation(loginUser, result, processDefinition, processTaskRelationList);
357-
if (TASK_TYPE_CONDITIONS.equals(taskDefinition.getTaskType())
358-
|| TASK_TYPE_DEPENDENT.equals(taskDefinition.getTaskType())
359-
|| TASK_TYPE_SUB_PROCESS.equals(taskDefinition.getTaskType())) {
355+
if (TaskTypeUtils.isConditionTask(taskDefinition.getTaskType())
356+
|| TaskTypeUtils.isSubWorkflowTask(taskDefinition.getTaskType())
357+
|| TaskTypeUtils.isDependentTask(taskDefinition.getTaskType())) {
360358
int deleteTaskDefinition = taskDefinitionMapper.deleteByCode(taskCode);
361359
if (0 == deleteTaskDefinition) {
362360
log.error("Delete task definition error, taskDefinitionCode:{}.", taskCode);

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_VERSION_VIEW;
2525
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;
2626
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION;
27+
import static org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager.checkTaskParameters;
2728

2829
import org.apache.dolphinscheduler.api.dto.task.TaskCreateRequest;
2930
import org.apache.dolphinscheduler.api.dto.task.TaskFilterRequest;
@@ -67,8 +68,6 @@
6768
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
6869
import org.apache.dolphinscheduler.dao.repository.ProcessTaskRelationLogDao;
6970
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
70-
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
71-
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
7271
import org.apache.dolphinscheduler.service.process.ProcessService;
7372

7473
import org.apache.commons.collections4.CollectionUtils;
@@ -167,11 +166,7 @@ public Map<String, Object> createTaskDefinition(User loginUser,
167166
return result;
168167
}
169168
for (TaskDefinitionLog taskDefinitionLog : taskDefinitionLogs) {
170-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
171-
.taskType(taskDefinitionLog.getTaskType())
172-
.taskParams(taskDefinitionLog.getTaskParams())
173-
.dependence(taskDefinitionLog.getDependence())
174-
.build())) {
169+
if (!checkTaskParameters(taskDefinitionLog.getTaskType(), taskDefinitionLog.getTaskParams())) {
175170
log.warn("Task definition {} parameters are invalid.", taskDefinitionLog.getName());
176171
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
177172
return result;
@@ -208,11 +203,7 @@ private void checkTaskDefinitionValid(User user, TaskDefinition taskDefinition,
208203
Project project = projectMapper.queryByCode(taskDefinition.getProjectCode());
209204
projectService.checkProjectAndAuthThrowException(user, project, permissions);
210205

211-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
212-
.taskType(taskDefinition.getTaskType())
213-
.taskParams(taskDefinition.getTaskParams())
214-
.dependence(taskDefinition.getDependence())
215-
.build())) {
206+
if (!checkTaskParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams())) {
216207
throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
217208
}
218209
}
@@ -321,12 +312,7 @@ public Map<String, Object> createTaskBindsWorkFlow(User loginUser,
321312
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
322313
return result;
323314
}
324-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
325-
.taskType(taskDefinition.getTaskType())
326-
.taskParams(taskDefinition.getTaskParams())
327-
.dependence(taskDefinition.getDependence())
328-
.build())) {
329-
log.error("Task definition {} parameters are invalid", taskDefinition.getName());
315+
if (!checkTaskParameters(taskDefinition.getTaskType(), taskDefinition.getTaskParams())) {
330316
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
331317
return result;
332318
}
@@ -732,13 +718,7 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task
732718
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
733719
return null;
734720
}
735-
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
736-
.taskType(taskDefinitionToUpdate.getTaskType())
737-
.taskParams(taskDefinitionToUpdate.getTaskParams())
738-
.dependence(taskDefinitionToUpdate.getDependence())
739-
.build())) {
740-
log.warn("Task definition parameters are invalid, taskDefinitionName:{}.",
741-
taskDefinitionToUpdate.getName());
721+
if (!checkTaskParameters(taskDefinitionToUpdate.getTaskType(), taskDefinitionToUpdate.getTaskParams())) {
742722
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
743723
return null;
744724
}

0 commit comments

Comments
 (0)