Skip to content

Commit aa8b88a

Browse files
authored
[Feature-10871] add workflow executing data query (#10875)
* add workflow executing data query * fix sonar check for interrupted
1 parent 553159f commit aa8b88a

16 files changed

Lines changed: 719 additions & 76 deletions

File tree

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_EXECUTE_PROCESS_INSTANCE_ERROR;
2121
import static org.apache.dolphinscheduler.api.enums.Status.CHECK_PROCESS_DEFINITION_ERROR;
2222
import static org.apache.dolphinscheduler.api.enums.Status.EXECUTE_PROCESS_INSTANCE_ERROR;
23+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_EXECUTING_WORKFLOW_ERROR;
2324
import static org.apache.dolphinscheduler.api.enums.Status.START_PROCESS_INSTANCE_ERROR;
2425

2526
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
@@ -38,11 +39,23 @@
3839
import org.apache.dolphinscheduler.common.enums.WarningType;
3940
import org.apache.dolphinscheduler.common.utils.JSONUtils;
4041
import org.apache.dolphinscheduler.dao.entity.User;
42+
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
43+
44+
import org.apache.commons.lang3.StringUtils;
45+
46+
import java.text.MessageFormat;
47+
import java.util.ArrayList;
48+
import java.util.Arrays;
49+
import java.util.HashMap;
50+
import java.util.List;
51+
import java.util.Map;
52+
import java.util.stream.Collectors;
4153

4254
import org.slf4j.Logger;
4355
import org.slf4j.LoggerFactory;
4456
import org.springframework.beans.factory.annotation.Autowired;
4557
import org.springframework.http.HttpStatus;
58+
import org.springframework.web.bind.annotation.GetMapping;
4659
import org.springframework.web.bind.annotation.PathVariable;
4760
import org.springframework.web.bind.annotation.PostMapping;
4861
import org.springframework.web.bind.annotation.RequestAttribute;
@@ -58,16 +71,6 @@
5871
import io.swagger.annotations.ApiParam;
5972
import springfox.documentation.annotations.ApiIgnore;
6073

61-
import org.apache.commons.lang3.StringUtils;
62-
63-
import java.text.MessageFormat;
64-
import java.util.ArrayList;
65-
import java.util.Arrays;
66-
import java.util.HashMap;
67-
import java.util.List;
68-
import java.util.Map;
69-
import java.util.stream.Collectors;
70-
7174
/**
7275
* executor controller
7376
*/
@@ -361,4 +364,20 @@ public Result startCheckProcessDefinition(@RequestParam(value = "processDefiniti
361364
Map<String, Object> result = execService.startCheckByProcessDefinedCode(processDefinitionCode);
362365
return returnDataList(result);
363366
}
367+
368+
/**
369+
* query execute data of processInstance from master
370+
*/
371+
@ApiOperation(value = "queryExecutingWorkflow", notes = "QUERY_WORKFLOW_EXECUTE_DATA")
372+
@ApiImplicitParams({
373+
@ApiImplicitParam(name = "processInstanceId", value = "PROCESS_INSTANCE_ID", required = true, dataType = "Int", example = "100")
374+
})
375+
@GetMapping(value = "/query-executing-workflow")
376+
@ResponseStatus(HttpStatus.OK)
377+
@ApiException(QUERY_EXECUTING_WORKFLOW_ERROR)
378+
@AccessLogAnnotation
379+
public Result queryExecutingWorkflow(@RequestParam("id") Integer processInstanceId) {
380+
WorkflowExecuteDto workflowExecuteDto = execService.queryExecutingWorkflowByProcessInstanceId(processInstanceId);
381+
return Result.success(workflowExecuteDto);
382+
}
364383
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,17 @@
1717

1818
package org.apache.dolphinscheduler.api.controller;
1919

20-
import java.io.IOException;
21-
import java.text.MessageFormat;
22-
import java.util.ArrayList;
23-
import java.util.HashMap;
24-
import java.util.List;
25-
import java.util.Map;
20+
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR;
21+
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR;
22+
import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR;
23+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR;
24+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR;
25+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_BY_ID_ERROR;
26+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
27+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR;
28+
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR;
29+
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_INSTANCE_ERROR;
2630

27-
import io.swagger.annotations.Api;
28-
import io.swagger.annotations.ApiImplicitParam;
29-
import io.swagger.annotations.ApiImplicitParams;
30-
import io.swagger.annotations.ApiOperation;
31-
import io.swagger.annotations.ApiParam;
32-
import org.apache.commons.lang3.StringUtils;
3331
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
3432
import org.apache.dolphinscheduler.api.enums.Status;
3533
import org.apache.dolphinscheduler.api.exceptions.ApiException;
@@ -40,6 +38,16 @@
4038
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
4139
import org.apache.dolphinscheduler.dao.entity.User;
4240
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
41+
42+
import org.apache.commons.lang3.StringUtils;
43+
44+
import java.io.IOException;
45+
import java.text.MessageFormat;
46+
import java.util.ArrayList;
47+
import java.util.HashMap;
48+
import java.util.List;
49+
import java.util.Map;
50+
4351
import org.slf4j.Logger;
4452
import org.slf4j.LoggerFactory;
4553
import org.springframework.beans.factory.annotation.Autowired;
@@ -54,17 +62,13 @@
5462
import org.springframework.web.bind.annotation.RequestParam;
5563
import org.springframework.web.bind.annotation.ResponseStatus;
5664
import org.springframework.web.bind.annotation.RestController;
65+
66+
import io.swagger.annotations.Api;
67+
import io.swagger.annotations.ApiImplicitParam;
68+
import io.swagger.annotations.ApiImplicitParams;
69+
import io.swagger.annotations.ApiOperation;
70+
import io.swagger.annotations.ApiParam;
5771
import springfox.documentation.annotations.ApiIgnore;
58-
import static org.apache.dolphinscheduler.api.enums.Status.BATCH_DELETE_PROCESS_INSTANCE_BY_IDS_ERROR;
59-
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_PROCESS_INSTANCE_BY_ID_ERROR;
60-
import static org.apache.dolphinscheduler.api.enums.Status.ENCAPSULATION_PROCESS_INSTANCE_GANTT_STRUCTURE_ERROR;
61-
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PARENT_PROCESS_INSTANCE_DETAIL_INFO_BY_SUB_PROCESS_INSTANCE_ID_ERROR;
62-
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR;
63-
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_BY_ID_ERROR;
64-
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_PROCESS_INSTANCE_LIST_PAGING_ERROR;
65-
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_SUB_PROCESS_INSTANCE_DETAIL_INFO_BY_TASK_ID_ERROR;
66-
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_LIST_BY_PROCESS_INSTANCE_ID_ERROR;
67-
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_PROCESS_INSTANCE_ERROR;
6872

6973
/**
7074
* process instance controller

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ public enum Status {
216216
QUERY_AUTHORIZED_USER(10183, "query authorized user error", "查询拥有项目权限的用户错误"),
217217
PROJECT_NOT_EXIST(10190, "This project was not found. Please refresh page.", "该项目不存在,请刷新页面"),
218218
TASK_INSTANCE_HOST_IS_NULL(10191, "task instance host is null", "任务实例host为空"),
219+
QUERY_EXECUTING_WORKFLOW_ERROR(10192, "query executing workflow error", "查询运行的工作流实例错误"),
219220

220221
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
221222
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.dolphinscheduler.common.enums.WarningType;
2828
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
2929
import org.apache.dolphinscheduler.dao.entity.User;
30+
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
3031

3132
import java.util.Map;
3233

@@ -111,4 +112,11 @@ Map<String, Object> execProcessInstance(User loginUser, long projectCode,
111112
* @return
112113
*/
113114
Map<String, Object> forceStartTaskInstance(User loginUser, int queueId);
115+
116+
/**
117+
* query executing workflow data in Master memory
118+
* @param processInstanceId
119+
* @return
120+
*/
121+
WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId);
114122
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
7070
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
7171
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
72+
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataRequestCommand;
73+
import org.apache.dolphinscheduler.remote.command.WorkflowExecutingDataResponseCommand;
74+
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
7275
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
7376
import org.apache.dolphinscheduler.remote.utils.Host;
7477
import org.apache.dolphinscheduler.service.cron.CronUtils;
@@ -991,4 +994,26 @@ private String removeDuplicates(String scheduleTimeList) {
991994
}
992995
return null;
993996
}
997+
998+
/**
999+
* query executing data of processInstance by master
1000+
* @param processInstanceId
1001+
* @return
1002+
*/
1003+
@Override
1004+
public WorkflowExecuteDto queryExecutingWorkflowByProcessInstanceId(Integer processInstanceId) {
1005+
ProcessInstance processInstance = processService.findProcessInstanceDetailById(processInstanceId);
1006+
if (processInstance == null) {
1007+
return null;
1008+
}
1009+
Host host = new Host(processInstance.getHost());
1010+
WorkflowExecutingDataRequestCommand requestCommand = new WorkflowExecutingDataRequestCommand();
1011+
requestCommand.setProcessInstanceId(processInstanceId);
1012+
org.apache.dolphinscheduler.remote.command.Command command = stateEventCallbackService.sendSync(host, requestCommand.convert2Command());
1013+
if (command == null) {
1014+
return null;
1015+
}
1016+
WorkflowExecutingDataResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), WorkflowExecutingDataResponseCommand.class);
1017+
return responseCommand.getWorkflowExecuteDto();
1018+
}
9941019
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

