[Fix-9401] Master service dispatch retry log optimization #9955
WangJPLeo wants to merge 7 commits into apache:dev from
Conversation
Codecov Report
@@ Coverage Diff @@
## dev #9955 +/- ##
============================================
+ Coverage 40.68% 40.73% +0.04%
- Complexity 4579 4589 +10
============================================
Files 834 834
Lines 33832 33879 +47
Branches 3745 3754 +9
============================================
+ Hits 13765 13800 +35
- Misses 18747 18755 +8
- Partials 1320 1324 +4
iTaskProcessor.action(TaskAction.RUN);

// pending task needs to be dispatched
if (task.getState().typeIsPending()) {
Hello, in the dispatchFailedTaskInstanceState2Pending() method you only save the pending status to the DB, not to WorkflowExecuteThread.taskInstanceMap. I am not sure whether the task fetched from taskInstanceMap here can actually be pending, or whether taskInstanceMap is updated somewhere else?
Thanks. The consideration here was that a task instance in the Pending state needs to be dispatched when a failover occurs; I re-ran the test and found that this is not the case. Interestingly, a historical problem was discovered along the way: when the Master service fails over, the already-executed task instance is re-created, so the new task instance resumes execution successfully while the old task instance is left unchanged and becomes garbage data. This problem will be written up as an issue and resolved in the next PR. Thank you again for your review.
@WangJPLeo we need to sync the doc too, see https://dolphinscheduler.apache.org/en-us/docs/dev/user_doc/architecture/design.html
if (dispatchFailedTask.getDispatchFailedRetryTimes() >= Constants.DEFAULT_MAX_RETRY_COUNT) {
    logger.error("the number of retries for dispatch failure has exceeded the maximum limit, taskId: {} processInstanceId: {}", dispatchFailedTask.getTaskId(), dispatchFailedTask.getProcessInstanceId());
    // business alarm
    continue;
}
These tasks need to continue to be submitted.
Yes, thanks. After 100 retries the business alarm fires, and then retries continue with a 100-second delay each time.
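The "alarm once, keep retrying" behaviour described above can be sketched as follows. This is a minimal illustration, not the PR's actual code; the class and method names are hypothetical. Using `==` instead of `>=` means the alarm fires exactly once, on the attempt that first reaches the limit, while the task stays in the retry loop.

```java
// Hypothetical sketch: fire the business alarm exactly once at the retry
// limit, without removing the task from the retry cycle.
public class DispatchAlarm {
    static final int DEFAULT_MAX_RETRY_COUNT = 100;

    // Returns true only on the attempt that first reaches the limit,
    // so the alarm is raised once rather than on every later retry.
    static boolean shouldAlarm(int dispatchFailedRetryTimes) {
        return dispatchFailedRetryTimes == DEFAULT_MAX_RETRY_COUNT;
    }
}
```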
Kudos, SonarCloud Quality Gate passed!
FYI @WangJPLeo @caishunfeng @lenboo I changed this PR's milestone from 300b1 to 300b2
ruanwenjun left a comment:
In fact, I don't think we need a dispatch-failed queue or a maxDispatchSize per task.
A dispatch failure is caused by a worker network error, which has nothing to do with the task itself. So the correct thing is to find the dispatch-failed worker and isolate it, rather than isolating the task.
And as I remember, in the current design, when a task dispatch fails it goes back to the task queue and is retried again; I think this is reasonable.
One thing we may need to do is split the task state Dispatch into Dispatching and Dispatched.
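The suggested state split could look like the following sketch. The enum and method names here are hypothetical and do not match DolphinScheduler's real ExecutionStatus; the point is that a worker acknowledgement, not the dispatch attempt itself, advances the state, so failed dispatches become detectable as tasks stuck in Dispatching.

```java
// Hypothetical sketch of splitting one "Dispatch" state into two.
public class TaskDispatchState {
    enum State { SUBMITTED, DISPATCHING, DISPATCHED, RUNNING }

    // A dispatch attempt moves SUBMITTED -> DISPATCHING.
    static State onDispatchAttempt(State s) {
        return s == State.SUBMITTED ? State.DISPATCHING : s;
    }

    // Only a worker ack moves DISPATCHING -> DISPATCHED, so a failed
    // dispatch leaves the task observable in DISPATCHING.
    static State onWorkerAck(State s) {
        return s == State.DISPATCHING ? State.DISPATCHED : s;
    }
}
```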
// service alarm when retries 100 times
if (dispatchFailedTask.getDispatchFailedRetryTimes() == Constants.DEFAULT_MAX_RETRY_COUNT) {
    logger.error("the number of retries for dispatch failure has exceeded the maximum limit, taskId: {} processInstanceId: {}", dispatchFailedTask.getTaskId(), dispatchFailedTask.getProcessInstanceId());
    // business alarm
}
// differentiate the queue to prevent high priority from affecting the execution of other tasks
taskPriorityDispatchFailedQueue.put(dispatchFailedTask);
So if one task exceeds the max retry times, will it still retry in your plan? I guess you set maxRetryTimes because you want to stop retrying?
Instead of stopping the retry, the retry interval increases with the number of retries. Once the maximum retry count is reached, each interval is 100s. This is described in the comment on line 222 (retry more than 100 times with 100 seconds delay each time).
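A minimal sketch of this retry gating, assuming illustrative names rather than the PR's exact code: the delay grows linearly with the retry count and is capped at 100 seconds once the maximum retry count is reached, so retries never stop, they just slow down.

```java
// Hypothetical sketch of the backoff described above.
public class DispatchRetryBackoff {
    static final int DEFAULT_MAX_RETRY_COUNT = 100;
    static final long BASE_DELAY_MS = 1000L;

    // Delay before the next retry: retryTimes * 1s, capped at 100s.
    static long retryDelayMs(int dispatchFailedRetryTimes) {
        return Math.min(dispatchFailedRetryTimes, DEFAULT_MAX_RETRY_COUNT) * BASE_DELAY_MS;
    }

    // A task may be re-dispatched only when enough time has passed
    // since the latest dispatch failure.
    static boolean canRetry(long lastDispatchFailedTimeMs, int retryTimes, long nowMs) {
        return nowMs - lastDispatchFailedTimeMs >= retryDelayMs(retryTimes);
    }
}
```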
TaskPriority dispatchFailedTaskPriority = taskPriorityDispatchFailedQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(dispatchFailedTaskPriority)) {
    continue;
}
if (canRetry(dispatchFailedTaskPriority)) {
    dispatchFailedTaskPriority.setDispatchFailedRetryTimes(dispatchFailedTaskPriority.getDispatchFailedRetryTimes() + 1);
    taskPriorityQueue.put(dispatchFailedTaskPriority);
} else {
    taskPriorityDispatchFailedQueue.put(dispatchFailedTaskPriority);
}
It's not a good idea to deal with the dispatchFailedQueue in the normal process.
Most of the time the failedDispatchQueue is empty, so with the default dispatchTaskNum of 3 you add up to 3s of extra waiting to each dispatch round, since you have to poll the failedDispatchQueue.
This situation can occur; whether to traverse the dispatch-failed queue is decided by checking its size first.
If you want to reduce the dispatchFailed log, I have fixed the sleep logic in #10631. If there is no worker, the master no longer spins in a dead loop.
Thank you @ruanwenjun. The Pending state of the workflow instance and the dispatch failure queue were added to solve three problems:
Based on these three points, I think your concern is addressed and the change is consistent with the current design.
When dispatch fails, the reason is that there is no worker or the worker's network is broken, so the correct thing is to stop consuming commands. Although a failure queue can mitigate the problem, it is essentially the same as the current solution: put the task back into the normal queue and sleep. If you put it into a failure queue, you need another thread to handle it; otherwise you may affect the normal process.
I agree it's necessary to use another thread to handle the failure queue and avoid the 3-second delay. But the biggest difference between the failure queue and the current solution is that, when higher-priority tasks keep failing, the highest-priority task will not be picked for dispatch ahead of everything else every time; and, relying on the failure queue, we can add further handling for these failed tasks, such as monitoring.
OK, using a single thread to consume the failure queue looks good to me. Since you add a new status, you also need to add it to NEED_FAILOVER_STATES, and I am not sure whether other places need updating. In the current code, adding a new status easily introduces new bugs.
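The agreed design, a dedicated thread draining the failure queue so the normal dispatch loop never blocks on an empty queue, could be sketched like this. The element type (`String`) and all names are illustrative stand-ins for the real TaskPriority queues.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single consumer thread moves dispatch-failed
// tasks back to the normal queue, so polling the (usually empty) failure
// queue never delays the normal dispatch thread.
public class FailedQueueConsumer implements Runnable {
    private final LinkedBlockingQueue<String> failedQueue;
    private final LinkedBlockingQueue<String> normalQueue;
    volatile boolean running = true;

    FailedQueueConsumer(LinkedBlockingQueue<String> failedQueue,
                        LinkedBlockingQueue<String> normalQueue) {
        this.failedQueue = failedQueue;
        this.normalQueue = normalQueue;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Block on the failure queue only; timed poll lets the
                // thread notice the shutdown flag.
                String task = failedQueue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    normalQueue.put(task); // hand back for re-dispatch
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```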
OK, thanks for your suggestion. I will optimize the processing logic according to the discussion and check everywhere the new status is used.
Will close this PR due to no update for a long time.
Purpose of the pull request
Optimize output logging so that dispatch failures are visible to users.
Brief change log
After dispatch fails, ExecuteException is no longer thrown; the dispatch result is returned instead.
If the dispatch result is a failure, the current task instance is changed to the PENDING state, and the current timestamp is recorded as the latest dispatch failure time.
If dispatch fails, the task is put into the dispatch failure queue (isolated from the original queue) and waits to be taken out and dispatched again.
Retry timing: a retry happens when the interval since the latest dispatch failure time is greater than or equal to the specified time, and each retry increases the failed retry count.
If the server goes down, the failover server picks up workflow instances in the specified state and creates the command RECOVER TOLERANCE FAULT PROCESS.
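The change-log steps above can be sketched as one flow. This is a simplified illustration under assumed names (FailedTask, dispatchFailedQueue, workerAvailable), not the PR's actual classes: dispatch returns a result instead of throwing, and on failure the task is marked PENDING, the failure time is recorded, and the task enters the isolated failure queue.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the changed dispatch flow described in the change log.
public class DispatchFlow {
    static class FailedTask {
        final String taskId;
        long lastDispatchFailedTimeMs;
        String state = "RUNNING";
        FailedTask(String taskId) { this.taskId = taskId; }
    }

    final LinkedBlockingQueue<FailedTask> dispatchFailedQueue = new LinkedBlockingQueue<>();

    // Returns true when dispatch succeeded, false otherwise (no exception thrown).
    boolean dispatch(FailedTask task, boolean workerAvailable, long nowMs) {
        if (workerAvailable) {
            return true;
        }
        task.state = "PENDING";                // mark pending instead of failing hard
        task.lastDispatchFailedTimeMs = nowMs; // remember the latest failure time
        dispatchFailedQueue.offer(task);       // isolated queue for later re-dispatch
        return false;
    }
}
```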
close #9401
Verify this pull request
Manually verified the change by testing locally.