Skip to content

Commit d879bfe

Browse files
committed
Fix database error may cause workflow dead with running status
1 parent dd7d41f commit d879bfe

4 files changed

Lines changed: 136 additions & 72 deletions

File tree

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/StateEventType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
public enum StateEventType {
2323

24-
PROCESS_STATE_CHANGE(0, "process statechange"),
24+
PROCESS_STATE_CHANGE(0, "process state change"),
2525
TASK_STATE_CHANGE(1, "task state change"),
2626
PROCESS_TIMEOUT(2, "process timeout"),
2727
TASK_TIMEOUT(3, "task timeout"),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

Lines changed: 113 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,9 @@ public boolean isStart() {
255255
*/
256256
public void handleEvents() {
257257
if (!isStart()) {
258+
logger.info(
259+
"The workflow instance is not started, will not handle its state event, current state event size: {}",
260+
stateEvents);
258261
return;
259262
}
260263
StateEvent stateEvent = null;
@@ -358,45 +361,53 @@ public void taskTimeout(TaskInstance taskInstance) {
358361
}
359362

360363
public void taskFinished(TaskInstance taskInstance) throws StateEventHandleException {
361-
logger.info("TaskInstance finished task code:{} state:{} ",
362-
taskInstance.getTaskCode(),
363-
taskInstance.getState());
364-
365-
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
366-
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
367-
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
368-
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
369-
370-
if (taskInstance.getState().typeIsSuccess()) {
371-
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
372-
// todo: merge the last taskInstance
373-
processInstance.setVarPool(taskInstance.getVarPool());
374-
processService.saveProcessInstance(processInstance);
375-
if (!processInstance.isBlocked()) {
376-
submitPostNode(Long.toString(taskInstance.getTaskCode()));
377-
}
378-
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
379-
// retry task
380-
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
381-
retryTaskInstance(taskInstance);
382-
} else if (taskInstance.getState().typeIsFailure()) {
383-
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
384-
// There are child nodes and the failure policy is: CONTINUE
385-
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
386-
&& DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
387-
submitPostNode(Long.toString(taskInstance.getTaskCode()));
388-
} else {
389-
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
390-
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
391-
killAllTasks();
364+
logger.info("TaskInstance finished task code:{} state:{}", taskInstance.getTaskCode(), taskInstance.getState());
365+
try {
366+
367+
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
368+
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
369+
stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance);
370+
stateWheelExecuteThread.removeTask4StateCheck(processInstance, taskInstance);
371+
372+
if (taskInstance.getState().typeIsSuccess()) {
373+
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
374+
// todo: merge the last taskInstance
375+
processInstance.setVarPool(taskInstance.getVarPool());
376+
processService.saveProcessInstance(processInstance);
377+
if (!processInstance.isBlocked()) {
378+
submitPostNode(Long.toString(taskInstance.getTaskCode()));
392379
}
380+
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
381+
// retry task
382+
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
383+
retryTaskInstance(taskInstance);
384+
} else if (taskInstance.getState().typeIsFailure()) {
385+
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
386+
// There are child nodes and the failure policy is: CONTINUE
387+
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
388+
Long.toString(taskInstance.getTaskCode()),
389+
dag)) {
390+
submitPostNode(Long.toString(taskInstance.getTaskCode()));
391+
} else {
392+
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
393+
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
394+
killAllTasks();
395+
}
396+
}
397+
} else if (taskInstance.getState().typeIsFinished()) {
398+
// todo: when the task instance type is pause, then it should not in completeTaskMap
399+
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
393400
}
394-
} else if (taskInstance.getState().typeIsFinished()) {
395-
// todo: when the task instance type is pause, then it should not in completeTaskMap
396-
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
401+
logger.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}",
402+
taskInstance.getTaskCode(),
403+
taskInstance.getState());
404+
this.updateProcessInstanceState();
405+
} catch (Exception ex) {
406+
logger.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex);
407+
// remove the task from complete map, so that we can finish in the next time.
408+
completeTaskMap.remove(taskInstance.getTaskCode());
409+
throw ex;
397410
}
398-
399-
this.updateProcessInstanceState();
400411
}
401412

