Skip to content

Commit 6280d49

Browse files
committed
This closes #3663: [BEAM-2390] Add support for TimePartitioning in BigQueryIO
2 parents 1c26b74 + b0e03a3 commit 6280d49

14 files changed

Lines changed: 278 additions & 18 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
<apex.kryo.version>2.24.0</apex.kryo.version>
108108
<api-common.version>1.0.0-rc2</api-common.version>
109109
<avro.version>1.8.2</avro.version>
110-
<bigquery.version>v2-rev295-1.22.0</bigquery.version>
110+
<bigquery.version>v2-rev355-1.22.0</bigquery.version>
111111
<bigtable.version>0.9.7.1</bigtable.version>
112112
<cloudresourcemanager.version>v1-rev6-1.22.0</cloudresourcemanager.version>
113113
<pubsubgrpc.version>0.1.0</pubsubgrpc.version>

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.beam.sdk.coders.VarIntCoder;
4949
import org.apache.beam.sdk.coders.VoidCoder;
5050
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
51+
import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2;
5152
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
5253

5354
/**
@@ -97,6 +98,7 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
9798
RandomAccessDataCoder.class,
9899
StringUtf8Coder.class,
99100
TableDestinationCoder.class,
101+
TableDestinationCoderV2.class,
100102
TableRowJsonCoder.class,
101103
TextualIntegerCoder.class,
102104
VarIntCoder.class,

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, TableRow>> inpu
266266
.apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null))
267267
.setCoder(
268268
KvCoder.of(
269-
VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
269+
VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
270270
.apply(GroupByKey.<Void, KV<TableDestination, String>>create())
271271
.apply(Values.<Iterable<KV<TableDestination, String>>>create())
272272
.apply(
@@ -323,7 +323,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> inp
323323

324324
tempTables
325325
.apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>())
326-
.setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
326+
.setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
327327
.apply(
328328
"WriteRenameUntriggered",
329329
ParDo.of(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.api.services.bigquery.model.JobStatus;
2525
import com.google.api.services.bigquery.model.TableReference;
2626
import com.google.api.services.bigquery.model.TableSchema;
27+
import com.google.api.services.bigquery.model.TimePartitioning;
2728
import com.google.cloud.hadoop.util.ApiErrorExtractor;
2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.hash.Hashing;
@@ -291,6 +292,13 @@ public TableReference apply(String from) {
291292
}
292293
}
293294

295+
static class TimePartitioningToJson implements SerializableFunction<TimePartitioning, String> {
296+
@Override
297+
public String apply(TimePartitioning partitioning) {
298+
return toJsonString(partitioning);
299+
}
300+
}
301+
294302
static String createJobIdToken(String jobName, String stepUuid) {
295303
return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", ""));
296304
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.api.services.bigquery.model.TableReference;
3232
import com.google.api.services.bigquery.model.TableRow;
3333
import com.google.api.services.bigquery.model.TableSchema;
34+
import com.google.api.services.bigquery.model.TimePartitioning;
3435
import com.google.auto.value.AutoValue;
3536
import com.google.common.annotations.VisibleForTesting;
3637
import com.google.common.base.Predicates;
@@ -60,9 +61,11 @@
6061
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
6162
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
6263
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
64+
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson;
6365
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
6466
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
6567
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
68+
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;
6669
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
6770
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
6871
import org.apache.beam.sdk.options.PipelineOptions;
@@ -824,6 +827,7 @@ public enum Method {
824827
@Nullable abstract DynamicDestinations<T, ?> getDynamicDestinations();
825828
@Nullable abstract PCollectionView<Map<String, String>> getSchemaFromView();
826829
@Nullable abstract ValueProvider<String> getJsonSchema();
830+
@Nullable abstract ValueProvider<String> getJsonTimePartitioning();
827831
abstract CreateDisposition getCreateDisposition();
828832
abstract WriteDisposition getWriteDisposition();
829833
/** Table description. Default is empty. */
@@ -854,6 +858,7 @@ abstract Builder<T> setTableFunction(
854858
abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);
855859
abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);
856860
abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
861+
abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning);
857862
abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
858863
abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
859864
abstract Builder<T> setTableDescription(String tableDescription);
@@ -1022,6 +1027,33 @@ public Write<T> withSchemaFromView(PCollectionView<Map<String, String>> view) {
10221027
return toBuilder().setSchemaFromView(view).build();
10231028
}
10241029

