|
17 | 17 |
|
18 | 18 | package org.apache.dolphinscheduler.server.master.runner.task; |
19 | 19 |
|
20 | | -import static org.apache.dolphinscheduler.common.Constants.ADDRESS; |
21 | | -import static org.apache.dolphinscheduler.common.Constants.DATABASE; |
22 | | -import static org.apache.dolphinscheduler.common.Constants.JDBC_URL; |
23 | | -import static org.apache.dolphinscheduler.common.Constants.OTHER; |
24 | | -import static org.apache.dolphinscheduler.common.Constants.PASSWORD; |
25 | | -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; |
26 | | -import static org.apache.dolphinscheduler.common.Constants.USER; |
27 | | -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER; |
28 | | -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY; |
29 | | -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S; |
30 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME; |
31 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE; |
32 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE; |
33 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE; |
34 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID; |
35 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE; |
36 | | -import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID; |
37 | | - |
| 20 | +import com.zaxxer.hikari.HikariDataSource; |
| 21 | +import lombok.NonNull; |
| 22 | +import org.apache.commons.collections.CollectionUtils; |
38 | 23 | import org.apache.dolphinscheduler.common.Constants; |
39 | 24 | import org.apache.dolphinscheduler.common.utils.HadoopUtils; |
40 | 25 | import org.apache.dolphinscheduler.common.utils.JSONUtils; |
|
80 | 65 | import org.apache.dolphinscheduler.spi.enums.DbType; |
81 | 66 | import org.apache.dolphinscheduler.spi.enums.ResourceType; |
82 | 67 | import org.apache.dolphinscheduler.spi.utils.StringUtils; |
83 | | - |
84 | | -import org.apache.commons.collections.CollectionUtils; |
| 68 | +import org.slf4j.Logger; |
| 69 | +import org.slf4j.LoggerFactory; |
85 | 70 |
|
86 | 71 | import java.util.ArrayList; |
87 | 72 | import java.util.HashMap; |
|
93 | 78 | import java.util.stream.Collectors; |
94 | 79 | import java.util.stream.Stream; |
95 | 80 |
|
96 | | -import org.slf4j.Logger; |
97 | | -import org.slf4j.LoggerFactory; |
98 | | - |
99 | | -import com.zaxxer.hikari.HikariDataSource; |
100 | | - |
101 | | -import lombok.NonNull; |
| 81 | +import static org.apache.dolphinscheduler.common.Constants.ADDRESS; |
| 82 | +import static org.apache.dolphinscheduler.common.Constants.DATABASE; |
| 83 | +import static org.apache.dolphinscheduler.common.Constants.JDBC_URL; |
| 84 | +import static org.apache.dolphinscheduler.common.Constants.OTHER; |
| 85 | +import static org.apache.dolphinscheduler.common.Constants.PASSWORD; |
| 86 | +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; |
| 87 | +import static org.apache.dolphinscheduler.common.Constants.USER; |
| 88 | +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CLUSTER; |
| 89 | +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DATA_QUALITY; |
| 90 | +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_K8S; |
| 91 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_NAME; |
| 92 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TABLE; |
| 93 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.COMPARISON_TYPE; |
| 94 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_CONNECTOR_TYPE; |
| 95 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.SRC_DATASOURCE_ID; |
| 96 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE; |
| 97 | +import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID; |
102 | 98 |
|
103 | 99 | public abstract class BaseTaskProcessor implements ITaskProcessor { |
104 | 100 |
|
@@ -185,27 +181,39 @@ public boolean action(TaskAction taskAction) { |
185 | 181 | if (StringUtils.isNotEmpty(threadLoggerInfoName)) { |
186 | 182 | Thread.currentThread().setName(threadLoggerInfoName); |
187 | 183 | } |
188 | | - switch (taskAction) { |
189 | | - case STOP: |
190 | | - return stop(); |
191 | | - case PAUSE: |
192 | | - return pause(); |
193 | | - case TIMEOUT: |
194 | | - return timeout(); |
195 | | - case SUBMIT: |
196 | | - return submit(); |
197 | | - case RUN: |
198 | | - return run(); |
199 | | - case DISPATCH: |
200 | | - return dispatch(); |
201 | | - case RESUBMIT: |
202 | | - return resubmit(); |
203 | | - default: |
204 | | - logger.error("unknown task action: {}", taskAction); |
| 184 | + boolean result = false; |
| 185 | + try { |
| 186 | + switch (taskAction) { |
| 187 | + case STOP: |
| 188 | + result = stop(); |
| 189 | + break; |
| 190 | + case PAUSE: |
| 191 | + result = pause(); |
| 192 | + break; |
| 193 | + case TIMEOUT: |
| 194 | + result = timeout(); |
| 195 | + break; |
| 196 | + case SUBMIT: |
| 197 | + result = submit(); |
| 198 | + break; |
| 199 | + case RUN: |
| 200 | + result = run(); |
| 201 | + break; |
| 202 | + case DISPATCH: |
| 203 | + result = dispatch(); |
| 204 | + break; |
| 205 | + case RESUBMIT: |
| 206 | + result = resubmit(); |
| 207 | + break; |
| 208 | + default: |
| 209 | + logger.error("unknown task action: {}", taskAction); |
| 210 | + } |
| 211 | + return result; |
| 212 | + } finally { |
| 213 | + // reset thread name |
| 214 | + Thread.currentThread().setName(threadName); |
| 215 | + |
205 | 216 | } |
206 | | - // reset thread name |
207 | | - Thread.currentThread().setName(threadName); |
208 | | - return false; |
209 | 217 | } |
210 | 218 |
|
211 | 219 | protected boolean resubmit() { |
|
0 commit comments