Skip to content

Commit c5feb69

Browse files
authored
Merge 27223e7 into 7b7a333
2 parents 7b7a333 + 27223e7 commit c5feb69

2 files changed

Lines changed: 70 additions & 21 deletions

File tree

  • dolphinscheduler-extract/dolphinscheduler-extract-base/src

dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java

Lines changed: 39 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@
3131
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
3232

3333
import java.net.InetSocketAddress;
34+
import java.util.Map;
3435
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.ThreadFactory;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.concurrent.locks.ReentrantLock;
3840

3941
import lombok.extern.slf4j.Slf4j;
4042
import io.netty.bootstrap.Bootstrap;
@@ -54,7 +56,8 @@ public class NettyRemotingClient implements AutoCloseable {
5456

5557
private final Bootstrap bootstrap = new Bootstrap();
5658

57-
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);
59+
private final ReentrantLock channelsLock = new ReentrantLock();
60+
private final Map<Host, Channel> channels = new ConcurrentHashMap<>();
5861

5962
private final AtomicBoolean isStarted = new AtomicBoolean(false);
6063

@@ -104,9 +107,10 @@ public void initChannel(SocketChannel ch) {
104107
isStarted.compareAndSet(false, true);
105108
}
106109

107-
public IRpcResponse sendSync(final Host host, final Transporter transporter,
110+
public IRpcResponse sendSync(final Host host,
111+
final Transporter transporter,
108112
final long timeoutMillis) throws InterruptedException, RemotingException {
109-
final Channel channel = getChannel(host);
113+
final Channel channel = getOrCreateChannel(host);
110114
if (channel == null) {
111115
throw new RemotingException(String.format("connect to : %s fail", host));
112116
}
@@ -137,36 +141,43 @@ public IRpcResponse sendSync(final Host host, final Transporter transporter,
137141
return iRpcResponse;
138142
}
139143

140-
private Channel getChannel(Host host) {
144+
private Channel getOrCreateChannel(Host host) {
141145
Channel channel = channels.get(host);
142146
if (channel != null && channel.isActive()) {
143147
return channel;
144148
}
145-
return createChannel(host, true);
149+
try {
150+
channelsLock.lock();
151+
channel = channels.get(host);
152+
if (channel != null && channel.isActive()) {
153+
return channel;
154+
}
155+
channel = createChannel(host);
156+
channels.put(host, channel);
157+
} finally {
158+
channelsLock.unlock();
159+
}
160+
return channel;
146161
}
147162

148163
/**
149164
* create channel
150165
*
151-
* @param host host
152-
* @param isSync sync flag
166+
* @param host host
153167
* @return channel
154168
*/
155-
private Channel createChannel(Host host, boolean isSync) {
169+
private Channel createChannel(Host host) {
156170
try {
157171
ChannelFuture future;
158172
synchronized (bootstrap) {
159173
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
160174
}
161-
if (isSync) {
162-
future.sync();
163-
}
175+
future.await(clientConfig.getConnectTimeoutMillis());
164176
if (future.isSuccess()) {
165-
Channel channel = future.channel();
166-
channels.put(host, channel);
167-
return channel;
177+
return future.channel();
178+
} else {
179+
throw new IllegalArgumentException("connect to host: " + host + " failed", future.cause());
168180
}
169-
throw new IllegalArgumentException("connect to host: " + host + " failed");
170181
} catch (InterruptedException e) {
171182
Thread.currentThread().interrupt();
172183
throw new RuntimeException("Connect to host: " + host + " failed", e);
@@ -189,16 +200,23 @@ public void close() {
189200
}
190201

191202
private void closeChannels() {
192-
for (Channel channel : this.channels.values()) {
193-
channel.close();
203+
try {
204+
channelsLock.lock();
205+
channels.values().forEach(Channel::close);
206+
} finally {
207+
channelsLock.unlock();
194208
}
195-
this.channels.clear();
196209
}
197210

198211
public void closeChannel(Host host) {
199-
Channel channel = this.channels.remove(host);
200-
if (channel != null) {
201-
channel.close();
212+
try {
213+
channelsLock.lock();
214+
Channel channel = this.channels.remove(host);
215+
if (channel != null) {
216+
channel.close();
217+
}
218+
} finally {
219+
channelsLock.unlock();
202220
}
203221
}
204222
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.extract.base.utils;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import com.google.common.truth.Truth;
23+
24+
class HostTest {
25+
26+
@Test
27+
void testEquals() {
28+
Truth.assertThat(Host.of("localhost:8080")).isEqualTo(Host.of("localhost:8080"));
29+
}
30+
31+
}

0 commit comments

Comments
 (0)