3131import org .apache .dolphinscheduler .extract .base .utils .NettyUtils ;
3232
3333import java .net .InetSocketAddress ;
34+ import java .util .Map ;
3435import java .util .concurrent .ConcurrentHashMap ;
3536import java .util .concurrent .ThreadFactory ;
3637import java .util .concurrent .TimeUnit ;
3738import java .util .concurrent .atomic .AtomicBoolean ;
39+ import java .util .concurrent .locks .ReentrantLock ;
3840
3941import lombok .extern .slf4j .Slf4j ;
4042import io .netty .bootstrap .Bootstrap ;
@@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable {
5456
5557 private final Bootstrap bootstrap = new Bootstrap ();
5658
57- private final ConcurrentHashMap <Host , Channel > channels = new ConcurrentHashMap <>(128 );
59+ private final ReentrantLock channelsLock = new ReentrantLock ();
60+ private final Map <Host , Channel > channels = new ConcurrentHashMap <>();
5861
5962 private final AtomicBoolean isStarted = new AtomicBoolean (false );
6063
@@ -104,9 +107,10 @@ public void initChannel(SocketChannel ch) {
104107 isStarted .compareAndSet (false , true );
105108 }
106109
107- public IRpcResponse sendSync (final Host host , final Transporter transporter ,
110+ public IRpcResponse sendSync (final Host host ,
111+ final Transporter transporter ,
108112 final long timeoutMillis ) throws InterruptedException , RemotingException {
109- final Channel channel = getChannel (host );
113+ final Channel channel = getOrCreateChannel (host );
110114 if (channel == null ) {
111115 throw new RemotingException (String .format ("connect to : %s fail" , host ));
112116 }
@@ -137,36 +141,43 @@ public IRpcResponse sendSync(final Host host, final Transporter transporter,
137141 return iRpcResponse ;
138142 }
139143
140- private Channel getChannel (Host host ) {
144+ private Channel getOrCreateChannel (Host host ) {
141145 Channel channel = channels .get (host );
142146 if (channel != null && channel .isActive ()) {
143147 return channel ;
144148 }
145- return createChannel (host , true );
149+ try {
150+ channelsLock .lock ();
151+ channel = channels .get (host );
152+ if (channel != null && channel .isActive ()) {
153+ return channel ;
154+ }
155+ channel = createChannel (host );
156+ channels .put (host , channel );
157+ } finally {
158+ channelsLock .unlock ();
159+ }
160+ return channel ;
146161 }
147162
148163 /**
149164 * create channel
150165 *
151- * @param host host
152- * @param isSync sync flag
166+ * @param host host
153167 * @return channel
154168 */
155- private Channel createChannel (Host host , boolean isSync ) {
169+ private Channel createChannel (Host host ) {
156170 try {
157171 ChannelFuture future ;
158172 synchronized (bootstrap ) {
159173 future = bootstrap .connect (new InetSocketAddress (host .getIp (), host .getPort ()));
160174 }
161- if (isSync ) {
162- future .sync ();
163- }
175+ future .await (clientConfig .getConnectTimeoutMillis ());
164176 if (future .isSuccess ()) {
165- Channel channel = future .channel ();
166- channels . put ( host , channel );
167- return channel ;
177+ return future .channel ();
178+ } else {
179+ throw new IllegalArgumentException ( "connect to host: " + host + " failed" , future . cause ()) ;
168180 }
169- throw new IllegalArgumentException ("connect to host: " + host + " failed" );
170181 } catch (InterruptedException e ) {
171182 Thread .currentThread ().interrupt ();
172183 throw new RuntimeException ("Connect to host: " + host + " failed" , e );
@@ -189,16 +200,23 @@ public void close() {
189200 }
190201
191202 private void closeChannels () {
192- for (Channel channel : this .channels .values ()) {
193- channel .close ();
203+ try {
204+ channelsLock .lock ();
205+ channels .values ().forEach (Channel ::close );
206+ } finally {
207+ channelsLock .unlock ();
194208 }
195- this .channels .clear ();
196209 }
197210
198211 public void closeChannel (Host host ) {
199- Channel channel = this .channels .remove (host );
200- if (channel != null ) {
201- channel .close ();
212+ try {
213+ channelsLock .lock ();
214+ Channel channel = this .channels .remove (host );
215+ if (channel != null ) {
216+ channel .close ();
217+ }
218+ } finally {
219+ channelsLock .unlock ();
202220 }
203221 }
204222}
0 commit comments