Skip to content

Commit e6fe39e

Browse files
caishunfengcaishunfeng
andauthored
[DS-6961][MasterServer] batch dispatch (#6962)
* [DS-6961][MasterServer] batch dispatch * fix test Co-authored-by: caishunfeng <534328519@qq.com>
1 parent 5a04b8d commit e6fe39e

4 files changed

Lines changed: 42 additions & 28 deletions

File tree

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ public class MasterConfig {
3131
private int fetchCommandNum;
3232
private int preExecThreads;
3333
private int execThreads;
34-
private int execTaskNum;
3534
private int dispatchTaskNumber;
3635
private HostSelector hostSelector;
3736
private int heartbeatInterval;
@@ -74,14 +73,6 @@ public void setExecThreads(int execThreads) {
7473
this.execThreads = execThreads;
7574
}
7675

77-
public int getExecTaskNum() {
78-
return execTaskNum;
79-
}
80-
81-
public void setExecTaskNum(int execTaskNum) {
82-
this.execTaskNum = execTaskNum;
83-
}
84-
8576
public int getDispatchTaskNumber() {
8677
return dispatchTaskNumber;
8778
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919

2020
import org.apache.dolphinscheduler.common.Constants;
2121
import org.apache.dolphinscheduler.common.thread.Stopper;
22+
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2223
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2324
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2425
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
2526
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
2627
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
2728
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
29+
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
2830
import org.apache.dolphinscheduler.service.process.ProcessService;
2931
import org.apache.dolphinscheduler.service.queue.TaskPriority;
3032
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@@ -33,6 +35,8 @@
3335
import java.util.ArrayList;
3436
import java.util.List;
3537
import java.util.Objects;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.ThreadPoolExecutor;
3640
import java.util.concurrent.TimeUnit;
3741

3842
import javax.annotation.PostConstruct;
@@ -78,30 +82,24 @@ public class TaskPriorityQueueConsumer extends Thread {
7882
@Autowired
7983
private MasterConfig masterConfig;
8084

85+
/**
86+
* consumer thread pool
87+
*/
88+
private ThreadPoolExecutor consumerThreadPoolExecutor;
89+
8190
@PostConstruct
8291
public void init() {
83-
super.setName("TaskUpdateQueueConsumerThread");
92+
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
8493
super.start();
8594
}
8695

8796
@Override
8897
public void run() {
89-
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
98+
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
9099
while (Stopper.isRunning()) {
91100
try {
92-
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
93-
failedDispatchTasks.clear();
94-
for (int i = 0; i < fetchTaskNum; i++) {
95-
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
96-
if (Objects.isNull(taskPriority)) {
97-
continue;
98-
}
101+
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
99102

100-
boolean dispatchResult = dispatch(taskPriority);
101-
if (!dispatchResult) {
102-
failedDispatchTasks.add(taskPriority);
103-
}
104-
}
105103
if (!failedDispatchTasks.isEmpty()) {
106104
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
107105
taskPriorityQueue.put(dispatchFailedTask);
@@ -118,13 +116,41 @@ public void run() {
118116
}
119117
}
120118

119+
/**
120+
* batch dispatch with thread pool
121+
*/
122+
private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
123+
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
124+
CountDownLatch latch = new CountDownLatch(fetchTaskNum);
125+
126+
for (int i = 0; i < fetchTaskNum; i++) {
127+
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
128+
if (Objects.isNull(taskPriority)) {
129+
latch.countDown();
130+
continue;
131+
}
132+
133+
consumerThreadPoolExecutor.submit(() -> {
134+
boolean dispatchResult = this.dispatchTask(taskPriority);
135+
if (!dispatchResult) {
136+
failedDispatchTasks.add(taskPriority);
137+
}
138+
latch.countDown();
139+
});
140+
}
141+
142+
latch.await();
143+
144+
return failedDispatchTasks;
145+
}
146+
121147
/**
122148
* dispatch task
123149
*
124150
* @param taskPriority taskPriority
125151
* @return result
126152
*/
127-
protected boolean dispatch(TaskPriority taskPriority) {
153+
protected boolean dispatchTask(TaskPriority taskPriority) {
128154
boolean result = false;
129155
try {
130156
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
@@ -158,8 +184,6 @@ public Boolean taskInstanceIsFinalState(int taskInstanceId) {
158184

159185
/**
160186
* check if task need to check state, if true, refresh the checkpoint
161-
* @param taskPriority
162-
* @return
163187
*/
164188
private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
165189
long now = System.currentTimeMillis();

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ public void init() throws Exception {
8888

8989
applicationContext = mock(ApplicationContext.class);
9090
config = new MasterConfig();
91-
config.setExecTaskNum(1);
9291
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
9392

9493
processInstance = mock(ProcessInstance.class);

dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ public void testDispatch() {
307307

308308
TaskPriority taskPriority = new TaskPriority();
309309
taskPriority.setTaskId(1);
310-
boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
310+
boolean res = taskPriorityQueueConsumer.dispatchTask(taskPriority);
311311

312312
Assert.assertFalse(res);
313313
}

0 commit comments

Comments
 (0)