Skip to content

Commit 71cf7e1

Browse files
authored
[Fix-10918] Close datasource when expire from guava cache (#11120)
* Close datasource when expire from guava cache * Remove duplicate datasource in HiveDataSourceClient
1 parent 952aee8 commit 71cf7e1

4 files changed

Lines changed: 37 additions & 27 deletions

File tree

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/CommonDataSourceClient.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,12 @@
2727
import java.sql.SQLException;
2828
import java.util.concurrent.TimeUnit;
2929

30-
import javax.sql.DataSource;
31-
3230
import org.slf4j.Logger;
3331
import org.slf4j.LoggerFactory;
3432
import org.springframework.jdbc.core.JdbcTemplate;
3533

3634
import com.google.common.base.Stopwatch;
35+
import com.zaxxer.hikari.HikariDataSource;
3736

3837
public class CommonDataSourceClient implements DataSourceClient {
3938

@@ -43,7 +42,7 @@ public class CommonDataSourceClient implements DataSourceClient {
4342
public static final String COMMON_VALIDATION_QUERY = "select 1";
4443

4544
protected final BaseConnectionParam baseConnectionParam;
46-
protected DataSource dataSource;
45+
protected HikariDataSource dataSource;
4746
protected JdbcTemplate jdbcTemplate;
4847

4948
public CommonDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) {
@@ -113,8 +112,10 @@ public Connection getConnection() {
113112

114113
@Override
115114
public void close() {
116-
logger.info("do close dataSource.");
117-
this.dataSource = null;
115+
logger.info("do close dataSource {}.", baseConnectionParam.getDatabase());
116+
try (HikariDataSource closedDatasource = dataSource) {
117+
// only close the resource
118+
}
118119
this.jdbcTemplate = null;
119120
}
120121

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
1919

20-
import com.google.common.cache.Cache;
21-
import com.google.common.cache.CacheBuilder;
2220
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
2321
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
2422
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@@ -27,23 +25,32 @@
2725
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient;
2826
import org.apache.dolphinscheduler.spi.enums.DbType;
2927
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
30-
import org.slf4j.Logger;
31-
import org.slf4j.LoggerFactory;
3228

3329
import java.sql.Connection;
3430
import java.util.Map;
3531
import java.util.concurrent.ExecutionException;
3632
import java.util.concurrent.TimeUnit;
3733

34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import com.google.common.cache.Cache;
38+
import com.google.common.cache.CacheBuilder;
39+
import com.google.common.cache.RemovalListener;
3840

3941
public class DataSourceClientProvider {
4042
private static final Logger logger = LoggerFactory.getLogger(DataSourceClientProvider.class);
4143

42-
private static long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
44+
private static final long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
4345
private static final Cache<String, DataSourceClient> uniqueId2dataSourceClientCache = CacheBuilder.newBuilder()
44-
.expireAfterWrite(duration, TimeUnit.HOURS)
45-
.maximumSize(100)
46-
.build();
46+
.expireAfterWrite(duration, TimeUnit.HOURS)
47+
.removalListener((RemovalListener<String, DataSourceClient>) notification -> {
48+
try (DataSourceClient closedClient = notification.getValue()) {
49+
logger.info("Datasource: {} is removed from cache due to expire", notification.getKey());
50+
}
51+
})
52+
.maximumSize(100)
53+
.build();
4754
private DataSourcePluginManager dataSourcePluginManager;
4855

4956
private DataSourceClientProvider() {
@@ -61,7 +68,7 @@ public static DataSourceClientProvider getInstance() {
6168
public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
6269
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
6370
String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
64-
logger.info("getConnection datasourceUniqueId {}", datasourceUniqueId);
71+
logger.info("Get connection from datasource {}", datasourceUniqueId);
6572

6673
DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
6774
Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();

dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/HiveDataSourceClient.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package org.apache.dolphinscheduler.plugin.datasource.hive;
1919

20-
import com.google.common.util.concurrent.ThreadFactoryBuilder;
21-
import com.zaxxer.hikari.HikariDataSource;
20+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE;
21+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF;
22+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JAVA_SECURITY_KRB5_CONF_PATH;
23+
2224
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient;
2325
import org.apache.dolphinscheduler.plugin.datasource.api.provider.JDBCDataSourceProvider;
2426
import org.apache.dolphinscheduler.plugin.datasource.hive.utils.CommonUtil;
@@ -27,11 +29,9 @@
2729
import org.apache.dolphinscheduler.spi.utils.Constants;
2830
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
2931
import org.apache.dolphinscheduler.spi.utils.StringUtils;
32+
3033
import org.apache.hadoop.conf.Configuration;
3134
import org.apache.hadoop.security.UserGroupInformation;
32-
import org.slf4j.Logger;
33-
import org.slf4j.LoggerFactory;
34-
import sun.security.krb5.Config;
3535

3636
import java.io.IOException;
3737
import java.lang.reflect.Field;
@@ -41,7 +41,12 @@
4141
import java.util.concurrent.ScheduledExecutorService;
4242
import java.util.concurrent.TimeUnit;
4343

44-
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.*;
44+
import org.slf4j.Logger;
45+
import org.slf4j.LoggerFactory;
46+
47+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
48+
49+
import sun.security.krb5.Config;
4550

4651
public class HiveDataSourceClient extends CommonDataSourceClient {
4752

@@ -50,7 +55,6 @@ public class HiveDataSourceClient extends CommonDataSourceClient {
5055
private ScheduledExecutorService kerberosRenewalService;
5156

5257
private Configuration hadoopConf;
53-
protected HikariDataSource oneSessionDataSource;
5458
private UserGroupInformation ugi;
5559
private boolean retryGetConnection = true;
5660

@@ -76,7 +80,7 @@ protected void initClient(BaseConnectionParam baseConnectionParam, DbType dbType
7680
logger.info("Create ugi success.");
7781

7882
super.initClient(baseConnectionParam, dbType);
79-
this.oneSessionDataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
83+
this.dataSource = JDBCDataSourceProvider.createOneSessionJdbcDataSource(baseConnectionParam, dbType);
8084
logger.info("Init {} success.", getClass().getName());
8185
}
8286

@@ -144,7 +148,7 @@ protected Configuration getHadoopConf() {
144148
@Override
145149
public Connection getConnection() {
146150
try {
147-
return oneSessionDataSource.getConnection();
151+
return dataSource.getConnection();
148152
} catch (SQLException e) {
149153
boolean kerberosStartupState = PropertyUtils.getBoolean(HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
150154
if (retryGetConnection && kerberosStartupState) {
@@ -166,8 +170,5 @@ public void close() {
166170
logger.info("close {}.", this.getClass().getSimpleName());
167171
kerberosRenewalService.shutdown();
168172
this.ugi = null;
169-
170-
this.oneSessionDataSource.close();
171-
this.oneSessionDataSource = null;
172173
}
173174
}

dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/DataSourceClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
import java.sql.Connection;
2121

22-
public interface DataSourceClient {
22+
public interface DataSourceClient extends AutoCloseable {
2323

2424
void checkClient();
2525

26+
@Override
2627
void close();
2728

2829
Connection getConnection();

0 commit comments

Comments
 (0)