Skip to content

Commit c4e0b4a

Browse files
Merge pull request #14856: [BEAM-11873] Add support for writes with returning values in JdbcIO
2 parents 1700099 + 4369e3a commit c4e0b4a

6 files changed

Lines changed: 477 additions & 32 deletions

File tree

sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/DatabaseTestHelper.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717
*/
1818
package org.apache.beam.sdk.io.common;
1919

20+
import static org.junit.Assert.assertEquals;
21+
2022
import java.sql.Connection;
2123
import java.sql.ResultSet;
2224
import java.sql.SQLException;
2325
import java.sql.Statement;
2426
import java.text.SimpleDateFormat;
27+
import java.util.ArrayList;
2528
import java.util.Date;
2629
import java.util.Optional;
2730
import javax.sql.DataSource;
31+
import org.apache.beam.sdk.values.KV;
2832
import org.postgresql.ds.PGSimpleDataSource;
2933

3034
/** This class contains helper methods to ease database usage in tests. */
@@ -104,4 +108,26 @@ public static void createTableWithStatement(DataSource dataSource, String stmt)
104108
}
105109
}
106110
}
111+
112+
public static ArrayList<KV<Integer, String>> getTestDataToWrite(long rowsToAdd) {
113+
ArrayList<KV<Integer, String>> data = new ArrayList<>();
114+
for (int i = 0; i < rowsToAdd; i++) {
115+
KV<Integer, String> kv = KV.of(i, "Test");
116+
data.add(kv);
117+
}
118+
return data;
119+
}
120+
121+
public static void assertRowCount(DataSource dataSource, String tableName, int expectedRowCount)
122+
throws SQLException {
123+
try (Connection connection = dataSource.getConnection()) {
124+
try (Statement statement = connection.createStatement()) {
125+
try (ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
126+
resultSet.next();
127+
int count = resultSet.getInt(1);
128+
assertEquals(expectedRowCount, count);
129+
}
130+
}
131+
}
132+
}
107133
}

sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