402413
/**
@@ -651,14 +662,19 @@ public WorkflowSubmitStatue call() {
651662
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
652663
if (workflowRunnableStatus == WorkflowRunnableStatus.CREATED) {
653664
buildFlowDag();
665+
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
666+
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
654667
}
655668
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_DAG) {
656669
initTaskQueue();
670+
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
671+
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
657672
}
658673
if (workflowRunnableStatus == WorkflowRunnableStatus.INITIALIZE_QUEUE) {
659674
submitPostNode(null);
675+
workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
676+
logger.info("workflowStatue changed to :{}", workflowRunnableStatus);
660677
}
661-
workflowRunnableStatus = WorkflowRunnableStatus.STARTED;
662678
return WorkflowSubmitStatue.SUCCESS;
663679
} catch (Exception e) {
664680
logger.error("Start workflow error", e);
@@ -751,7 +767,6 @@ private void buildFlowDag() throws Exception {
751767
}
752768
// generate process dag
753769
dag = DagHelper.buildDagGraph(processDag);
754-
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_DAG;
755770
logger.info("Build dag success, dag: {}", dag);
756771
}
757772

@@ -859,7 +874,6 @@ private void initTaskQueue() throws StateEventHandleException, CronParseExceptio
859874
}
860875
}
861876
}
862-
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
863877
logger.info("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}",
864878
dependFailedTaskMap,
865879
completeTaskMap,
@@ -886,7 +900,11 @@ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
886900

887901
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
888902
if (!submit) {
889-
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!", processInstance.getId(), processInstance.getName(), taskInstance.getId(), taskInstance.getName());
903+
logger.error("process id:{} name:{} submit standby task id:{} name:{} failed!",
904+
processInstance.getId(),
905+
processInstance.getName(),
906+
taskInstance.getId(),
907+
taskInstance.getName());
890908
return Optional.empty();
891909
}
892910

@@ -937,7 +955,10 @@ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
937955
}
938956
return Optional.of(taskInstance);
939957
} catch (Exception e) {
940-
logger.error("submit standby task error", e);
958+
logger.error("submit standby task error, taskCode: {}, taskInstanceId: {}",
959+
taskInstance.getTaskCode(),
960+
taskInstance.getId(),
961+
e);
941962
return Optional.empty();
942963
}
943964
}
@@ -1190,9 +1211,19 @@ private void setVarPoolValue(Map<String, Property> allProperty, Map<String, Task
11901211
*/
11911212
private Map<String, TaskInstance> getCompleteTaskInstanceMap() {
11921213
Map<String, TaskInstance> completeTaskInstanceMap = new HashMap<>();
1193-
for (Integer taskInstanceId : completeTaskMap.values()) {
1214+
for (Map.Entry<Long, Integer> entry : completeTaskMap.entrySet()) {
1215+
Long taskConde = entry.getKey();
1216+
Integer taskInstanceId = entry.getValue();
11941217
TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId);
1218+
if (taskInstance == null) {
1219+
logger.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}",
1220+
taskInstanceId,
1221+
taskConde);
1222+
// This case will happen when we submit to db failed, then the taskInstanceId is 0
1223+
continue;
1224+
}
11951225
completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance);
1226+
11961227
}
11971228
return completeTaskInstanceMap;
11981229
}
@@ -1260,7 +1291,6 @@ private void submitPostNode(String parentNodeCode) throws StateEventHandleExcept
12601291
}
12611292
submitStandByTask();
12621293
updateProcessInstanceState();
1263-
workflowRunnableStatus = WorkflowRunnableStatus.INITIALIZE_QUEUE;
12641294
}
12651295