Lines changed: 34 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,17 @@
1717

1818
package org.apache.dolphinscheduler.api.service.impl;
1919

20-
import java.io.BufferedReader;
21-
import java.io.ByteArrayInputStream;
22-
import java.io.IOException;
23-
import java.io.InputStreamReader;
24-
import java.nio.charset.StandardCharsets;
25-
import java.util.ArrayList;
26-
import java.util.Collections;
27-
import java.util.Date;
28-
import java.util.HashMap;
29-
import java.util.List;
30-
import java.util.Map;
31-
import java.util.Objects;
32-
import java.util.function.Function;
33-
import java.util.stream.Collectors;
34-
35-
import com.baomidou.mybatisplus.core.metadata.IPage;
36-
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
20+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
21+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
22+
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
23+
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
24+
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
25+
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
26+
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
27+
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
28+
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
29+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
3730

38-
import org.apache.commons.collections.CollectionUtils;
39-
import org.apache.commons.lang3.StringUtils;
4031
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
4132
import org.apache.dolphinscheduler.api.dto.gantt.Task;
4233
import org.apache.dolphinscheduler.api.enums.Status;
@@ -51,7 +42,6 @@
5142
import org.apache.dolphinscheduler.api.utils.Result;
5243
import org.apache.dolphinscheduler.common.Constants;
5344
import org.apache.dolphinscheduler.common.enums.Flag;
54-
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
5545
import org.apache.dolphinscheduler.common.graph.DAG;
5646
import org.apache.dolphinscheduler.common.model.TaskNode;
5747
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -81,21 +71,34 @@
8171
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
8272
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
8373
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
74+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
8475
import org.apache.dolphinscheduler.service.process.ProcessService;
8576
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
77+
78+
import org.apache.commons.collections.CollectionUtils;
79+
import org.apache.commons.lang3.StringUtils;
80+
81+
import java.io.BufferedReader;
82+
import java.io.ByteArrayInputStream;
83+
import java.io.IOException;
84+
import java.io.InputStreamReader;
85+
import java.nio.charset.StandardCharsets;
86+
import java.util.ArrayList;
87+
import java.util.Collections;
88+
import java.util.Date;
89+
import java.util.HashMap;
90+
import java.util.List;
91+
import java.util.Map;
92+
import java.util.Objects;
93+
import java.util.function.Function;
94+
import java.util.stream.Collectors;
95+
8696
import org.springframework.beans.factory.annotation.Autowired;
8797
import org.springframework.stereotype.Service;
8898
import org.springframework.transaction.annotation.Transactional;
89-
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
90-
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
91-
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_INSTANCE;
92-
import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
93-
import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
94-
import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
95-
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
96-
import static org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
97-
import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
98-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
99+
100+
import com.baomidou.mybatisplus.core.metadata.IPage;
101+
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
99102