Lines changed: 265 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ public static <T> ReadWithPartitions<T> readWithPartitions() {
342342
* @param <T> Type of the data to be written.
343343
*/
344344
public static <T> Write<T> write() {
345-
return new Write();
345+
return new Write<>();
346346
}
347347

348348
public static <T> WriteVoid<T> writeVoid() {
@@ -1283,43 +1283,43 @@ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
12831283

12841284
/** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
12851285
public Write<T> withDataSourceConfiguration(DataSourceConfiguration config) {
1286-
return new Write(inner.withDataSourceConfiguration(config));
1286+
return new Write<>(inner.withDataSourceConfiguration(config));
12871287
}
12881288

12891289
/** See {@link WriteVoid#withDataSourceProviderFn(SerializableFunction)}. */
12901290
public Write<T> withDataSourceProviderFn(
12911291
SerializableFunction<Void, DataSource> dataSourceProviderFn) {
1292-
return new Write(inner.withDataSourceProviderFn(dataSourceProviderFn));
1292+
return new Write<>(inner.withDataSourceProviderFn(dataSourceProviderFn));
12931293
}
12941294

12951295
/** See {@link WriteVoid#withStatement(String)}. */
12961296
public Write<T> withStatement(String statement) {
1297-
return new Write(inner.withStatement(statement));
1297+
return new Write<>(inner.withStatement(statement));
12981298
}
12991299

13001300
/** See {@link WriteVoid#withPreparedStatementSetter(PreparedStatementSetter)}. */
13011301
public Write<T> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
1302-
return new Write(inner.withPreparedStatementSetter(setter));
1302+
return new Write<>(inner.withPreparedStatementSetter(setter));
13031303
}
13041304

13051305
/** See {@link WriteVoid#withBatchSize(long)}. */
13061306
public Write<T> withBatchSize(long batchSize) {
1307-
return new Write(inner.withBatchSize(batchSize));
1307+
return new Write<>(inner.withBatchSize(batchSize));
13081308
}
13091309

13101310
/** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
13111311
public Write<T> withRetryStrategy(RetryStrategy retryStrategy) {
1312-
return new Write(inner.withRetryStrategy(retryStrategy));
1312+
return new Write<>(inner.withRetryStrategy(retryStrategy));
13131313
}
13141314

13151315
/** See {@link WriteVoid#withRetryConfiguration(RetryConfiguration)}. */
13161316
public Write<T> withRetryConfiguration(RetryConfiguration retryConfiguration) {
1317-
return new Write(inner.withRetryConfiguration(retryConfiguration));
1317+
return new Write<>(inner.withRetryConfiguration(retryConfiguration));
13181318
}
13191319

13201320
/** See {@link WriteVoid#withTable(String)}. */
13211321
public Write<T> withTable(String table) {
1322-
return new Write(inner.withTable(table));
1322+
return new Write<>(inner.withTable(table));
13231323
}
13241324

13251325
/**
@@ -1341,6 +1341,24 @@ public WriteVoid<T> withResults() {
13411341
return inner;
13421342
}
13431343

1344+
/**
1345+
* Returns {@link WriteWithResults} transform that could return a specific result.
1346+
*
1347+
* <p>See {@link WriteWithResults}
1348+
*/
1349+
public <V extends JdbcWriteResult> WriteWithResults<T, V> withWriteResults(
1350+
RowMapper<V> rowMapper) {
1351+
return new AutoValue_JdbcIO_WriteWithResults.Builder<T, V>()
1352+
.setRowMapper(rowMapper)
1353+
.setRetryStrategy(inner.getRetryStrategy())
1354+
.setRetryConfiguration(inner.getRetryConfiguration())
1355+
.setDataSourceProviderFn(inner.getDataSourceProviderFn())
1356+
.setPreparedStatementSetter(inner.getPreparedStatementSetter())
1357+
.setStatement(inner.getStatement())
1358+
.setTable(inner.getTable())
1359+
.build();
1360+
}
1361+
13441362
@Override
13451363
public void populateDisplayData(DisplayData.Builder builder) {
13461364
inner.populateDisplayData(builder);
@@ -1364,7 +1382,244 @@ void set(
13641382
throws SQLException;
13651383
}
13661384

1367-
/** A {@link PTransform} to write to a JDBC datasource. */
1385+
/**
1386+
* A {@link PTransform} to write to a JDBC datasource. Executes statements one by one.
1387+
*
1388+
* <p>The INSERT, UPDATE, and DELETE commands sometimes have an optional RETURNING clause that
1389+
* supports obtaining data from modified rows while they are being manipulated. Output {@link
1390+
* PCollection} of this transform is a collection of such returning results mapped by {@link
1391+
* RowMapper}.
1392+
*/
1393+
@AutoValue
1394+
public abstract static class WriteWithResults<T, V extends JdbcWriteResult>
1395+
extends PTransform<PCollection<T>, PCollection<V>> {
1396+
abstract @Nullable SerializableFunction<Void, DataSource> getDataSourceProviderFn();
1397+
1398+
abstract @Nullable ValueProvider<String> getStatement();
1399+
1400+
abstract @Nullable PreparedStatementSetter<T> getPreparedStatementSetter();
1401+
1402+
abstract @Nullable RetryStrategy getRetryStrategy();
1403+
1404+
abstract @Nullable RetryConfiguration getRetryConfiguration();
1405+
1406+
abstract @Nullable String getTable();
1407+
1408+
abstract @Nullable RowMapper<V> getRowMapper();
1409+
1410+
abstract Builder<T, V> toBuilder();
1411+
1412+
@AutoValue.Builder
1413+
abstract static class Builder<T, V extends JdbcWriteResult> {
1414+
abstract Builder<T, V> setDataSourceProviderFn(
1415+
SerializableFunction<Void, DataSource> dataSourceProviderFn);
1416+
1417+
abstract Builder<T, V> setStatement(ValueProvider<String> statement);
1418+
1419+
abstract Builder<T, V> setPreparedStatementSetter(PreparedStatementSetter<T> setter);
1420+
1421+
abstract Builder<T, V> setRetryStrategy(RetryStrategy deadlockPredicate);
1422+
1423+
abstract Builder<T, V> setRetryConfiguration(RetryConfiguration retryConfiguration);
1424+
1425+
abstract Builder<T, V> setTable(String table);
1426+
1427+
abstract Builder<T, V> setRowMapper(RowMapper<V> rowMapper);
1428+
1429+
abstract WriteWithResults<T, V> build();
1430+
}
1431+
1432+
public WriteWithResults<T, V> withDataSourceConfiguration(DataSourceConfiguration config) {
1433+
return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
1434+
}
1435+
1436+
public WriteWithResults<T, V> withDataSourceProviderFn(
1437+
SerializableFunction<Void, DataSource> dataSourceProviderFn) {
1438+
return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
1439+
}
1440+
1441+
public WriteWithResults<T, V> withStatement(String statement) {
1442+
return withStatement(ValueProvider.StaticValueProvider.of(statement));
1443+
}
1444+
1445+
public WriteWithResults<T, V> withStatement(ValueProvider<String> statement) {
1446+
return toBuilder().setStatement(statement).build();
1447+
}
1448+
1449+
public WriteWithResults<T, V> withPreparedStatementSetter(PreparedStatementSetter<T> setter) {
1450+
return toBuilder().setPreparedStatementSetter(setter).build();
1451+
}
1452+
1453+
/**
1454+
* When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it
1455+
* will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
1456+
* then {@link Write} retries the statements.
1457+
*/
1458+
public WriteWithResults<T, V> withRetryStrategy(RetryStrategy retryStrategy) {
1459+
checkArgument(retryStrategy != null, "retryStrategy can not be null");
1460+
return toBuilder().setRetryStrategy(retryStrategy).build();
1461+
}
1462+
1463+
/**
1464+
* When a SQL exception occurs, {@link Write} uses this {@link RetryConfiguration} to
1465+
* exponentially back off and retry the statements based on the {@link RetryConfiguration}
1466+
* mentioned.
1467+
*
1468+
* <p>Usage of RetryConfiguration -
1469+
*
1470+
* <pre>{@code
1471+
* pipeline.apply(JdbcIO.<T>write())
1472+
* .withReturningResults(...)
1473+
* .withDataSourceConfiguration(...)
1474+
* .withRetryStrategy(...)
1475+
* .withRetryConfiguration(JdbcIO.RetryConfiguration.
1476+
* create(5, Duration.standardSeconds(5), Duration.standardSeconds(1))
1477+
*
1478+
* }</pre>
1479+
*
1480+
* maxDuration and initialDuration are Nullable
1481+
*
1482+
* <pre>{@code
1483+
* pipeline.apply(JdbcIO.<T>write())
1484+
* .withReturningResults(...)
1485+
* .withDataSourceConfiguration(...)
1486+
* .withRetryStrategy(...)
1487+
* .withRetryConfiguration(JdbcIO.RetryConfiguration.
1488+
* create(5, null, null)
1489+
*
1490+
* }</pre>
1491+
*/
1492+
public WriteWithResults<T, V> withRetryConfiguration(RetryConfiguration retryConfiguration) {
1493+
checkArgument(retryConfiguration != null, "retryConfiguration can not be null");
1494+
return toBuilder().setRetryConfiguration(retryConfiguration).build();
1495+
}
1496+
1497+
public WriteWithResults<T, V> withTable(String table) {
1498+
checkArgument(table != null, "table name can not be null");
1499+
return toBuilder().setTable(table).build();
1500+
}
1501+
1502+
public WriteWithResults<T, V> withRowMapper(RowMapper<V> rowMapper) {
1503+
checkArgument(rowMapper != null, "result set getter can not be null");
1504+
return toBuilder().setRowMapper(rowMapper).build();
1505+
}
1506+
1507+
@Override
1508+
public PCollection<V> expand(PCollection<T> input) {
1509+
checkArgument(getStatement() != null, "withStatement() is required");
1510+
checkArgument(
1511+
getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
1512+
checkArgument(
1513+
(getDataSourceProviderFn() != null),
1514+
"withDataSourceConfiguration() or withDataSourceProviderFn() is required");
1515+
1516+
return input.apply(ParDo.of(new WriteWithResultsFn<>(this)));
1517+
}
1518+
1519+
private static class WriteWithResultsFn<T, V extends JdbcWriteResult> extends DoFn<T, V> {
1520+
1521+
private final WriteWithResults<T, V> spec;
1522+
private DataSource dataSource;
1523+
private Connection connection;
1524+
private PreparedStatement preparedStatement;
1525+
private static FluentBackoff retryBackOff;
1526+
1527+
public WriteWithResultsFn(WriteWithResults<T, V> spec) {
1528+
this.spec = spec;
1529+
}
1530+
1531+
@Setup
1532+
public void setup() {
1533+
dataSource = spec.getDataSourceProviderFn().apply(null);
1534+
RetryConfiguration retryConfiguration = spec.getRetryConfiguration();
1535+
1536+
retryBackOff =
1537+
FluentBackoff.DEFAULT
1538+
.withInitialBackoff(retryConfiguration.getInitialDuration())
1539+
.withMaxCumulativeBackoff(retryConfiguration.getMaxDuration())
1540+
.withMaxRetries(retryConfiguration.getMaxAttempts());
1541+
}
1542+
1543+
@ProcessElement
1544+
public void processElement(ProcessContext context) throws Exception {
1545+
T record = context.element();
1546+
1547+
// Only acquire the connection if there is something to write.
1548+
if (connection == null) {
1549+
connection = dataSource.getConnection();
1550+
connection.setAutoCommit(false);
1551+
preparedStatement = connection.prepareStatement(spec.getStatement().get());
1552+
}
1553+
Sleeper sleeper = Sleeper.DEFAULT;
1554+
BackOff backoff = retryBackOff.backoff();
1555+
while (true) {
1556+
try (PreparedStatement preparedStatement =
1557+
connection.prepareStatement(spec.getStatement().get())) {
1558+
try {
1559+
1560+
try {
1561+
spec.getPreparedStatementSetter().setParameters(record, preparedStatement);
1562+
} catch (Exception e) {
1563+
throw new RuntimeException(e);
1564+
}
1565+
1566+
// execute the statement
1567+
preparedStatement.execute();
1568+
// commit the changes
1569+
connection.commit();
1570+
context.output(spec.getRowMapper().mapRow(preparedStatement.getResultSet()));
1571+
return;
1572+
} catch (SQLException exception) {
1573+
if (!spec.getRetryStrategy().apply(exception)) {
1574+
throw exception;
1575+
}
1576+
LOG.warn("Deadlock detected, retrying", exception);
1577+
connection.rollback();
1578+
if (!BackOffUtils.next(sleeper, backoff)) {
1579+
// we tried the max number of times
1580+
throw exception;
1581+
}
1582+
}
1583+
}
1584+
}
1585+
}
1586+
1587+
@FinishBundle
1588+
public void finishBundle() throws Exception {
1589+
cleanUpStatementAndConnection();
1590+
}
1591+
1592+
@Override
1593+
protected void finalize() throws Throwable {
1594+
cleanUpStatementAndConnection();
1595+
}
1596+
1597+
private void cleanUpStatementAndConnection() throws Exception {
1598+
try {
1599+
if (preparedStatement != null) {
1600+
try {
1601+
preparedStatement.close();
1602+
} finally {
1603+
preparedStatement = null;
1604+
}
1605+
}
1606+
} finally {
1607+
if (connection != null) {
1608+
try {
1609+
connection.close();
1610+
} finally {
1611+
connection = null;
1612+
}
1613+
}
1614+
}
1615+
}
1616+
}
1617+
}
1618+
1619+
/**
1620+
* A {@link PTransform} to write to a JDBC datasource. Executes statements in a batch, and returns
1621+
* a trivial result.
1622+
*/
13681623
@AutoValue
13691624
public abstract static class WriteVoid<T> extends PTransform<PCollection<T>, PCollection<Void>> {
13701625

0 commit comments

Comments
 (0)