2020import static org .apache .dolphinscheduler .common .Constants .DEFAULT_WORKER_GROUP ;
2121import static org .apache .dolphinscheduler .common .Constants .REGISTRY_DOLPHINSCHEDULER_WORKERS ;
2222import static org .apache .dolphinscheduler .common .Constants .SINGLE_SLASH ;
23+ import static org .apache .dolphinscheduler .common .Constants .SLEEP_TIME_MILLIS ;
2324
2425import org .apache .dolphinscheduler .common .Constants ;
2526import org .apache .dolphinscheduler .common .IStoppable ;
2627import org .apache .dolphinscheduler .common .enums .NodeType ;
28+ import org .apache .dolphinscheduler .common .thread .ThreadUtils ;
2729import org .apache .dolphinscheduler .common .utils .NetUtils ;
2830import org .apache .dolphinscheduler .remote .utils .NamedThreadFactory ;
2931import org .apache .dolphinscheduler .server .registry .HeartBeatTask ;
@@ -99,11 +101,6 @@ public void registry() {
99101 Set <String > workerZkPaths = getWorkerZkPaths ();
100102 int workerHeartbeatInterval = workerConfig .getHeartbeatInterval ();
101103
102- for (String workerZKPath : workerZkPaths ) {
103- registryClient .persistEphemeral (workerZKPath , "" );
104- logger .info ("worker node : {} registry to ZK {} successfully" , address , workerZKPath );
105- }
106-
107104 HeartBeatTask heartBeatTask = new HeartBeatTask (startupTime ,
108105 workerConfig .getMaxCpuLoadAvg (),
109106 workerConfig .getReservedMemory (),
@@ -115,6 +112,23 @@ public void registry() {
115112 workerManagerThread .getThreadPoolQueueSize ()
116113 );
117114
115+ for (String workerZKPath : workerZkPaths ) {
116+ // remove before persist
117+ registryClient .remove (workerZKPath );
118+ registryClient .persistEphemeral (workerZKPath , heartBeatTask .getHeartBeatInfo ());
119+ logger .info ("worker node : {} registry to ZK {} successfully" , address , workerZKPath );
120+ }
121+
122+ while (!registryClient .checkNodeExists (NetUtils .getHost (), NodeType .WORKER )) {
123+ ThreadUtils .sleep (SLEEP_TIME_MILLIS );
124+ }
125+
126+ // sleep 1s, waiting master failover remove
127+ ThreadUtils .sleep (Constants .SLEEP_TIME_MILLIS );
128+
129+ // delete dead server
130+ registryClient .handleDeadServer (workerZkPaths , NodeType .WORKER , Constants .DELETE_OP );
131+
118132 this .heartBeatExecutor .scheduleAtFixedRate (heartBeatTask , workerHeartbeatInterval , workerHeartbeatInterval , TimeUnit .SECONDS );
119133 logger .info ("worker node : {} heartbeat interval {} s" , address , workerHeartbeatInterval );
120134 }
0 commit comments