Skip to content

Commit 4057d32

Browse files
committed
revert message to command
1 parent ed31f5c commit 4057d32

28 files changed

Lines changed: 96 additions & 96 deletions

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2525
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2626
import org.apache.dolphinscheduler.remote.command.Command;
27-
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
27+
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
2828
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
2929
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3030
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
@@ -242,7 +242,7 @@ private void addDispatchEvent(TaskExecutionContext context, ExecutionContext exe
242242

243243
private Command toCommand(TaskExecutionContext taskExecutionContext) {
244244
// todo: we didn't set the host here, since right now we didn't need to retry this message.
245-
TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext,
245+
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
246246
masterConfig.getMasterAddress(),
247247
taskExecutionContext.getHost(),
248248
System.currentTimeMillis());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.dolphinscheduler.common.enums.TaskEventType;
2121
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2222
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
23-
import org.apache.dolphinscheduler.remote.command.TaskRejectAckMessage;
23+
import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
2424
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
2525
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2626
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
@@ -69,7 +69,7 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
6969
}
7070

7171
public void sendAckToWorker(TaskEvent taskEvent) {
72-
TaskRejectAckMessage taskRejectAckMessage = new TaskRejectAckMessage(ExecutionStatus.SUCCESS.getCode(),
72+
TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
7373
taskEvent.getTaskInstanceId(),
7474
masterConfig.getMasterAddress(),
7575
taskEvent.getWorkerAddress(),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskResultEventHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2323
import org.apache.dolphinscheduler.dao.utils.TaskInstanceUtils;
2424
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
25-
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckMessage;
25+
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
2626
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
2727
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
2828
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
@@ -110,7 +110,7 @@ public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError, Ta
110110

111111
public void sendAckToWorker(TaskEvent taskEvent) {
112112
// we didn't set the receiver address, since the ack doen's need to retry
113-
TaskExecuteAckMessage taskExecuteAckMessage = new TaskExecuteAckMessage(ExecutionStatus.SUCCESS.getCode(),
113+
TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(),
114114
taskEvent.getTaskInstanceId(),
115115
masterConfig.getMasterAddress(),
116116
taskEvent.getWorkerAddress(),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteResponseProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
2222
import org.apache.dolphinscheduler.remote.command.Command;
2323
import org.apache.dolphinscheduler.remote.command.CommandType;
24-
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
24+
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
2525
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
2626
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
2727
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -58,8 +58,8 @@ public void process(Channel channel, Command command) {
5858
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESULT == command.getType(),
5959
String.format("invalid command type : %s", command.getType()));
6060

61-
TaskExecuteResultMessage taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
62-
TaskExecuteResultMessage.class);
61+
TaskExecuteResultCommand taskExecuteResultMessage = JSONUtils.parseObject(command.getBody(),
62+
TaskExecuteResultCommand.class);
6363
TaskEvent taskResultEvent = TaskEvent.newResultEvent(taskExecuteResultMessage, channel);
6464
try {
6565
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskResultEvent.getProcessInstanceId(),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2121
import org.apache.dolphinscheduler.remote.command.Command;
2222
import org.apache.dolphinscheduler.remote.command.CommandType;
23-
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
23+
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
2424
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
2525
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
2626
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -54,7 +54,7 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
5454
@Override
5555
public void process(Channel channel, Command command) {
5656
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
57-
TaskExecuteRunningMessage taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningMessage.class);
57+
TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
5858
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningMessage);
5959

6060
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningMessage, channel);

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskRecallProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
2222
import org.apache.dolphinscheduler.remote.command.Command;
2323
import org.apache.dolphinscheduler.remote.command.CommandType;
24-
import org.apache.dolphinscheduler.remote.command.TaskRejectMessage;
24+
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
2525
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
2626
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
2727
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
@@ -55,7 +55,7 @@ public class TaskRecallProcessor implements NettyRequestProcessor {
5555
@Override
5656
public void process(Channel channel, Command command) {
5757
Preconditions.checkArgument(CommandType.TASK_REJECT == command.getType(), String.format("invalid command type : %s", command.getType()));
58-
TaskRejectMessage recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectMessage.class);
58+
TaskRejectCommand recallCommand = JSONUtils.parseObject(command.getBody(), TaskRejectCommand.class);
5959
TaskEvent taskEvent = TaskEvent.newRecallEvent(recallCommand, channel);
6060
try {
6161
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(recallCommand.getProcessInstanceId(), recallCommand.getTaskInstanceId());

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskEvent.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919

2020
import org.apache.dolphinscheduler.common.enums.TaskEventType;
2121
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
22-
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultMessage;
23-
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
24-
import org.apache.dolphinscheduler.remote.command.TaskRejectMessage;
22+
import org.apache.dolphinscheduler.remote.command.TaskExecuteResultCommand;
23+
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
24+
import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
2525
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
2626

2727
import java.util.Date;
@@ -106,7 +106,7 @@ public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstance
106106
return event;
107107
}
108108

109-
public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Channel channel) {
109+
public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
110110
TaskEvent event = new TaskEvent();
111111
event.setProcessInstanceId(command.getProcessInstanceId());
112112
event.setTaskInstanceId(command.getTaskInstanceId());
@@ -120,7 +120,7 @@ public static TaskEvent newRunningEvent(TaskExecuteRunningMessage command, Chann
120120
return event;
121121
}
122122

123-
public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel channel) {
123+
public static TaskEvent newResultEvent(TaskExecuteResultCommand command, Channel channel) {
124124
TaskEvent event = new TaskEvent();
125125
event.setProcessInstanceId(command.getProcessInstanceId());
126126
event.setTaskInstanceId(command.getTaskInstanceId());
@@ -138,7 +138,7 @@ public static TaskEvent newResultEvent(TaskExecuteResultMessage command, Channel
138138
return event;
139139
}
140140

141-
public static TaskEvent newRecallEvent(TaskRejectMessage command, Channel channel) {
141+
public static TaskEvent newRecallEvent(TaskRejectCommand command, Channel channel) {
142142
TaskEvent event = new TaskEvent();
143143
event.setTaskInstanceId(command.getTaskInstanceId());
144144
event.setProcessInstanceId(command.getProcessInstanceId());

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutionContextTestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2525
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2626
import org.apache.dolphinscheduler.remote.command.Command;
27-
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
27+
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
2828
import org.apache.dolphinscheduler.remote.utils.Host;
2929
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
3030
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
@@ -48,7 +48,7 @@ public static ExecutionContext getExecutionContext(int port) {
4848
.buildProcessDefinitionRelatedInfo(processDefinition)
4949
.create();
5050

51-
TaskDispatchMessage requestCommand = new TaskDispatchMessage(context,
51+
TaskDispatchCommand requestCommand = new TaskDispatchCommand(context,
5252
"127.0.0.1:5678",
5353
"127.0.0.1:5678",
5454
System.currentTimeMillis());

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2626
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
2727
import org.apache.dolphinscheduler.remote.command.Command;
28-
import org.apache.dolphinscheduler.remote.command.TaskDispatchMessage;
28+
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
2929
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
3030
import org.apache.dolphinscheduler.remote.utils.Host;
3131
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -93,7 +93,7 @@ public void testExecuteWithException() throws ExecuteException {
9393

9494
}
9595
private Command toCommand(TaskExecutionContext taskExecutionContext) {
96-
TaskDispatchMessage requestCommand = new TaskDispatchMessage(taskExecutionContext,
96+
TaskDispatchCommand requestCommand = new TaskDispatchCommand(taskExecutionContext,
9797
"127.0.0.1:5678",
9898
"127.0.0.1:1234",
9999
System.currentTimeMillis());

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.dolphinscheduler.server.master.processor;
1919

20-
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningMessage;
20+
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
2121
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
2222
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
2323
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
@@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
4444
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
4545
private TaskEventService taskEventService;
4646
private ProcessService processService;
47-
private TaskExecuteRunningMessage taskExecuteRunningMessage;
47+
private TaskExecuteRunningCommand taskExecuteRunningMessage;
4848
private TaskEvent taskResponseEvent;
4949
private Channel channel;
5050

@@ -63,7 +63,7 @@ public void before() {
6363
channel = PowerMockito.mock(Channel.class);
6464
taskResponseEvent = PowerMockito.mock(TaskEvent.class);
6565

66-
taskExecuteRunningMessage = new TaskExecuteRunningMessage("127.0.0.1:5678",
66+
taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
6767
" 127.0.0.1:1234",
6868
System.currentTimeMillis());
6969
taskExecuteRunningMessage.setStatus(1);

0 commit comments

Comments
 (0)