1030+
/**
1031+
* Allows newly created tables to include a {@link TimePartitioning} class. Can only be used
1032+
* when writing to a single table. If {@link #to(SerializableFunction)} or
1033+
* {@link #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be
1034+
* directly in the returned {@link TableDestination}.
1035+
*/
1036+
public Write<T> withTimePartitioning(TimePartitioning partitioning) {
1037+
return withJsonTimePartitioning(
1038+
StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning)));
1039+
}
1040+
1041+
/**
1042+
* Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred
1043+
* {@link ValueProvider}.
1044+
*/
1045+
public Write<T> withTimePartitioning(ValueProvider<TimePartitioning> partition) {
1046+
return withJsonTimePartitioning(NestedValueProvider.of(
1047+
partition, new TimePartitioningToJson()));
1048+
}
1049+
1050+
/**
1051+
* The same as {@link #withTimePartitioning}, but takes a JSON-serialized object.
1052+
*/
1053+
public Write<T> withJsonTimePartitioning(ValueProvider<String> partition) {
1054+
return toBuilder().setJsonTimePartitioning(partition).build();
1055+
}
1056+
10251057
/** Specifies whether the table should be created if it does not exist. */
10261058
public Write<T> withCreateDisposition(CreateDisposition createDisposition) {
10271059
return toBuilder().setCreateDisposition(createDisposition).build();
@@ -1183,6 +1215,15 @@ public WriteResult expand(PCollection<T> input) {
11831215
input.isBounded(),
11841216
method);
11851217
}
1218+
if (getJsonTimePartitioning() != null) {
1219+
checkArgument(getDynamicDestinations() == null,
1220+
"The supplied DynamicDestinations object can directly set TimePartitioning."
1221+
+ " There is no need to call BigQueryIO.Write.withTimePartitioning.");
1222+
checkArgument(getTableFunction() == null,
1223+
"The supplied getTableFunction object can directly set TimePartitioning."
1224+
+ " There is no need to call BigQueryIO.Write.withTimePartitioning.");
1225+
}
1226+
11861227
DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
11871228
if (dynamicDestinations == null) {
11881229
if (getJsonTableRef() != null) {
@@ -1205,6 +1246,12 @@ public WriteResult expand(PCollection<T> input) {
12051246
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
12061247
getSchemaFromView());
12071248
}
1249+
1250+
// Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
1251+
if (getJsonTimePartitioning() != null) {
1252+
dynamicDestinations = new ConstantTimePartitioningDestinations(
1253+
dynamicDestinations, getJsonTimePartitioning());
1254+
}
12081255
}
12091256
return expandTyped(input, dynamicDestinations);
12101257
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ private CreateTables(
7373
}
7474

7575
CreateTables<DestinationT> withTestServices(BigQueryServices bqServices) {
76-
return new CreateTables<DestinationT>(createDisposition, bqServices, dynamicDestinations);
76+
return new CreateTables<>(createDisposition, bqServices, dynamicDestinations);
7777
}
7878

