@@ -255,6 +255,9 @@ public boolean isStart() {
255255 */
256256 public void handleEvents () {
257257 if (!isStart ()) {
258+ logger .info (
259+ "The workflow instance is not started, will not handle its state event, current state event size: {}" ,
260+ stateEvents );
258261 return ;
259262 }
260263 StateEvent stateEvent = null ;
@@ -358,45 +361,53 @@ public void taskTimeout(TaskInstance taskInstance) {
358361 }
359362
360363 public void taskFinished (TaskInstance taskInstance ) throws StateEventHandleException {
361- logger .info ("TaskInstance finished task code:{} state:{} " ,
362- taskInstance .getTaskCode (),
363- taskInstance .getState ());
364-
365- activeTaskProcessorMaps .remove (taskInstance .getTaskCode ());
366- stateWheelExecuteThread .removeTask4TimeoutCheck (processInstance , taskInstance );
367- stateWheelExecuteThread .removeTask4RetryCheck (processInstance , taskInstance );
368- stateWheelExecuteThread .removeTask4StateCheck (processInstance , taskInstance );
369-
370- if (taskInstance .getState ().typeIsSuccess ()) {
371- completeTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
372- // todo: merge the last taskInstance
373- processInstance .setVarPool (taskInstance .getVarPool ());
374- processService .saveProcessInstance (processInstance );
375- if (!processInstance .isBlocked ()) {
376- submitPostNode (Long .toString (taskInstance .getTaskCode ()));
377- }
378- } else if (taskInstance .taskCanRetry () && processInstance .getState () != ExecutionStatus .READY_STOP ) {
379- // retry task
380- logger .info ("Retry taskInstance taskInstance state: {}" , taskInstance .getState ());
381- retryTaskInstance (taskInstance );
382- } else if (taskInstance .getState ().typeIsFailure ()) {
383- completeTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
384- // There are child nodes and the failure policy is: CONTINUE
385- if (processInstance .getFailureStrategy () == FailureStrategy .CONTINUE
386- && DagHelper .haveAllNodeAfterNode (Long .toString (taskInstance .getTaskCode ()), dag )) {
387- submitPostNode (Long .toString (taskInstance .getTaskCode ()));
388- } else {
389- errorTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
390- if (processInstance .getFailureStrategy () == FailureStrategy .END ) {
391- killAllTasks ();
364+ logger .info ("TaskInstance finished task code:{} state:{}" , taskInstance .getTaskCode (), taskInstance .getState ());
365+ try {
366+
367+ activeTaskProcessorMaps .remove (taskInstance .getTaskCode ());
368+ stateWheelExecuteThread .removeTask4TimeoutCheck (processInstance , taskInstance );
369+ stateWheelExecuteThread .removeTask4RetryCheck (processInstance , taskInstance );
370+ stateWheelExecuteThread .removeTask4StateCheck (processInstance , taskInstance );
371+
372+ if (taskInstance .getState ().typeIsSuccess ()) {
373+ completeTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
374+ // todo: merge the last taskInstance
375+ processInstance .setVarPool (taskInstance .getVarPool ());
376+ processService .saveProcessInstance (processInstance );
377+ if (!processInstance .isBlocked ()) {
378+ submitPostNode (Long .toString (taskInstance .getTaskCode ()));
392379 }
380+ } else if (taskInstance .taskCanRetry () && processInstance .getState () != ExecutionStatus .READY_STOP ) {
381+ // retry task
382+ logger .info ("Retry taskInstance taskInstance state: {}" , taskInstance .getState ());
383+ retryTaskInstance (taskInstance );
384+ } else if (taskInstance .getState ().typeIsFailure ()) {
385+ completeTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
386+ // There are child nodes and the failure policy is: CONTINUE
387+ if (processInstance .getFailureStrategy () == FailureStrategy .CONTINUE && DagHelper .haveAllNodeAfterNode (
388+ Long .toString (taskInstance .getTaskCode ()),
389+ dag )) {
390+ submitPostNode (Long .toString (taskInstance .getTaskCode ()));
391+ } else {
392+ errorTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
393+ if (processInstance .getFailureStrategy () == FailureStrategy .END ) {
394+ killAllTasks ();
395+ }
396+ }
397+ } else if (taskInstance .getState ().typeIsFinished ()) {
398+ // todo: when the task instance type is pause, then it should not in completeTaskMap
399+ completeTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
393400 }
394- } else if (taskInstance .getState ().typeIsFinished ()) {
395- // todo: when the task instance type is pause, then it should not in completeTaskMap
396- completeTaskMap .put (taskInstance .getTaskCode (), taskInstance .getId ());
401+ logger .info ("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}" ,
402+ taskInstance .getTaskCode (),
403+ taskInstance .getState ());
404+ this .updateProcessInstanceState ();
405+ } catch (Exception ex ) {
406+ logger .error ("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap" , ex );
407+ // remove the task from complete map, so that we can finish in the next time.
408+ completeTaskMap .remove (taskInstance .getTaskCode ());
409+ throw ex ;
397410 }
398-
399- this .updateProcessInstanceState ();
400411 }
401412
402413 /**
@@ -651,14 +662,19 @@ public WorkflowSubmitStatue call() {
651662 LoggerUtils .setWorkflowInstanceIdMDC (processInstance .getId ());
652663 if (workflowRunnableStatus == WorkflowRunnableStatus .CREATED ) {
653664 buildFlowDag ();
665+ workflowRunnableStatus = WorkflowRunnableStatus .INITIALIZE_DAG ;
666+ logger .info ("workflowStatue changed to :{}" , workflowRunnableStatus );
654667 }
655668 if (workflowRunnableStatus == WorkflowRunnableStatus .INITIALIZE_DAG ) {
656669 initTaskQueue ();
670+ workflowRunnableStatus = WorkflowRunnableStatus .INITIALIZE_QUEUE ;
671+ logger .info ("workflowStatue changed to :{}" , workflowRunnableStatus );
657672 }
658673 if (workflowRunnableStatus == WorkflowRunnableStatus .INITIALIZE_QUEUE ) {
659674 submitPostNode (null );
675+ workflowRunnableStatus = WorkflowRunnableStatus .STARTED ;
676+ logger .info ("workflowStatue changed to :{}" , workflowRunnableStatus );
660677 }
661- workflowRunnableStatus = WorkflowRunnableStatus .STARTED ;
662678 return WorkflowSubmitStatue .SUCCESS ;
663679 } catch (Exception e ) {
664680 logger .error ("Start workflow error" , e );
@@ -751,7 +767,6 @@ private void buildFlowDag() throws Exception {
751767 }
752768 // generate process dag
753769 dag = DagHelper .buildDagGraph (processDag );
754- workflowRunnableStatus = WorkflowRunnableStatus .INITIALIZE_DAG ;
755770 logger .info ("Build dag success, dag: {}" , dag );
756771 }
757772
@@ -859,7 +874,6 @@ private void initTaskQueue() throws StateEventHandleException, CronParseExceptio
859874 }
860875 }
861876 }
862- workflowRunnableStatus = WorkflowRunnableStatus .INITIALIZE_QUEUE ;
863877 logger .info ("Initialize task queue, dependFailedTaskMap: {}, completeTaskMap: {}, errorTaskMap: {}" ,
864878 dependFailedTaskMap ,
865879 completeTaskMap ,
@@ -886,7 +900,11 @@ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
886900
887901 boolean submit = taskProcessor .action (TaskAction .SUBMIT );
888902 if (!submit ) {
889- logger .error ("process id:{} name:{} submit standby task id:{} name:{} failed!" , processInstance .getId (), processInstance .getName (), taskInstance .getId (), taskInstance .getName ());
903+ logger .error ("process id:{} name:{} submit standby task id:{} name:{} failed!" ,
904+ processInstance .getId (),
905+ processInstance .getName (),
906+ taskInstance .getId (),
907+ taskInstance .getName ());
890908 return Optional .empty ();
891909 }
892910
@@ -937,7 +955,10 @@ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
937955 }
938956 return Optional .of (taskInstance );
939957 } catch (Exception e ) {
940- logger .error ("submit standby task error" , e );
958+ logger .error ("submit standby task error, taskCode: {}, taskInstanceId: {}" ,
959+ taskInstance .getTaskCode (),
960+ taskInstance .getId (),
961+ e );
941962 return Optional .empty ();
942963 }
943964 }
@@ -1190,9 +1211,19 @@ private void setVarPoolValue(Map<String, Property> allProperty, Map<String, Task
11901211 */
11911212 private Map <String , TaskInstance > getCompleteTaskInstanceMap () {
11921213 Map <String , TaskInstance > completeTaskInstanceMap = new HashMap <>();
1193- for (Integer taskInstanceId : completeTaskMap .values ()) {
1214+ for (Map .Entry <Long , Integer > entry : completeTaskMap .entrySet ()) {
1215+ Long taskConde = entry .getKey ();
1216+ Integer taskInstanceId = entry .getValue ();
11941217 TaskInstance taskInstance = taskInstanceMap .get (taskInstanceId );
1218+ if (taskInstance == null ) {
1219+ logger .warn ("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}" ,
1220+ taskInstanceId ,
1221+ taskConde );
1222+ // This case will happen when we submit to db failed, then the taskInstanceId is 0
1223+ continue ;
1224+ }
11951225 completeTaskInstanceMap .put (Long .toString (taskInstance .getTaskCode ()), taskInstance );
1226+
11961227 }
11971228 return completeTaskInstanceMap ;
11981229 }
@@ -1260,7 +1291,6 @@ private void submitPostNode(String parentNodeCode) throws StateEventHandleExcept
12601291 }
12611292 submitStandByTask ();
12621293 updateProcessInstanceState ();
1263- workflowRunnableStatus = WorkflowRunnableStatus .INITIALIZE_QUEUE ;
12641294 }
12651295
12661296 /**
@@ -1405,6 +1435,7 @@ private boolean hasFailedTask() {
14051435 */
14061436 private boolean processFailed () {
14071437 if (hasFailedTask ()) {
1438+ logger .info ("The current process has failed task, the current process failed" );
14081439 if (processInstance .getFailureStrategy () == FailureStrategy .END ) {
14091440 return true ;
14101441 }
@@ -1480,38 +1511,48 @@ private ExecutionStatus getProcessInstanceState(ProcessInstance instance) {
14801511
14811512 if (activeTaskProcessorMaps .size () > 0 || hasRetryTaskInStandBy ()) {
14821513 // active task and retry task exists
1483- return runningState (state );
1514+ ExecutionStatus executionStatus = runningState (state );
1515+ logger .info ("The workflowInstance has task running, the workflowInstance status is {}" , executionStatus );
1516+ return executionStatus ;
14841517 }
14851518
14861519 // block
14871520 if (state == ExecutionStatus .READY_BLOCK ) {
1488- return processReadyBlock ();
1521+ ExecutionStatus executionStatus = processReadyBlock ();
1522+ logger .info ("The workflowInstance is ready to block, the workflowInstance status is {}" , executionStatus );
14891523 }
14901524
14911525 // waiting thread
14921526 if (hasWaitingThreadTask ()) {
1527+ logger .info ("The workflowInstance has waiting thread task, the workflow status is {}" ,
1528+ ExecutionStatus .WAITING_THREAD );
14931529 return ExecutionStatus .WAITING_THREAD ;
14941530 }
14951531
14961532 // pause
14971533 if (state == ExecutionStatus .READY_PAUSE ) {
1498- return processReadyPause ();
1534+ ExecutionStatus executionStatus = processReadyPause ();
1535+ logger .info ("The workflowInstance is ready to pause, the workflow status is {}" , executionStatus );
1536+ return executionStatus ;
14991537 }
15001538
15011539 // stop
15021540 if (state == ExecutionStatus .READY_STOP ) {
15031541 List <TaskInstance > stopList = getCompleteTaskByState (ExecutionStatus .STOP );
15041542 List <TaskInstance > killList = getCompleteTaskByState (ExecutionStatus .KILL );
15051543 List <TaskInstance > failList = getCompleteTaskByState (ExecutionStatus .FAILURE );
1544+ ExecutionStatus executionStatus ;
15061545 if (CollectionUtils .isNotEmpty (stopList ) || CollectionUtils .isNotEmpty (killList ) || CollectionUtils .isNotEmpty (failList ) || !isComplementEnd ()) {
1507- return ExecutionStatus .STOP ;
1546+ executionStatus = ExecutionStatus .STOP ;
15081547 } else {
1509- return ExecutionStatus .SUCCESS ;
1548+ executionStatus = ExecutionStatus .SUCCESS ;
15101549 }
1550+ logger .info ("The workflowInstance is ready to stop, the workflow status is {}" , executionStatus );
15111551 }
15121552
15131553 // process failure
15141554 if (processFailed ()) {
1555+ logger .info ("The workflowInstance is failed, the workflow status is {}" , ExecutionStatus .FAILURE );
15151556 return ExecutionStatus .FAILURE ;
15161557 }
15171558
@@ -1555,15 +1596,21 @@ private boolean isComplementEnd() {
15551596 private void updateProcessInstanceState () throws StateEventHandleException {
15561597 ExecutionStatus state = getProcessInstanceState (processInstance );
15571598 if (processInstance .getState () != state ) {
1599+ logger .info ("Update workflowInstance states, origin state: {}, target state: {}" ,
1600+ processInstance .getState (),
1601+ state );
15581602 updateWorkflowInstanceStatesToDB (state );
15591603
15601604 StateEvent stateEvent = new StateEvent ();
15611605 stateEvent .setExecutionStatus (processInstance .getState ());
15621606 stateEvent .setProcessInstanceId (this .processInstance .getId ());
15631607 stateEvent .setType (StateEventType .PROCESS_STATE_CHANGE );
1564- // this.processStateChangeHandler(stateEvent);
15651608 // replace with `stateEvents`, make sure `WorkflowExecuteThread` can be deleted to avoid memory leaks
15661609 this .stateEvents .add (stateEvent );
1610+ } else {
1611+ logger .info ("There is no need to update the workflow instance state, origin state: {}, target state: {}" ,
1612+ processInstance .getState (),
1613+ state );
15671614 }
15681615 }
15691616
@@ -1578,12 +1625,9 @@ public void updateProcessInstanceState(StateEvent stateEvent) throws StateEventH
15781625 private void updateWorkflowInstanceStatesToDB (ExecutionStatus newStates ) throws StateEventHandleException {
15791626 ExecutionStatus originStates = processInstance .getState ();
15801627 if (originStates != newStates ) {
1581- logger .info ("work flow process instance [id: {}, name:{}], state change from {} to {}, cmd type: {}" ,
1582- processInstance .getId (),
1583- processInstance .getName (),
1584- originStates ,
1585- newStates ,
1586- processInstance .getCommandType ());
1628+ logger .info ("Begin to update workflow instance state , state will change from {} to {}" ,
1629+ originStates ,
1630+ newStates );
15871631
15881632 processInstance .setState (newStates );
15891633 if (newStates .typeIsFinished ()) {
@@ -1633,8 +1677,8 @@ public void addTaskToStandByList(TaskInstance taskInstance) {
16331677 *
16341678 * @param taskInstance task instance
16351679 */
1636- private void removeTaskFromStandbyList (TaskInstance taskInstance ) {
1637- readyToSubmitTaskQueue .remove (taskInstance );
1680+ private boolean removeTaskFromStandbyList (TaskInstance taskInstance ) {
1681+ return readyToSubmitTaskQueue .remove (taskInstance );
16381682 }
16391683
16401684 /**
@@ -1724,10 +1768,20 @@ public void submitStandByTask() throws StateEventHandleException {
17241768 if (!taskInstanceOptional .isPresent ()) {
17251769 this .taskFailedSubmit = true ;
17261770 // Remove and add to complete map and error map
1727- removeTaskFromStandbyList (task );
1771+ if (!removeTaskFromStandbyList (task )) {
1772+ logger .error (
1773+ "Task submit failed, remove from standby list failed, workflowInstanceId: {}, taskCode: {}" ,
1774+ processInstance .getId (),
1775+ task .getTaskCode ());
1776+ }
17281777 completeTaskMap .put (task .getTaskCode (), task .getId ());
1778+ taskInstanceMap .put (task .getId (), task );
17291779 errorTaskMap .put (task .getTaskCode (), task .getId ());
1730- logger .error ("Task submitted failed, processInstanceId: {}, taskInstanceId: {}" , task .getProcessInstanceId (), task .getId ());
1780+ activeTaskProcessorMaps .remove (task .getTaskCode ());
1781+ logger .error ("Task submitted failed, workflowInstanceId: {}, taskInstanceId: {}, taskCode: {}" ,
1782+ task .getProcessInstanceId (),
1783+ task .getId (),
1784+ task .getTaskCode ());
17311785 } else {
17321786 removeTaskFromStandbyList (task );
17331787 }
0 commit comments