Skip to content

Commit e6239e8

Browse files
caishunfengcaishunfeng
andauthored
[DS-6829][WorkerServer] skip create log dir and print log in dryRun model (#6852)
Co-authored-by: caishunfeng <534328519@qq.com>
1 parent 1b6b526 commit e6239e8

3 files changed

Lines changed: 35 additions & 31 deletions

File tree

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,4 +766,5 @@ private Constants() {
766766
* dry run flag
767767
*/
768768
public static final int DRY_RUN_FLAG_NO = 0;
769+
public static final int DRY_RUN_FLAG_YES = 1;
769770
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.server.worker.processor;
1919

20+
import org.apache.dolphinscheduler.common.Constants;
2021
import org.apache.dolphinscheduler.common.enums.Event;
2122
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
2223
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -135,19 +136,21 @@ public void process(Channel channel, Command command) {
135136
taskExecutionContext.setHost(NetUtils.getAddr(workerConfig.getListenPort()));
136137
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
137138

138-
// local execute path
139-
String execLocalPath = getExecLocalPath(taskExecutionContext);
140-
logger.info("task instance local execute path : {}", execLocalPath);
141-
taskExecutionContext.setExecutePath(execLocalPath);
142-
143-
try {
144-
FileUtils.createWorkDirIfAbsent(execLocalPath);
145-
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
146-
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
139+
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
140+
// local execute path
141+
String execLocalPath = getExecLocalPath(taskExecutionContext);
142+
logger.info("task instance local execute path : {}", execLocalPath);
143+
taskExecutionContext.setExecutePath(execLocalPath);
144+
145+
try {
146+
FileUtils.createWorkDirIfAbsent(execLocalPath);
147+
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
148+
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
149+
}
150+
} catch (Throwable ex) {
151+
logger.error("create execLocalPath: {}", execLocalPath, ex);
152+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
147153
}
148-
} catch (Throwable ex) {
149-
logger.error("create execLocalPath: {}", execLocalPath, ex);
150-
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
151154
}
152155

153156
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,16 @@ public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
125125

126126
@Override
127127
public void run() {
128-
129128
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
129+
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
130+
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
131+
responseCommand.setEndTime(new Date());
132+
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
133+
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
134+
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
135+
return;
136+
}
137+
130138
try {
131139
logger.info("script path : {}", taskExecutionContext.getExecutePath());
132140
// check if the OS user exists
@@ -146,13 +154,8 @@ public void run() {
146154
}
147155
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
148156

149-
int dryRun = taskExecutionContext.getDryRun();
150157
// copy hdfs/minio file to local
151-
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
152-
downloadResource(taskExecutionContext.getExecutePath(),
153-
taskExecutionContext.getResources(),
154-
logger);
155-
}
158+
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
156159

157160
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
158161
taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@@ -177,31 +180,28 @@ public void run() {
177180
taskRequest.setTaskLogName(taskLogName);
178181

179182
task = taskChannel.createTask(taskRequest);
183+
180184
// task init
181185
this.task.init();
186+
182187
//init varPool
183188
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
184189

185-
if (dryRun == Constants.DRY_RUN_FLAG_NO) {
186-
// task handle
187-
this.task.handle();
190+
// task handle
191+
this.task.handle();
188192

189-
// task result process
190-
if (this.task.getNeedAlert()) {
191-
sendAlert(this.task.getTaskAlertInfo());
192-
}
193-
responseCommand.setStatus(this.task.getExitStatus().getCode());
194-
} else {
195-
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
196-
task.setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
193+
// task result process
194+
if (this.task.getNeedAlert()) {
195+
sendAlert(this.task.getTaskAlertInfo());
197196
}
197+
198+
responseCommand.setStatus(this.task.getExitStatus().getCode());
198199
responseCommand.setEndTime(new Date());
199200
responseCommand.setProcessId(this.task.getProcessId());
200201
responseCommand.setAppIds(this.task.getAppIds());
201202
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
202203
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
203204
} catch (Throwable e) {
204-
205205
logger.error("task scheduler failure", e);
206206
kill();
207207
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());

0 commit comments

Comments
 (0)