@@ -342,7 +342,7 @@ public static <T> ReadWithPartitions<T> readWithPartitions() {
342342 * @param <T> Type of the data to be written.
343343 */
344344 public static <T > Write <T > write () {
345- return new Write ();
345+ return new Write <> ();
346346 }
347347
348348 public static <T > WriteVoid <T > writeVoid () {
@@ -1283,43 +1283,43 @@ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
12831283
12841284 /** See {@link WriteVoid#withDataSourceConfiguration(DataSourceConfiguration)}. */
12851285 public Write <T > withDataSourceConfiguration (DataSourceConfiguration config ) {
1286- return new Write (inner .withDataSourceConfiguration (config ));
1286+ return new Write <> (inner .withDataSourceConfiguration (config ));
12871287 }
12881288
12891289 /** See {@link WriteVoid#withDataSourceProviderFn(SerializableFunction)}. */
12901290 public Write <T > withDataSourceProviderFn (
12911291 SerializableFunction <Void , DataSource > dataSourceProviderFn ) {
1292- return new Write (inner .withDataSourceProviderFn (dataSourceProviderFn ));
1292+ return new Write <> (inner .withDataSourceProviderFn (dataSourceProviderFn ));
12931293 }
12941294
12951295 /** See {@link WriteVoid#withStatement(String)}. */
12961296 public Write <T > withStatement (String statement ) {
1297- return new Write (inner .withStatement (statement ));
1297+ return new Write <> (inner .withStatement (statement ));
12981298 }
12991299
13001300 /** See {@link WriteVoid#withPreparedStatementSetter(PreparedStatementSetter)}. */
13011301 public Write <T > withPreparedStatementSetter (PreparedStatementSetter <T > setter ) {
1302- return new Write (inner .withPreparedStatementSetter (setter ));
1302+ return new Write <> (inner .withPreparedStatementSetter (setter ));
13031303 }
13041304
13051305 /** See {@link WriteVoid#withBatchSize(long)}. */
13061306 public Write <T > withBatchSize (long batchSize ) {
1307- return new Write (inner .withBatchSize (batchSize ));
1307+ return new Write <> (inner .withBatchSize (batchSize ));
13081308 }
13091309
13101310 /** See {@link WriteVoid#withRetryStrategy(RetryStrategy)}. */
13111311 public Write <T > withRetryStrategy (RetryStrategy retryStrategy ) {
1312- return new Write (inner .withRetryStrategy (retryStrategy ));
1312+ return new Write <> (inner .withRetryStrategy (retryStrategy ));
13131313 }
13141314
13151315 /** See {@link WriteVoid#withRetryConfiguration(RetryConfiguration)}. */
13161316 public Write <T > withRetryConfiguration (RetryConfiguration retryConfiguration ) {
1317- return new Write (inner .withRetryConfiguration (retryConfiguration ));
1317+ return new Write <> (inner .withRetryConfiguration (retryConfiguration ));
13181318 }
13191319
13201320 /** See {@link WriteVoid#withTable(String)}. */
13211321 public Write <T > withTable (String table ) {
1322- return new Write (inner .withTable (table ));
1322+ return new Write <> (inner .withTable (table ));
13231323 }
13241324
13251325 /**
@@ -1341,6 +1341,24 @@ public WriteVoid<T> withResults() {
13411341 return inner ;
13421342 }
13431343
1344+ /**
1345+ * Returns {@link WriteWithResults} transform that could return a specific result.
1346+ *
1347+ * <p>See {@link WriteWithResults}
1348+ */
1349+ public <V extends JdbcWriteResult > WriteWithResults <T , V > withWriteResults (
1350+ RowMapper <V > rowMapper ) {
1351+ return new AutoValue_JdbcIO_WriteWithResults .Builder <T , V >()
1352+ .setRowMapper (rowMapper )
1353+ .setRetryStrategy (inner .getRetryStrategy ())
1354+ .setRetryConfiguration (inner .getRetryConfiguration ())
1355+ .setDataSourceProviderFn (inner .getDataSourceProviderFn ())
1356+ .setPreparedStatementSetter (inner .getPreparedStatementSetter ())
1357+ .setStatement (inner .getStatement ())
1358+ .setTable (inner .getTable ())
1359+ .build ();
1360+ }
1361+
13441362 @ Override
13451363 public void populateDisplayData (DisplayData .Builder builder ) {
13461364 inner .populateDisplayData (builder );
@@ -1364,7 +1382,244 @@ void set(
13641382 throws SQLException ;
13651383 }
13661384
1367- /** A {@link PTransform} to write to a JDBC datasource. */
1385+ /**
1386+ * A {@link PTransform} to write to a JDBC datasource. Executes statements one by one.
1387+ *
1388+ * <p>The INSERT, UPDATE, and DELETE commands sometimes have an optional RETURNING clause that
1389+ * supports obtaining data from modified rows while they are being manipulated. Output {@link
1390+ * PCollection} of this transform is a collection of such returning results mapped by {@link
1391+ * RowMapper}.
1392+ */
1393+ @ AutoValue
1394+ public abstract static class WriteWithResults <T , V extends JdbcWriteResult >
1395+ extends PTransform <PCollection <T >, PCollection <V >> {
1396+ abstract @ Nullable SerializableFunction <Void , DataSource > getDataSourceProviderFn ();
1397+
1398+ abstract @ Nullable ValueProvider <String > getStatement ();
1399+
1400+ abstract @ Nullable PreparedStatementSetter <T > getPreparedStatementSetter ();
1401+
1402+ abstract @ Nullable RetryStrategy getRetryStrategy ();
1403+
1404+ abstract @ Nullable RetryConfiguration getRetryConfiguration ();
1405+
1406+ abstract @ Nullable String getTable ();
1407+
1408+ abstract @ Nullable RowMapper <V > getRowMapper ();
1409+
1410+ abstract Builder <T , V > toBuilder ();
1411+
1412+ @ AutoValue .Builder
1413+ abstract static class Builder <T , V extends JdbcWriteResult > {
1414+ abstract Builder <T , V > setDataSourceProviderFn (
1415+ SerializableFunction <Void , DataSource > dataSourceProviderFn );
1416+
1417+ abstract Builder <T , V > setStatement (ValueProvider <String > statement );
1418+
1419+ abstract Builder <T , V > setPreparedStatementSetter (PreparedStatementSetter <T > setter );
1420+
1421+ abstract Builder <T , V > setRetryStrategy (RetryStrategy deadlockPredicate );
1422+
1423+ abstract Builder <T , V > setRetryConfiguration (RetryConfiguration retryConfiguration );
1424+
1425+ abstract Builder <T , V > setTable (String table );
1426+
1427+ abstract Builder <T , V > setRowMapper (RowMapper <V > rowMapper );
1428+
1429+ abstract WriteWithResults <T , V > build ();
1430+ }
1431+
1432+ public WriteWithResults <T , V > withDataSourceConfiguration (DataSourceConfiguration config ) {
1433+ return withDataSourceProviderFn (new DataSourceProviderFromDataSourceConfiguration (config ));
1434+ }
1435+
1436+ public WriteWithResults <T , V > withDataSourceProviderFn (
1437+ SerializableFunction <Void , DataSource > dataSourceProviderFn ) {
1438+ return toBuilder ().setDataSourceProviderFn (dataSourceProviderFn ).build ();
1439+ }
1440+
1441+ public WriteWithResults <T , V > withStatement (String statement ) {
1442+ return withStatement (ValueProvider .StaticValueProvider .of (statement ));
1443+ }
1444+
1445+ public WriteWithResults <T , V > withStatement (ValueProvider <String > statement ) {
1446+ return toBuilder ().setStatement (statement ).build ();
1447+ }
1448+
1449+ public WriteWithResults <T , V > withPreparedStatementSetter (PreparedStatementSetter <T > setter ) {
1450+ return toBuilder ().setPreparedStatementSetter (setter ).build ();
1451+ }
1452+
1453+ /**
1454+ * When a SQL exception occurs, {@link Write} uses this {@link RetryStrategy} to determine if it
1455+ * will retry the statements. If {@link RetryStrategy#apply(SQLException)} returns {@code true},
1456+ * then {@link Write} retries the statements.
1457+ */
1458+ public WriteWithResults <T , V > withRetryStrategy (RetryStrategy retryStrategy ) {
1459+ checkArgument (retryStrategy != null , "retryStrategy can not be null" );
1460+ return toBuilder ().setRetryStrategy (retryStrategy ).build ();
1461+ }
1462+
1463+ /**
1464+ * When a SQL exception occurs, {@link Write} uses this {@link RetryConfiguration} to
1465+ * exponentially back off and retry the statements based on the {@link RetryConfiguration}
1466+ * mentioned.
1467+ *
1468+ * <p>Usage of RetryConfiguration -
1469+ *
1470+ * <pre>{@code
1471+ * pipeline.apply(JdbcIO.<T>write())
1472+ * .withReturningResults(...)
1473+ * .withDataSourceConfiguration(...)
1474+ * .withRetryStrategy(...)
1475+ * .withRetryConfiguration(JdbcIO.RetryConfiguration.
1476+ * create(5, Duration.standardSeconds(5), Duration.standardSeconds(1))
1477+ *
1478+ * }</pre>
1479+ *
1480+ * maxDuration and initialDuration are Nullable
1481+ *
1482+ * <pre>{@code
1483+ * pipeline.apply(JdbcIO.<T>write())
1484+ * .withReturningResults(...)
1485+ * .withDataSourceConfiguration(...)
1486+ * .withRetryStrategy(...)
1487+ * .withRetryConfiguration(JdbcIO.RetryConfiguration.
1488+ * create(5, null, null)
1489+ *
1490+ * }</pre>
1491+ */
1492+ public WriteWithResults <T , V > withRetryConfiguration (RetryConfiguration retryConfiguration ) {
1493+ checkArgument (retryConfiguration != null , "retryConfiguration can not be null" );
1494+ return toBuilder ().setRetryConfiguration (retryConfiguration ).build ();
1495+ }
1496+
1497+ public WriteWithResults <T , V > withTable (String table ) {
1498+ checkArgument (table != null , "table name can not be null" );
1499+ return toBuilder ().setTable (table ).build ();
1500+ }
1501+
1502+ public WriteWithResults <T , V > withRowMapper (RowMapper <V > rowMapper ) {
1503+ checkArgument (rowMapper != null , "result set getter can not be null" );
1504+ return toBuilder ().setRowMapper (rowMapper ).build ();
1505+ }
1506+
1507+ @ Override
1508+ public PCollection <V > expand (PCollection <T > input ) {
1509+ checkArgument (getStatement () != null , "withStatement() is required" );
1510+ checkArgument (
1511+ getPreparedStatementSetter () != null , "withPreparedStatementSetter() is required" );
1512+ checkArgument (
1513+ (getDataSourceProviderFn () != null ),
1514+ "withDataSourceConfiguration() or withDataSourceProviderFn() is required" );
1515+
1516+ return input .apply (ParDo .of (new WriteWithResultsFn <>(this )));
1517+ }
1518+
1519+ private static class WriteWithResultsFn <T , V extends JdbcWriteResult > extends DoFn <T , V > {
1520+
1521+ private final WriteWithResults <T , V > spec ;
1522+ private DataSource dataSource ;
1523+ private Connection connection ;
1524+ private PreparedStatement preparedStatement ;
1525+ private static FluentBackoff retryBackOff ;
1526+
1527+ public WriteWithResultsFn (WriteWithResults <T , V > spec ) {
1528+ this .spec = spec ;
1529+ }
1530+
1531+ @ Setup
1532+ public void setup () {
1533+ dataSource = spec .getDataSourceProviderFn ().apply (null );
1534+ RetryConfiguration retryConfiguration = spec .getRetryConfiguration ();
1535+
1536+ retryBackOff =
1537+ FluentBackoff .DEFAULT
1538+ .withInitialBackoff (retryConfiguration .getInitialDuration ())
1539+ .withMaxCumulativeBackoff (retryConfiguration .getMaxDuration ())
1540+ .withMaxRetries (retryConfiguration .getMaxAttempts ());
1541+ }
1542+
1543+ @ ProcessElement
1544+ public void processElement (ProcessContext context ) throws Exception {
1545+ T record = context .element ();
1546+
1547+ // Only acquire the connection if there is something to write.
1548+ if (connection == null ) {
1549+ connection = dataSource .getConnection ();
1550+ connection .setAutoCommit (false );
1551+ preparedStatement = connection .prepareStatement (spec .getStatement ().get ());
1552+ }
1553+ Sleeper sleeper = Sleeper .DEFAULT ;
1554+ BackOff backoff = retryBackOff .backoff ();
1555+ while (true ) {
1556+ try (PreparedStatement preparedStatement =
1557+ connection .prepareStatement (spec .getStatement ().get ())) {
1558+ try {
1559+
1560+ try {
1561+ spec .getPreparedStatementSetter ().setParameters (record , preparedStatement );
1562+ } catch (Exception e ) {
1563+ throw new RuntimeException (e );
1564+ }
1565+
1566+ // execute the statement
1567+ preparedStatement .execute ();
1568+ // commit the changes
1569+ connection .commit ();
1570+ context .output (spec .getRowMapper ().mapRow (preparedStatement .getResultSet ()));
1571+ return ;
1572+ } catch (SQLException exception ) {
1573+ if (!spec .getRetryStrategy ().apply (exception )) {
1574+ throw exception ;
1575+ }
1576+ LOG .warn ("Deadlock detected, retrying" , exception );
1577+ connection .rollback ();
1578+ if (!BackOffUtils .next (sleeper , backoff )) {
1579+ // we tried the max number of times
1580+ throw exception ;
1581+ }
1582+ }
1583+ }
1584+ }
1585+ }
1586+
1587+ @ FinishBundle
1588+ public void finishBundle () throws Exception {
1589+ cleanUpStatementAndConnection ();
1590+ }
1591+
1592+ @ Override
1593+ protected void finalize () throws Throwable {
1594+ cleanUpStatementAndConnection ();
1595+ }
1596+
1597+ private void cleanUpStatementAndConnection () throws Exception {
1598+ try {
1599+ if (preparedStatement != null ) {
1600+ try {
1601+ preparedStatement .close ();
1602+ } finally {
1603+ preparedStatement = null ;
1604+ }
1605+ }
1606+ } finally {
1607+ if (connection != null ) {
1608+ try {
1609+ connection .close ();
1610+ } finally {
1611+ connection = null ;
1612+ }
1613+ }
1614+ }
1615+ }
1616+ }
1617+ }
1618+
1619+ /**
1620+ * A {@link PTransform} to write to a JDBC datasource. Executes statements in a batch, and returns
1621+ * a trivial result.
1622+ */
13681623 @ AutoValue
13691624 public abstract static class WriteVoid <T > extends PTransform <PCollection <T >, PCollection <Void >> {
13701625
0 commit comments