12661296
/**
@@ -1405,6 +1435,7 @@ private boolean hasFailedTask() {
14051435
*/
14061436
private boolean processFailed() {
14071437
if (hasFailedTask()) {
1438+
logger.info("The current process has failed task, the current process failed");
14081439
if (processInstance.getFailureStrategy() == FailureStrategy.END) {
14091440
return true;
14101441
}
@@ -1480,38 +1511,48 @@ private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {
14801511

14811512
if (activeTaskProcessorMaps.size() > 0 || hasRetryTaskInStandBy()) {
14821513
// active task and retry task exists
1483-
return runningState(state);
1514+
ExecutionStatus executionStatus = runningState(state);
1515+
logger.info("The workflowInstance has task running, the workflowInstance status is {}", executionStatus);
1516+
return executionStatus;
14841517
}
14851518

14861519
// block
14871520
if (state == ExecutionStatus.READY_BLOCK) {
1488-
return processReadyBlock();
1521+
ExecutionStatus executionStatus = processReadyBlock();
1522+
logger.info("The workflowInstance is ready to block, the workflowInstance status is {}", executionStatus);
14891523
}
14901524

14911525
// waiting thread
14921526
if (hasWaitingThreadTask()) {
1527+
logger.info("The workflowInstance has waiting thread task, the workflow status is {}",
1528+
ExecutionStatus.WAITING_THREAD);
14931529
return ExecutionStatus.WAITING_THREAD;
14941530
}
14951531

14961532
// pause
14971533
if (state == ExecutionStatus.READY_PAUSE) {
1498-
return processReadyPause();
1534+
ExecutionStatus executionStatus = processReadyPause();
1535+
logger.info("The workflowInstance is ready to pause, the workflow status is {}", executionStatus);
1536+
return executionStatus;
14991537
}
15001538

15011539
// stop
15021540
if (state == ExecutionStatus.READY_STOP) {
15031541
List<TaskInstance> stopList = getCompleteTaskByState(ExecutionStatus.STOP);
15041542
List<TaskInstance> killList = getCompleteTaskByState(ExecutionStatus.KILL);
15051543
List<TaskInstance> failList = getCompleteTaskByState(ExecutionStatus.FAILURE);
1544+
ExecutionStatus executionStatus;
15061545
if (CollectionUtils.isNotEmpty(stopList) || CollectionUtils.isNotEmpty(killList) || CollectionUtils.isNotEmpty(failList) || !isComplementEnd()) {
1507-
return ExecutionStatus.STOP;
1546+
executionStatus = ExecutionStatus.STOP;
15081547
} else {
1509-
return ExecutionStatus.SUCCESS;
1548+
executionStatus = ExecutionStatus.SUCCESS;
15101549
}
1550+
logger.info("The workflowInstance is ready to stop, the workflow status is {}", executionStatus);
15111551
}
15121552

15131553
// process failure
15141554
if (processFailed()) {
1555+
logger.info("The workflowInstance is failed, the workflow status is {}", ExecutionStatus.FAILURE);
15151556
return ExecutionStatus.FAILURE;
15161557
}
15171558

@@ -1555,15 +1596,21 @@ private boolean isComplementEnd() {
15551596
private void updateProcessInstanceState() throws StateEventHandleException {
15561597
ExecutionStatus state = getProcessInstanceState(processInstance);
15571598
if (processInstance.getState() != state) {
1599+
logger.info("Update workflowInstance states, origin state: {}, target state: {}",
1600+
processInstance.getState(),
1601+
state);
15581602
updateWorkflowInstanceStatesToDB(state);
15591603

15601604
StateEvent stateEvent = new StateEvent();
15611605
stateEvent.setExecutionStatus(processInstance.getState());
15621606
stateEvent.setProcessInstanceId(this.processInstance.getId());
15631607
stateEvent.setType(StateEventType.PROCESS_STATE_CHANGE);
1564-
// this.processStateChangeHandler(stateEvent);
15651608
// replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks
15661609
this.stateEvents.add(stateEvent);
1610+
} else {
1611+
logger.info("There is no need to update the workflow instance state, origin state: {}, target state: {}",
1612+
processInstance.getState(),
1613+
state);
15671614
}
15681615
}
15691616

@@ -1578,12 +1625,9 @@ public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventH
15781625
private void updateWorkflowInstanceStatesToDB(ExecutionStatus newStates) throws StateEventHandleException {
15791626
ExecutionStatus originStates = processInstance.getState();
15801627
if (originStates != newStates) {
1581-
logger.info("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}",
1582-
processInstance.getId(),
1583-
processInstance.getName(),
1584-
originStates,
1585-
newStates,
1586-
processInstance.getCommandType());
1628+
logger.info("Begin to update workflow instance state , state will change from {} to {}",
1629+
originStates,
1630+
newStates);
15871631

15881632
processInstance.setState(newStates);
15891633
if (newStates.typeIsFinished()) {
@@ -1633,8 +1677,8 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
16331677
*
16341678
* @param taskInstance task instance
16351679
*/
1636-
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
1637-
readyToSubmitTaskQueue.remove(taskInstance);
1680+
private boolean removeTaskFromStandbyList(TaskInstance taskInstance) {
1681+
return readyToSubmitTaskQueue.remove(taskInstance);
16381682
}
16391683

16401684
/**
@@ -1724,10 +1768,20 @@ public void submitStandByTask() throws StateEventHandleException {
17241768
if (!taskInstanceOptional.isPresent()) {
17251769
this.taskFailedSubmit = true;
17261770
// Remove and add to complete map and error map
1727-
removeTaskFromStandbyList(task);
1771+
if (!removeTaskFromStandbyList(task)) {
1772+
logger.error(
1773+
"Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}",
1774+
processInstance.getId(),
1775+
task.getTaskCode());
1776+
}
17281777
completeTaskMap.put(task.getTaskCode(), task.getId());
1778+
taskInstanceMap.put(task.getId(), task);
17291779
errorTaskMap.put(task.getTaskCode(), task.getId());
1730-
logger.error("Task submitted failed, processInstanceId: {}, taskInstanceId: {}", task.getProcessInstanceId(), task.getId());
1780+
activeTaskProcessorMaps.remove(task.getTaskCode());
1781+
logger.error("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}",
1782+
task.getProcessInstanceId(),
1783+
task.getId(),
1784+
task.getTaskCode());
17311785
} else {
17321786
removeTaskFromStandbyList(task);
17331787
}

0 commit comments

Comments
 (0)