7979
@Override
@@ -124,11 +124,14 @@ private void possibleCreateTable(
124124
DatasetService datasetService = bqServices.getDatasetService(options);
125125
if (!createdTables.contains(tableSpec)) {
126126
if (datasetService.getTable(tableReference) == null) {
127-
datasetService.createTable(
128-
new Table()
129-
.setTableReference(tableReference)
130-
.setSchema(tableSchema)
131-
.setDescription(tableDescription));
127+
Table table = new Table()
128+
.setTableReference(tableReference)
129+
.setSchema(tableSchema)
130+
.setDescription(tableDescription);
131+
if (tableDestination.getTimePartitioning() != null) {
132+
table.setTimePartitioning(tableDestination.getTimePartitioning());
133+
}
134+
datasetService.createTable(table);
132135
}
133136
createdTables.add(tableSpec);
134137
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public TableDestination getTable(TableDestination destination) {
108108

109109
@Override
110110
public Coder<TableDestination> getDestinationCoder() {
111-
return TableDestinationCoder.of();
111+
return TableDestinationCoderV2.of();
112112
}
113113
}
114114

@@ -164,6 +164,31 @@ public TableSchema getSchema(TableDestination destination) {
164164
}
165165
}
166166

167+
static class ConstantTimePartitioningDestinations<T>
168+
extends DelegatingDynamicDestinations<T, TableDestination> {
169+
170+
@Nullable
171+
private final ValueProvider<String> jsonTimePartitioning;
172+
173+
ConstantTimePartitioningDestinations(DynamicDestinations<T, TableDestination> inner,
174+
ValueProvider<String> jsonTimePartitioning) {
175+
super(inner);
176+
this.jsonTimePartitioning = jsonTimePartitioning;
177+
}
178+
179+
@Override
180+
public TableDestination getDestination(ValueInSingleWindow<T> element) {
181+
TableDestination destination = super.getDestination(element);
182+
return new TableDestination(destination.getTableSpec(), destination.getTableDescription(),
183+
jsonTimePartitioning.get());
184+
}
185+
186+
@Override
187+
public Coder<TableDestination> getDestinationCoder() {
188+
return TableDestinationCoderV2.of();
189+
}
190+
}
191+
167192
/**
168193
* Takes in a side input mapping tablespec to json table schema, and always returns the
169194
* matching schema from the side input.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.beam.sdk.io.gcp.bigquery;
2020

2121
import com.google.api.services.bigquery.model.TableReference;
22+
import com.google.api.services.bigquery.model.TimePartitioning;
2223
import java.io.Serializable;
2324
import java.util.Objects;
2425
import javax.annotation.Nullable;
@@ -31,18 +32,38 @@ public class TableDestination implements Serializable {
3132
private final String tableSpec;
3233
@Nullable
3334
private final String tableDescription;
35+
@Nullable
36+
private final String jsonTimePartitioning;
3437

3538

3639
public TableDestination(String tableSpec, @Nullable String tableDescription) {
37-
this.tableSpec = tableSpec;
38-
this.tableDescription = tableDescription;
40+
this(tableSpec, tableDescription, (String) null);
3941
}
4042

4143
public TableDestination(TableReference tableReference, @Nullable String tableDescription) {
42-
this.tableSpec = BigQueryHelpers.toTableSpec(tableReference);
44+
this(tableReference, tableDescription, null);
45+
}
46+
47+
/**
 * Creates a destination from a {@link TableReference}, which is converted to a table spec with
 * {@link BigQueryHelpers#toTableSpec}.
 *
 * @param tableReference the table to write to
 * @param tableDescription the table description, or {@code null}
 * @param timePartitioning the time partitioning, stored as its JSON string, or {@code null} for
 *     none
 */
public TableDestination(TableReference tableReference, @Nullable String tableDescription,
    @Nullable TimePartitioning timePartitioning) {
  this(BigQueryHelpers.toTableSpec(tableReference), tableDescription,
      timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null);
}
52+
53+
/**
 * Creates a destination from a table spec string and an optional {@link TimePartitioning}.
 *
 * <p>This overload shares its first two parameter types with the JSON-string overload, so a
 * literal {@code null} third argument must be cast to select one of them.
 *
 * @param tableSpec the table spec of the table to write to
 * @param tableDescription the table description, or {@code null}
 * @param timePartitioning the time partitioning, stored as its JSON string, or {@code null} for
 *     none
 */
public TableDestination(String tableSpec, @Nullable String tableDescription,
    @Nullable TimePartitioning timePartitioning) {
  this(tableSpec, tableDescription,
      timePartitioning != null ? BigQueryHelpers.toJsonString(timePartitioning) : null);
}
58+
59+
/**
 * Creates a destination from its three stored values; the other constructors delegate here.
 *
 * @param tableSpec the table spec of the table to write to
 * @param tableDescription the table description, or {@code null}
 * @param jsonTimePartitioning the time partitioning as a JSON string, or {@code null} for none
 */
public TableDestination(String tableSpec, @Nullable String tableDescription,
    @Nullable String jsonTimePartitioning) {
  this.jsonTimePartitioning = jsonTimePartitioning;
  this.tableDescription = tableDescription;
  this.tableSpec = tableSpec;
}
4565

66+
4667
public String getTableSpec() {
4768
return tableSpec;
4869
}
@@ -51,6 +72,18 @@ public TableReference getTableReference() {
5172
return BigQueryHelpers.parseTableSpec(tableSpec);
5273
}
5374

75+
public String getJsonTimePartitioning() {
76+
return jsonTimePartitioning;
77+
}
78+
79+
public TimePartitioning getTimePartitioning() {
80+
if (jsonTimePartitioning == null) {
81+
return null;
82+
} else {
83+
return BigQueryHelpers.fromJsonString(jsonTimePartitioning, TimePartitioning.class);
84+
}
85+
}
86+
5487
@Nullable
5588
public String getTableDescription() {
5689
return tableDescription;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
3333
private static final Coder<String> tableSpecCoder = StringUtf8Coder.of();
3434
private static final Coder<String> tableDescriptionCoder = NullableCoder.of(StringUtf8Coder.of());
3535

36+
/** Private so that callers obtain the shared instance through {@link #of()}. */
private TableDestinationCoder() {}
37+
3638
public static TableDestinationCoder of() {
3739
return INSTANCE;
3840
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.beam.sdk.io.gcp.bigquery;
20+
21+
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.OutputStream;
25+
import org.apache.beam.sdk.coders.AtomicCoder;
26+
import org.apache.beam.sdk.coders.Coder;
27+
import org.apache.beam.sdk.coders.NullableCoder;
28+
import org.apache.beam.sdk.coders.StringUtf8Coder;
29+
30+
/**
31+
* A {@link Coder} for {@link TableDestination} that includes time partitioning information. This
32+
* is a new coder (instead of extending the old {@link TableDestinationCoder}) for compatibility
33+
* reasons. The old coder is kept around for the same compatibility reasons.
34+
*/
35+
public class TableDestinationCoderV2 extends AtomicCoder<TableDestination> {
36+
private static final TableDestinationCoderV2 INSTANCE = new TableDestinationCoderV2();
37+
private static final Coder<String> timePartitioningCoder = NullableCoder.of(StringUtf8Coder.of());
38+
39+
public static TableDestinationCoderV2 of() {
40+
return INSTANCE;
41+
}
42+
43+
@Override
44+
public void encode(TableDestination value, OutputStream outStream) throws IOException {
45+
TableDestinationCoder.of().encode(value, outStream);
46+
timePartitioningCoder.encode(value.getJsonTimePartitioning(), outStream);
47+
}
48+
49+
@Override
50+
public TableDestination decode(InputStream inStream) throws IOException {
51+
TableDestination destination = TableDestinationCoder.of().decode(inStream);
52+
String jsonTimePartitioning = timePartitioningCoder.decode(inStream);
53+
return new TableDestination(
54+
destination.getTableSpec(), destination.getTableDescription(), jsonTimePartitioning);
55+
}
56+
57+
@Override
58+
public void verifyDeterministic() throws NonDeterministicException {}
59+
}

0 commit comments

Comments
 (0)