Skip to content

Commit d3bd730

Browse files
caishunfeng and caishunfeng authored
[Bug-7686][Server]fix restart server after kill force (#7688)
* [DS-7686][Server]fix restart server after kill force
* update registry logic

Co-authored-by: caishunfeng <534328519@qq.com>
1 parent 71ccb42 commit d3bd730

2 files changed

Lines changed: 32 additions & 13 deletions

File tree

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,6 @@ public void start() {
118118
registryClient.getLock(nodeLock);
119119
// master registry
120120
registry();
121-
String registryPath = getMasterPath();
122-
registryClient.handleDeadServer(Collections.singleton(registryPath), NodeType.MASTER, Constants.DELETE_OP);
123-
124-
// init system node
125-
126-
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
127-
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
128-
}
129121

130122
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_NODE, new MasterRegistryDataListener());
131123
} catch (Exception e) {
@@ -500,7 +492,20 @@ public void registry() {
500492
Constants.MASTER_TYPE,
501493
registryClient);
502494

495+
// remove before persist
496+
registryClient.remove(localNodePath);
503497
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
498+
499+
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.MASTER)) {
500+
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
501+
}
502+
503+
// sleep 1s, waiting master failover remove
504+
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
505+
506+
// delete dead server
507+
registryClient.handleDeadServer(Collections.singleton(localNodePath), NodeType.MASTER, Constants.DELETE_OP);
508+
504509
registryClient.addConnectionStateListener(this::handleConnectionState);
505510
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
506511
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
2121
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
2222
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
23+
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
2324

2425
import org.apache.dolphinscheduler.common.Constants;
2526
import org.apache.dolphinscheduler.common.IStoppable;
2627
import org.apache.dolphinscheduler.common.enums.NodeType;
28+
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2729
import org.apache.dolphinscheduler.common.utils.NetUtils;
2830
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
2931
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
@@ -99,11 +101,6 @@ public void registry() {
99101
Set<String> workerZkPaths = getWorkerZkPaths();
100102
int workerHeartbeatInterval = workerConfig.getHeartbeatInterval();
101103

102-
for (String workerZKPath : workerZkPaths) {
103-
registryClient.persistEphemeral(workerZKPath, "");
104-
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
105-
}
106-
107104
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
108105
workerConfig.getMaxCpuLoadAvg(),
109106
workerConfig.getReservedMemory(),
@@ -115,6 +112,23 @@ public void registry() {
115112
workerManagerThread.getThreadPoolQueueSize()
116113
);
117114

115+
for (String workerZKPath : workerZkPaths) {
116+
// remove before persist
117+
registryClient.remove(workerZKPath);
118+
registryClient.persistEphemeral(workerZKPath, heartBeatTask.getHeartBeatInfo());
119+
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
120+
}
121+
122+
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
123+
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
124+
}
125+
126+
// sleep 1s, waiting master failover remove
127+
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
128+
129+
// delete dead server
130+
registryClient.handleDeadServer(workerZkPaths, NodeType.WORKER, Constants.DELETE_OP);
131+
118132
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
119133
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
120134
}

0 commit comments

Comments
 (0)