Skip to content

Commit 175d976

Browse files
authored
[Feature][Data Quality] Data Quality Support Choose Database (#14406)
* add dataquality database api * change ui * api change * update * fix spotless * fix h2 * fix pg * fix-dead-line * update * fix-spotless * update pg sql * add ut * fix ut
1 parent b720770 commit 175d976

29 files changed

Lines changed: 547 additions & 34 deletions

File tree

docs/docs/en/guide/upgrade/incompatible.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,7 @@ This document records the incompatible updates between each version. You need to
1818
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
1919
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)
2020

21+
## 3.2.0
22+
23+
* Add required field `database` to the /datasources/tables and /datasources/tableColumns API endpoints [#14406](https://github.com/apache/dolphinscheduler/pull/14406)
24+

docs/docs/zh/guide/upgrade/incompatible.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,7 @@
1818
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
1919
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)
2020

21+
## 3.2.0
22+
23+
* 在 /datasources/tables && /datasources/tableColumns 接口中添加了必选字段`database` [#14406](https://github.com/apache/dolphinscheduler/pull/14406)
24+

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
2323
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
2424
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
25+
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_DATABASES_ERROR;
2526
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
2627
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
2728
import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
@@ -340,27 +341,43 @@ public Result getKerberosStartupState(@Parameter(hidden = true) @RequestAttribut
340341

341342
@Operation(summary = "tables", description = "GET_DATASOURCE_TABLES_NOTES")
342343
@Parameters({
343-
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
344+
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
345+
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
344346
})
345347
@GetMapping(value = "/tables")
346348
@ResponseStatus(HttpStatus.OK)
347349
@ApiException(GET_DATASOURCE_TABLES_ERROR)
348-
public Result getTables(@RequestParam("datasourceId") Integer datasourceId) {
349-
Map<String, Object> result = dataSourceService.getTables(datasourceId);
350+
public Result getTables(@RequestParam("datasourceId") Integer datasourceId,
351+
@RequestParam(value = "database") String database) {
352+
Map<String, Object> result = dataSourceService.getTables(datasourceId, database);
350353
return returnDataList(result);
351354
}
352355

353356
@Operation(summary = "tableColumns", description = "GET_DATASOURCE_TABLE_COLUMNS_NOTES")
354357
@Parameters({
355358
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
356-
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test"))
359+
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test")),
360+
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
357361
})
358362
@GetMapping(value = "/tableColumns")
359363
@ResponseStatus(HttpStatus.OK)
360364
@ApiException(GET_DATASOURCE_TABLE_COLUMNS_ERROR)
361365
public Result getTableColumns(@RequestParam("datasourceId") Integer datasourceId,
362-
@RequestParam("tableName") String tableName) {
363-
Map<String, Object> result = dataSourceService.getTableColumns(datasourceId, tableName);
366+
@RequestParam("tableName") String tableName,
367+
@RequestParam(value = "database") String database) {
368+
Map<String, Object> result = dataSourceService.getTableColumns(datasourceId, database, tableName);
369+
return returnDataList(result);
370+
}
371+
372+
@Operation(summary = "databases", description = "GET_DATASOURCE_DATABASE_NOTES")
373+
@Parameters({
374+
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
375+
})
376+
@GetMapping(value = "/databases")
377+
@ResponseStatus(HttpStatus.OK)
378+
@ApiException(GET_DATASOURCE_DATABASES_ERROR)
379+
public Result getDatabases(@RequestParam("datasourceId") Integer datasourceId) {
380+
Map<String, Object> result = dataSourceService.getDatabases(datasourceId);
364381
return returnDataList(result);
365382
}
366383
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ public enum Status {
482482
GET_DATASOURCE_OPTIONS_ERROR(1200017, "get datasource options error", "获取数据源Options错误"),
483483
GET_DATASOURCE_TABLES_ERROR(1200018, "get datasource tables error", "获取数据源表列表错误"),
484484
GET_DATASOURCE_TABLE_COLUMNS_ERROR(1200019, "get datasource table columns error", "获取数据源表列名错误"),
485+
GET_DATASOURCE_DATABASES_ERROR(1200035, "get datasource databases error", "获取数据库列表错误"),
485486

486487
CREATE_CLUSTER_ERROR(120020, "create cluster error", "创建集群失败"),
487488
CLUSTER_NAME_EXISTS(120021, "this cluster name [{0}] already exists", "集群名称[{0}]已经存在"),

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,24 @@ public interface DataSourceService {
132132
/**
133133
* get tables
134134
* @param datasourceId
135+
* @param database
135136
* @return
136137
*/
137-
Map<String, Object> getTables(Integer datasourceId);
138+
Map<String, Object> getTables(Integer datasourceId, String database);
138139

139140
/**
140141
* get table columns
141142
* @param datasourceId
143+
* @param database
142144
* @param tableName
143145
* @return
144146
*/
145-
Map<String, Object> getTableColumns(Integer datasourceId, String tableName);
147+
Map<String, Object> getTableColumns(Integer datasourceId, String database, String tableName);
148+
149+
/**
150+
* get databases
151+
* @param datasourceId
152+
* @return
153+
*/
154+
Map<String, Object> getDatabases(Integer datasourceId);
146155
}

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
515515
}
516516

517517
@Override
518-
public Map<String, Object> getTables(Integer datasourceId) {
518+
public Map<String, Object> getTables(Integer datasourceId, String database) {
519519
Map<String, Object> result = new HashMap<>();
520520

521521
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -551,7 +551,7 @@ public Map<String, Object> getTables(Integer datasourceId) {
551551
}
552552

553553
tables = metaData.getTables(
554-
connectionParam.getDatabase(),
554+
database,
555555
getDbSchemaPattern(dataSource.getType(), schema, connectionParam),
556556
"%", TABLE_TYPES);
557557
if (null == tables) {
@@ -583,7 +583,7 @@ public Map<String, Object> getTables(Integer datasourceId) {
583583
}
584584

585585
@Override
586-
public Map<String, Object> getTableColumns(Integer datasourceId, String tableName) {
586+
public Map<String, Object> getTableColumns(Integer datasourceId, String database, String tableName) {
587587
Map<String, Object> result = new HashMap<>();
588588

589589
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -603,8 +603,6 @@ public Map<String, Object> getTableColumns(Integer datasourceId, String tableNam
603603
ResultSet rs = null;
604604

605605
try {
606-
607-
String database = connectionParam.getDatabase();
608606
if (null == connection) {
609607
return result;
610608
}
@@ -635,6 +633,62 @@ public Map<String, Object> getTableColumns(Integer datasourceId, String tableNam
635633
return result;
636634
}
637635

636+
@Override
637+
public Map<String, Object> getDatabases(Integer datasourceId) {
638+
Map<String, Object> result = new HashMap<>();
639+
640+
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
641+
642+
if (dataSource == null) {
643+
putMsg(result, Status.QUERY_DATASOURCE_ERROR);
644+
return result;
645+
}
646+
647+
List<String> tableList;
648+
BaseConnectionParam connectionParam =
649+
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
650+
dataSource.getType(),
651+
dataSource.getConnectionParams());
652+
653+
if (null == connectionParam) {
654+
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
655+
return result;
656+
}
657+
658+
Connection connection =
659+
DataSourceUtils.getConnection(dataSource.getType(), connectionParam);
660+
ResultSet rs = null;
661+
662+
try {
663+
if (null == connection) {
664+
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
665+
return result;
666+
}
667+
if (dataSource.getType() == DbType.POSTGRESQL) {
668+
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY_PG);
669+
}
670+
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY);
671+
tableList = new ArrayList<>();
672+
while (rs.next()) {
673+
String name = rs.getString(1);
674+
tableList.add(name);
675+
}
676+
} catch (Exception e) {
677+
log.error("Get databases error, datasourceId:{}.", datasourceId, e);
678+
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
679+
return result;
680+
} finally {
681+
closeResult(rs);
682+
releaseConnection(connection);
683+
}
684+
685+
List<ParamsOptions> options = getParamsOptions(tableList);
686+
687+
result.put(Constants.DATA_LIST, options);
688+
putMsg(result, Status.SUCCESS);
689+
return result;
690+
}
691+
638692
private List<ParamsOptions> getParamsOptions(List<String> columnList) {
639693
List<ParamsOptions> options = null;
640694
if (CollectionUtils.isNotEmpty(columnList)) {

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
3939
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
4040
import org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
41+
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLConnectionParam;
4142
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
4243
import org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
4344
import org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
@@ -48,6 +49,7 @@
4849
import org.apache.commons.collections4.CollectionUtils;
4950

5051
import java.sql.Connection;
52+
import java.sql.SQLException;
5153
import java.util.ArrayList;
5254
import java.util.Collections;
5355
import java.util.HashMap;
@@ -517,4 +519,31 @@ public void testCheckConnection() throws Exception {
517519
}
518520
}
519521

522+
@Test
523+
public void testGetDatabases() throws SQLException {
524+
DataSource dataSource = getOracleDataSource();
525+
int datasourceId = 1;
526+
dataSource.setId(datasourceId);
527+
Map<String, Object> result;
528+
Mockito.when(dataSourceMapper.selectById(datasourceId)).thenReturn(null);
529+
result = dataSourceService.getDatabases(datasourceId);
530+
Assertions.assertEquals(Status.QUERY_DATASOURCE_ERROR, result.get(Constants.STATUS));
531+
532+
Mockito.when(dataSourceMapper.selectById(datasourceId)).thenReturn(dataSource);
533+
MySQLConnectionParam connectionParam = new MySQLConnectionParam();
534+
Connection connection = Mockito.mock(Connection.class);
535+
MockedStatic<DataSourceUtils> dataSourceUtils = Mockito.mockStatic(DataSourceUtils.class);
536+
dataSourceUtils.when(() -> DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
537+
dataSourceUtils.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
538+
.thenReturn(connectionParam);
539+
result = dataSourceService.getDatabases(datasourceId);
540+
Assertions.assertEquals(Status.GET_DATASOURCE_TABLES_ERROR, result.get(Constants.STATUS));
541+
542+
dataSourceUtils.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
543+
.thenReturn(null);
544+
result = dataSourceService.getDatabases(datasourceId);
545+
Assertions.assertEquals(Status.DATASOURCE_CONNECT_FAILED, result.get(Constants.STATUS));
546+
connection.close();
547+
dataSourceUtils.close();
548+
}
520549
}

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,4 +850,10 @@ private Constants() {
850850
public static final String REMOTE_LOGGING_GCS_CREDENTIAL = "remote.logging.google.cloud.storage.credential";
851851

852852
public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";
853+
854+
/**
855+
* data quality
856+
*/
857+
public static final String DATABASES_QUERY = "show databases";
858+
public static final String DATABASES_QUERY_PG = "SELECT datname FROM pg_database";
853859
}

dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,6 +1368,12 @@ VALUES(28, 'enum_list', 'input', '$t(enum_list)', NULL, NULL, 'Please enter enum
13681368
INSERT INTO `t_ds_dq_rule_input_entry`
13691369
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
13701370
VALUES(29, 'begin_time', 'input', '$t(begin_time)', NULL, NULL, 'Please enter begin time', 0, 0, 0, 1, 1, 0, 0, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
1371+
INSERT INTO `t_ds_dq_rule_input_entry`
1372+
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
1373+
VALUES(30, 'src_database', 'select', '$t(src_database)', NULL, NULL, 'Please select source database', 0, 0, 0, 1, 1, 1, 1, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
1374+
INSERT INTO `t_ds_dq_rule_input_entry`
1375+
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
1376+
VALUES(31, 'target_database', 'select', '$t(target_database)', NULL, NULL, 'Please select target database', 0, 0, 0, 1, 1, 1, 1, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
13711377

13721378
--
13731379
-- Table structure for table `t_ds_dq_task_statistics_value`
@@ -1851,9 +1857,45 @@ VALUES(148, 10, 17, NULL, 11, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.00
18511857
INSERT INTO `t_ds_relation_rule_input_entry`
18521858
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
18531859
VALUES(149, 10, 19, NULL, 12, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1854-
INSERT INTO t_ds_relation_rule_input_entry
1860+
INSERT INTO `t_ds_relation_rule_input_entry`
1861+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1862+
VALUES(150, 8, 29, NULL, 7, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1863+
INSERT INTO `t_ds_relation_rule_input_entry`
1864+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1865+
VALUES(151, 1, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1866+
INSERT INTO `t_ds_relation_rule_input_entry`
1867+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1868+
VALUES(152, 2, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1869+
INSERT INTO `t_ds_relation_rule_input_entry`
1870+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1871+
VALUES(153, 3, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1872+
INSERT INTO `t_ds_relation_rule_input_entry`
1873+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1874+
VALUES(154, 4, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1875+
INSERT INTO `t_ds_relation_rule_input_entry`
1876+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1877+
VALUES(155, 5, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1878+
INSERT INTO `t_ds_relation_rule_input_entry`
1879+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1880+
VALUES(156, 6, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1881+
INSERT INTO `t_ds_relation_rule_input_entry`
1882+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1883+
VALUES(157, 7, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1884+
INSERT INTO `t_ds_relation_rule_input_entry`
1885+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1886+
VALUES(158, 8, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1887+
INSERT INTO `t_ds_relation_rule_input_entry`
1888+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1889+
VALUES(159, 9, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1890+
INSERT INTO `t_ds_relation_rule_input_entry`
1891+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1892+
VALUES(160, 10, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1893+
INSERT INTO `t_ds_relation_rule_input_entry`
1894+
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1895+
VALUES(161, 3, 31, NULL, 6, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
1896+
INSERT INTO `t_ds_relation_rule_input_entry`
18551897
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
1856-
VALUES(150, 8, 29, NULL, 7, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
1898+
VALUES(162, 4, 31, NULL, 7, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
18571899

18581900
--
18591901
-- Table structure for table t_ds_environment

0 commit comments

Comments
 (0)