100103
/**
101104
* process instance service impl
@@ -459,7 +462,7 @@ public Map<String, Object> updateProcessInstance(User loginUser, long projectCod
459462
String locations, int timeout, String tenantCode) {
460463
Project project = projectMapper.queryByCode(projectCode);
461464
//check user access for project
462-
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,INSTANCE_UPDATE );
465+
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode,INSTANCE_UPDATE);
463466
if (result.get(Constants.STATUS) != Status.SUCCESS) {
464467
return result;
465468
}
@@ -833,5 +836,4 @@ public List<ProcessInstance> queryByProcessDefineCodeAndStatus(Long processDefin
833836
public List<ProcessInstance> queryByProcessDefineCode(Long processDefinitionCode, int size) {
834837
return processInstanceMapper.queryByProcessDefineCode(processDefinitionCode, size);
835838
}
836-
837839
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.controller;
19+
20+
import org.apache.dolphinscheduler.remote.dto.WorkflowExecuteDto;
21+
import org.apache.dolphinscheduler.server.master.service.ExecutingService;
22+
23+
import java.util.Optional;
24+
25+
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.http.HttpStatus;
27+
import org.springframework.web.bind.annotation.GetMapping;
28+
import org.springframework.web.bind.annotation.RequestMapping;
29+
import org.springframework.web.bind.annotation.RequestParam;
30+
import org.springframework.web.bind.annotation.ResponseStatus;
31+
import org.springframework.web.bind.annotation.RestController;
32+
33+
@RestController
34+
@RequestMapping("/workflow/execute")
35+
public class WorkflowExecuteController {
36+
37+
@Autowired
38+
private ExecutingService executingService;
39+
40+
/**
41+
* query workflow execute data in memory
42+
* @param processInstanceId
43+
* @return
44+
*/
45+
@GetMapping("")
46+
@ResponseStatus(HttpStatus.OK)
47+
public WorkflowExecuteDto queryExecuteData(@RequestParam("id") int processInstanceId) {
48+
Optional<WorkflowExecuteDto> workflowExecuteDtoOptional = executingService.queryWorkflowExecutingData(processInstanceId);
49+
return workflowExecuteDtoOptional.orElse(null);
50+
}
51+
}

0 commit comments

Comments
 (0)