1919
2020import org .apache .dolphinscheduler .common .Constants ;
2121import org .apache .dolphinscheduler .common .thread .Stopper ;
22+ import org .apache .dolphinscheduler .common .thread .ThreadUtils ;
2223import org .apache .dolphinscheduler .dao .entity .TaskInstance ;
2324import org .apache .dolphinscheduler .server .master .config .MasterConfig ;
2425import org .apache .dolphinscheduler .server .master .dispatch .ExecutorDispatcher ;
2526import org .apache .dolphinscheduler .server .master .dispatch .context .ExecutionContext ;
2627import org .apache .dolphinscheduler .server .master .dispatch .enums .ExecutorType ;
2728import org .apache .dolphinscheduler .server .master .dispatch .exceptions .ExecuteException ;
29+ import org .apache .dolphinscheduler .service .exceptions .TaskPriorityQueueException ;
2830import org .apache .dolphinscheduler .service .process .ProcessService ;
2931import org .apache .dolphinscheduler .service .queue .TaskPriority ;
3032import org .apache .dolphinscheduler .service .queue .TaskPriorityQueue ;
3335import java .util .ArrayList ;
3436import java .util .List ;
3537import java .util .Objects ;
38+ import java .util .concurrent .CountDownLatch ;
39+ import java .util .concurrent .ThreadPoolExecutor ;
3640import java .util .concurrent .TimeUnit ;
3741
3842import javax .annotation .PostConstruct ;
@@ -78,30 +82,24 @@ public class TaskPriorityQueueConsumer extends Thread {
7882 @ Autowired
7983 private MasterConfig masterConfig ;
8084
85+ /**
86+ * consumer thread pool
87+ */
88+ private ThreadPoolExecutor consumerThreadPoolExecutor ;
89+
8190 @ PostConstruct
8291 public void init () {
83- super . setName ( "TaskUpdateQueueConsumerThread" );
92+ this . consumerThreadPoolExecutor = ( ThreadPoolExecutor ) ThreadUtils . newDaemonFixedThreadExecutor ( "TaskUpdateQueueConsumerThread" , masterConfig . getDispatchTaskNumber () );
8493 super .start ();
8594 }
8695
8796 @ Override
8897 public void run () {
89- List < TaskPriority > failedDispatchTasks = new ArrayList <> ();
98+ int fetchTaskNum = masterConfig . getDispatchTaskNumber ();
9099 while (Stopper .isRunning ()) {
91100 try {
92- int fetchTaskNum = masterConfig .getDispatchTaskNumber ();
93- failedDispatchTasks .clear ();
94- for (int i = 0 ; i < fetchTaskNum ; i ++) {
95- TaskPriority taskPriority = taskPriorityQueue .poll (Constants .SLEEP_TIME_MILLIS , TimeUnit .MILLISECONDS );
96- if (Objects .isNull (taskPriority )) {
97- continue ;
98- }
101+ List <TaskPriority > failedDispatchTasks = this .batchDispatch (fetchTaskNum );
99102
100- boolean dispatchResult = dispatch (taskPriority );
101- if (!dispatchResult ) {
102- failedDispatchTasks .add (taskPriority );
103- }
104- }
105103 if (!failedDispatchTasks .isEmpty ()) {
106104 for (TaskPriority dispatchFailedTask : failedDispatchTasks ) {
107105 taskPriorityQueue .put (dispatchFailedTask );
@@ -118,13 +116,41 @@ public void run() {
118116 }
119117 }
120118
119+ /**
120+ * batch dispatch with thread pool
121+ */
122+ private List <TaskPriority > batchDispatch (int fetchTaskNum ) throws TaskPriorityQueueException , InterruptedException {
123+ List <TaskPriority > failedDispatchTasks = new ArrayList <>();
124+ CountDownLatch latch = new CountDownLatch (fetchTaskNum );
125+
126+ for (int i = 0 ; i < fetchTaskNum ; i ++) {
127+ TaskPriority taskPriority = taskPriorityQueue .poll (Constants .SLEEP_TIME_MILLIS , TimeUnit .MILLISECONDS );
128+ if (Objects .isNull (taskPriority )) {
129+ latch .countDown ();
130+ continue ;
131+ }
132+
133+ consumerThreadPoolExecutor .submit (() -> {
134+ boolean dispatchResult = this .dispatchTask (taskPriority );
135+ if (!dispatchResult ) {
136+ failedDispatchTasks .add (taskPriority );
137+ }
138+ latch .countDown ();
139+ });
140+ }
141+
142+ latch .await ();
143+
144+ return failedDispatchTasks ;
145+ }
146+
121147 /**
122148 * dispatch task
123149 *
124150 * @param taskPriority taskPriority
125151 * @return result
126152 */
127- protected boolean dispatch (TaskPriority taskPriority ) {
153+ protected boolean dispatchTask (TaskPriority taskPriority ) {
128154 boolean result = false ;
129155 try {
130156 TaskExecutionContext context = taskPriority .getTaskExecutionContext ();
@@ -158,8 +184,6 @@ public Boolean taskInstanceIsFinalState(int taskInstanceId) {
158184
159185 /**
160186 * check if task need to check state, if true, refresh the checkpoint
161- * @param taskPriority
162- * @return
163187 */
164188 private boolean isTaskNeedToCheck (TaskPriority taskPriority ) {
165189 long now = System .currentTimeMillis ();
0 commit comments