3131import com .google .api .services .bigquery .model .TableReference ;
3232import com .google .api .services .bigquery .model .TableRow ;
3333import com .google .api .services .bigquery .model .TableSchema ;
34+ import com .google .api .services .bigquery .model .TimePartitioning ;
3435import com .google .auto .value .AutoValue ;
3536import com .google .common .annotations .VisibleForTesting ;
3637import com .google .common .base .Predicates ;
6061import org .apache .beam .sdk .io .gcp .bigquery .BigQueryHelpers .TableRefToJson ;
6162import org .apache .beam .sdk .io .gcp .bigquery .BigQueryHelpers .TableSchemaToJsonSchema ;
6263import org .apache .beam .sdk .io .gcp .bigquery .BigQueryHelpers .TableSpecToTableRef ;
64+ import org .apache .beam .sdk .io .gcp .bigquery .BigQueryHelpers .TimePartitioningToJson ;
6365import org .apache .beam .sdk .io .gcp .bigquery .BigQueryServices .DatasetService ;
6466import org .apache .beam .sdk .io .gcp .bigquery .BigQueryServices .JobService ;
6567import org .apache .beam .sdk .io .gcp .bigquery .DynamicDestinationsHelpers .ConstantSchemaDestinations ;
68+ import org .apache .beam .sdk .io .gcp .bigquery .DynamicDestinationsHelpers .ConstantTimePartitioningDestinations ;
6669import org .apache .beam .sdk .io .gcp .bigquery .DynamicDestinationsHelpers .SchemaFromViewDestinations ;
6770import org .apache .beam .sdk .io .gcp .bigquery .DynamicDestinationsHelpers .TableFunctionDestinations ;
6871import org .apache .beam .sdk .options .PipelineOptions ;
@@ -824,6 +827,7 @@ public enum Method {
824827 @ Nullable abstract DynamicDestinations <T , ?> getDynamicDestinations ();
825828 @ Nullable abstract PCollectionView <Map <String , String >> getSchemaFromView ();
826829 @ Nullable abstract ValueProvider <String > getJsonSchema ();
830+ @ Nullable abstract ValueProvider <String > getJsonTimePartitioning ();
827831 abstract CreateDisposition getCreateDisposition ();
828832 abstract WriteDisposition getWriteDisposition ();
829833 /** Table description. Default is empty. */
@@ -854,6 +858,7 @@ abstract Builder<T> setTableFunction(
854858 abstract Builder <T > setDynamicDestinations (DynamicDestinations <T , ?> dynamicDestinations );
855859 abstract Builder <T > setSchemaFromView (PCollectionView <Map <String , String >> view );
856860 abstract Builder <T > setJsonSchema (ValueProvider <String > jsonSchema );
861+ abstract Builder <T > setJsonTimePartitioning (ValueProvider <String > jsonTimePartitioning );
857862 abstract Builder <T > setCreateDisposition (CreateDisposition createDisposition );
858863 abstract Builder <T > setWriteDisposition (WriteDisposition writeDisposition );
859864 abstract Builder <T > setTableDescription (String tableDescription );
@@ -1022,6 +1027,33 @@ public Write<T> withSchemaFromView(PCollectionView<Map<String, String>> view) {
10221027 return toBuilder ().setSchemaFromView (view ).build ();
10231028 }
10241029
1030+ /**
1031+ * Allows newly created tables to include a {@link TimePartitioning} class. Can only be used
1032+ * when writing to a single table. If {@link #to(SerializableFunction)} or
1033+ * {@link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be
1034+ * directly in the returned {@link TableDestination}.
1035+ */
1036+ public Write <T > withTimePartitioning (TimePartitioning partitioning ) {
1037+ return withJsonTimePartitioning (
1038+ StaticValueProvider .of (BigQueryHelpers .toJsonString (partitioning )));
1039+ }
1040+
1041+ /**
1042+ * Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred
1043+ * {@link ValueProvider}.
1044+ */
1045+ public Write <T > withTimePartitioning (ValueProvider <TimePartitioning > partition ) {
1046+ return withJsonTimePartitioning (NestedValueProvider .of (
1047+ partition , new TimePartitioningToJson ()));
1048+ }
1049+
1050+ /**
1051+ * The same as {@link #withTimePartitioning}, but takes a JSON-serialized object.
1052+ */
1053+ public Write <T > withJsonTimePartitioning (ValueProvider <String > partition ) {
1054+ return toBuilder ().setJsonTimePartitioning (partition ).build ();
1055+ }
1056+
10251057 /** Specifies whether the table should be created if it does not exist. */
10261058 public Write <T > withCreateDisposition (CreateDisposition createDisposition ) {
10271059 return toBuilder ().setCreateDisposition (createDisposition ).build ();
@@ -1183,6 +1215,15 @@ public WriteResult expand(PCollection<T> input) {
11831215 input .isBounded (),
11841216 method );
11851217 }
1218+ if (getJsonTimePartitioning () != null ) {
1219+ checkArgument (getDynamicDestinations () == null ,
1220+ "The supplied DynamicDestinations object can directly set TimePartitioning."
1221+ + " There is no need to call BigQueryIO.Write.withTimePartitioning." );
1222+ checkArgument (getTableFunction () == null ,
1223+ "The supplied getTableFunction object can directly set TimePartitioning."
1224+ + " There is no need to call BigQueryIO.Write.withTimePartitioning." );
1225+ }
1226+
11861227 DynamicDestinations <T , ?> dynamicDestinations = getDynamicDestinations ();
11871228 if (dynamicDestinations == null ) {
11881229 if (getJsonTableRef () != null ) {
@@ -1205,6 +1246,12 @@ public WriteResult expand(PCollection<T> input) {
12051246 (DynamicDestinations <T , TableDestination >) dynamicDestinations ,
12061247 getSchemaFromView ());
12071248 }
1249+
1250+ // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
1251+ if (getJsonTimePartitioning () != null ) {
1252+ dynamicDestinations = new ConstantTimePartitioningDestinations (
1253+ dynamicDestinations , getJsonTimePartitioning ());
1254+ }
12081255 }
12091256 return expandTyped (input , dynamicDestinations );
12101257 }
0 commit comments