Skip to content

Commit f5020e7

Browse files
authored
Merge pull request #24145: Handle updates to table schema when using Storage API writes.
2 parents 428ec97 + 7ad44c8 commit f5020e7

18 files changed

Lines changed: 958 additions & 178 deletions

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java

Lines changed: 99 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,15 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.api.services.bigquery.model.TableRow;
21+
import com.google.auto.value.AutoValue;
22+
import com.google.auto.value.extension.memoized.Memoized;
2023
import com.google.cloud.bigquery.storage.v1.TableSchema;
24+
import com.google.protobuf.ByteString;
2125
import com.google.protobuf.Descriptors;
26+
import com.google.protobuf.DynamicMessage;
27+
import com.google.protobuf.InvalidProtocolBufferException;
28+
import com.google.protobuf.Message;
2229
import java.util.function.Consumer;
2330
import java.util.function.Supplier;
2431
import javax.annotation.Nullable;
@@ -28,35 +35,111 @@
2835
* StorageApiWritesShardedRecords} to encapsulate a destination {@link TableSchema} along with a
2936
* {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
3037
*/
31-
class AppendClientInfo {
32-
@Nullable BigQueryServices.StreamAppendClient streamAppendClient;
33-
@Nullable TableSchema tableSchema;
34-
Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
35-
Descriptors.Descriptor descriptor;
38+
@AutoValue
39+
abstract class AppendClientInfo {
40+
abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
3641

37-
public AppendClientInfo(
42+
abstract TableSchema getTableSchema();
43+
44+
abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();
45+
46+
abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();
47+
48+
abstract TableRowToStorageApiProto.SchemaInformation getSchemaInformation();
49+
50+
abstract Descriptors.Descriptor getDescriptor();
51+
52+
@AutoValue.Builder
53+
abstract static class Builder {
54+
abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value);
55+
56+
abstract Builder setTableSchema(TableSchema value);
57+
58+
abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);
59+
60+
abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);
61+
62+
abstract Builder setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);
63+
64+
abstract Builder setDescriptor(Descriptors.Descriptor value);
65+
66+
abstract AppendClientInfo build();
67+
};
68+
69+
abstract Builder toBuilder();
70+
71+
static AppendClientInfo of(
3872
TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
3973
throws Exception {
40-
this.tableSchema = tableSchema;
41-
this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
42-
this.closeAppendClient = closeAppendClient;
74+
return new AutoValue_AppendClientInfo.Builder()
75+
.setTableSchema(tableSchema)
76+
.setCloseAppendClient(closeAppendClient)
77+
.setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
78+
.setSchemaInformation(
79+
TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
80+
.setDescriptor(TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true))
81+
.build();
4382
}
4483

45-
public AppendClientInfo createAppendClient(
84+
public AppendClientInfo withNoAppendClient() {
85+
return toBuilder().setStreamAppendClient(null).build();
86+
}
87+
88+
public AppendClientInfo withAppendClient(
4689
BigQueryServices.DatasetService datasetService,
4790
Supplier<String> getStreamName,
4891
boolean useConnectionPool)
4992
throws Exception {
50-
if (streamAppendClient == null) {
51-
this.streamAppendClient =
52-
datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool);
93+
if (getStreamAppendClient() != null) {
94+
return this;
95+
} else {
96+
return toBuilder()
97+
.setStreamAppendClient(
98+
datasetService.getStreamAppendClient(
99+
getStreamName.get(), getDescriptor(), useConnectionPool))
100+
.build();
53101
}
54-
return this;
55102
}
56103

57104
public void close() {
58-
if (streamAppendClient != null) {
59-
closeAppendClient.accept(streamAppendClient);
105+
BigQueryServices.StreamAppendClient client = getStreamAppendClient();
106+
if (client != null) {
107+
getCloseAppendClient().accept(client);
108+
}
109+
}
110+
111+
boolean hasSchemaChanged(TableSchema updatedTableSchema) {
112+
return updatedTableSchema.hashCode() != getTableSchema().hashCode();
113+
}
114+
115+
public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
116+
throws TableRowToStorageApiProto.SchemaConversionException {
117+
Message msg =
118+
TableRowToStorageApiProto.messageFromTableRow(
119+
getSchemaInformation(),
120+
getDescriptorIgnoreRequired(),
121+
unknown,
122+
ignoreUnknownValues,
123+
true,
124+
null);
125+
return msg.toByteString();
126+
}
127+
128+
@Memoized
129+
Descriptors.Descriptor getDescriptorIgnoreRequired() {
130+
try {
131+
return TableRowToStorageApiProto.getDescriptorFromTableSchema(getTableSchema(), false);
132+
} catch (Exception e) {
133+
throw new RuntimeException(e);
134+
}
135+
}
136+
137+
public TableRow toTableRow(ByteString protoBytes) {
138+
try {
139+
return TableRowToStorageApiProto.tableRowFromMessage(
140+
DynamicMessage.parseFrom(getDescriptor(), protoBytes));
141+
} catch (InvalidProtocolBufferException e) {
142+
throw new RuntimeException(e);
60143
}
61144
}
62145
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2878,6 +2878,15 @@ public WriteResult expand(PCollection<T> input) {
28782878
"withAutoSchemaUpdate only supported when using storage-api writes.");
28792879
}
28802880

2881+
if (getAutoSchemaUpdate()) {
2882+
// TODO(reuvenlax): Remove this restriction once we implement support.
2883+
checkArgument(
2884+
getIgnoreUnknownValues(),
2885+
"Auto schema update currently only supported when ignoreUnknownValues also set.");
2886+
checkArgument(
2887+
!getUseBeamSchema(), "Auto schema update not supported when using Beam schemas.");
2888+
}
2889+
28812890
if (method != Write.Method.FILE_LOADS) {
28822891
// we only support writing avro for FILE_LOADS
28832892
checkArgument(
@@ -3172,11 +3181,12 @@ private <DestinationT> WriteResult continueExpandTyped(
31723181
dynamicDestinations,
31733182
tableRowWriterFactory.getToRowFn(),
31743183
getCreateDisposition(),
3175-
getIgnoreUnknownValues());
3184+
getIgnoreUnknownValues(),
3185+
getAutoSchemaUpdate());
31763186
}
31773187

31783188
StorageApiLoads<DestinationT, T> storageApiLoads =
3179-
new StorageApiLoads<DestinationT, T>(
3189+
new StorageApiLoads<>(
31803190
destinationCoder,
31813191
storageApiDynamicDestinations,
31823192
getCreateDisposition(),
@@ -3185,7 +3195,9 @@ private <DestinationT> WriteResult continueExpandTyped(
31853195
getBigQueryServices(),
31863196
getStorageApiNumStreams(bqOptions),
31873197
method == Method.STORAGE_API_AT_LEAST_ONCE,
3188-
getAutoSharding());
3198+
getAutoSharding(),
3199+
getAutoSchemaUpdate(),
3200+
getIgnoreUnknownValues());
31893201
return input.apply("StorageApiLoads", storageApiLoads);
31903202
} else {
31913203
throw new RuntimeException("Unexpected write method " + method);

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import com.google.cloud.bigquery.storage.v1.ReadSession;
4242
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
4343
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
44+
import com.google.cloud.bigquery.storage.v1.TableSchema;
4445
import com.google.cloud.bigquery.storage.v1.WriteStream;
4546
import com.google.protobuf.Descriptors.Descriptor;
4647
import java.io.IOException;
@@ -204,6 +205,9 @@ Table patchTableDescription(TableReference tableReference, @Nullable String tabl
204205
WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
205206
throws IOException, InterruptedException;
206207

208+
@Nullable
209+
WriteStream getWriteStream(String writeStream);
210+
207211
/**
208212
* Create an append client for a given Storage API write stream. The stream must be created
209213
* first.
@@ -230,6 +234,10 @@ interface StreamAppendClient extends AutoCloseable {
230234
/** Append rows to a Storage API write stream at the given offset. */
231235
ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows) throws Exception;
232236

237+
/** If the table schema has been updated, returns the new schema. Otherwise returns null. */
238+
@Nullable
239+
TableSchema getUpdatedSchema();
240+
233241
/**
234242
* If the previous call to appendRows blocked due to flow control, returns how long the call
235243
* blocked for.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
8383
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
8484
import com.google.cloud.bigquery.storage.v1.StreamWriter;
85+
import com.google.cloud.bigquery.storage.v1.TableSchema;
8586
import com.google.cloud.bigquery.storage.v1.WriteStream;
8687
import com.google.cloud.hadoop.util.ApiErrorExtractor;
8788
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -1309,6 +1310,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
13091310
.build());
13101311
}
13111312

1313+
@Override
1314+
public @Nullable WriteStream getWriteStream(String writeStream) {
1315+
return newWriteClient.getWriteStream(writeStream);
1316+
}
1317+
13121318
@Override
13131319
public StreamAppendClient getStreamAppendClient(
13141320
String streamName, Descriptor descriptor, boolean useConnectionPool) throws Exception {
@@ -1378,6 +1384,11 @@ public ApiFuture<AppendRowsResponse> appendRows(long offset, ProtoRows rows)
13781384
return streamWriter.append(rows, offset);
13791385
}
13801386

1387+
@Override
1388+
public TableSchema getUpdatedSchema() {
1389+
return streamWriter.getUpdatedSchema();
1390+
}
1391+
13811392
@Override
13821393
public long getInflightWaitSeconds() {
13831394
return streamWriter.getInflightWaitSeconds();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,9 @@
2828
import com.google.api.services.bigquery.model.TableRow;
2929
import com.google.api.services.bigquery.model.TableSchema;
3030
import com.google.auto.value.AutoValue;
31-
import com.google.protobuf.Descriptors.Descriptor;
32-
import com.google.protobuf.Descriptors.FieldDescriptor;
33-
import com.google.protobuf.Descriptors.FieldDescriptor.Type;
3431
import java.io.Serializable;
3532
import java.math.BigDecimal;
3633
import java.nio.ByteBuffer;
37-
import java.nio.charset.StandardCharsets;
3834
import java.time.LocalDate;
3935
import java.time.LocalDateTime;
4036
import java.time.LocalTime;
@@ -74,7 +70,6 @@
7470
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
7571
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
7672
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
77-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
7873
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
7974
import org.checkerframework.checker.nullness.qual.Nullable;
8075
import org.joda.time.DateTime;
@@ -1036,28 +1031,4 @@ private static Object convertAvroNumeric(Object value) {
10361031
public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
10371032
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
10381033
}
1039-
1040-
/**
1041-
* Hashes a schema descriptor using a deterministic hash function.
1042-
*
1043-
* <p>Warning! These hashes are encoded into messages, so changing this function will cause
1044-
* pipelines to get stuck on update!
1045-
*/
1046-
public static long hashSchemaDescriptorDeterministic(Descriptor descriptor) {
1047-
long hashCode = 0;
1048-
for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
1049-
hashCode +=
1050-
Hashing.murmur3_32()
1051-
.hashString(fieldDescriptor.getName(), StandardCharsets.UTF_8)
1052-
.asInt();
1053-
hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRepeated() ? 1 : 0).asInt();
1054-
hashCode += Hashing.murmur3_32().hashInt(fieldDescriptor.isRequired() ? 1 : 0).asInt();
1055-
Type type = fieldDescriptor.getType();
1056-
hashCode += Hashing.murmur3_32().hashInt(type.ordinal()).asInt();
1057-
if (type.equals(Type.MESSAGE)) {
1058-
hashCode += hashSchemaDescriptorDeterministic(fieldDescriptor.getMessageType());
1059-
}
1060-
}
1061-
return hashCode;
1062-
}
10631034
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/SplittingIterable.java

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,50 @@
1717
*/
1818
package org.apache.beam.sdk.io.gcp.bigquery;
1919

20+
import com.google.api.services.bigquery.model.TableRow;
2021
import com.google.cloud.bigquery.storage.v1.ProtoRows;
2122
import com.google.protobuf.ByteString;
2223
import java.util.Iterator;
2324
import java.util.NoSuchElementException;
25+
import java.util.function.BiConsumer;
26+
import java.util.function.Function;
27+
import javax.annotation.Nullable;
2428

2529
/**
2630
* Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
2731
* parameter controls how many rows are batched into a single ProtoRows object before we move on to
2832
* the next one.
2933
*/
3034
class SplittingIterable implements Iterable<ProtoRows> {
35+
interface ConvertUnknownFields {
36+
ByteString convert(TableRow tableRow, boolean ignoreUnknownValues)
37+
throws TableRowToStorageApiProto.SchemaConversionException;
38+
}
39+
3140
private final Iterable<StorageApiWritePayload> underlying;
3241
private final long splitSize;
3342

34-
public SplittingIterable(Iterable<StorageApiWritePayload> underlying, long splitSize) {
43+
private final ConvertUnknownFields unknownFieldsToMessage;
44+
private final Function<ByteString, TableRow> protoToTableRow;
45+
private final BiConsumer<TableRow, String> failedRowsConsumer;
46+
private final boolean autoUpdateSchema;
47+
private final boolean ignoreUnknownValues;
48+
49+
public SplittingIterable(
50+
Iterable<StorageApiWritePayload> underlying,
51+
long splitSize,
52+
ConvertUnknownFields unknownFieldsToMessage,
53+
Function<ByteString, TableRow> protoToTableRow,
54+
BiConsumer<TableRow, String> failedRowsConsumer,
55+
boolean autoUpdateSchema,
56+
boolean ignoreUnknownValues) {
3557
this.underlying = underlying;
3658
this.splitSize = splitSize;
59+
this.unknownFieldsToMessage = unknownFieldsToMessage;
60+
this.protoToTableRow = protoToTableRow;
61+
this.failedRowsConsumer = failedRowsConsumer;
62+
this.autoUpdateSchema = autoUpdateSchema;
63+
this.ignoreUnknownValues = ignoreUnknownValues;
3764
}
3865

3966
@Override
@@ -57,7 +84,37 @@ public ProtoRows next() {
5784
while (underlyingIterator.hasNext()) {
5885
StorageApiWritePayload payload = underlyingIterator.next();
5986
ByteString byteString = ByteString.copyFrom(payload.getPayload());
60-
87+
if (autoUpdateSchema) {
88+
try {
89+
@Nullable TableRow unknownFields = payload.getUnknownFields();
90+
if (unknownFields != null) {
91+
// Protocol buffer serialization format supports concatenation. We serialize any new
92+
// "known" fields
93+
// into a proto and concatenate to the existing proto.
94+
try {
95+
byteString =
96+
byteString.concat(
97+
unknownFieldsToMessage.convert(unknownFields, ignoreUnknownValues));
98+
} catch (TableRowToStorageApiProto.SchemaConversionException e) {
99+
// This generally implies that ignoreUnknownValues=false and there were still
100+
// unknown values here.
101+
// Reconstitute the TableRow and send it to the failed-rows consumer.
102+
TableRow tableRow = protoToTableRow.apply(byteString);
103+
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we
104+
// only execute this
105+
// codepath when ignoreUnknownValues==true, so we should never hit this codepath.
106+
// However once
107+
// 24926 is fixed, we need to merge the unknownFields back into the main row
108+
// before outputting to the
109+
// failed-rows consumer.
110+
failedRowsConsumer.accept(tableRow, e.toString());
111+
continue;
112+
}
113+
}
114+
} catch (Exception e) {
115+
throw new RuntimeException(e);
116+
}
117+
}
61118
inserts.addSerializedRows(byteString);
62119
bytesSize += byteString.size();
63120
if (bytesSize > splitSize) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public interface MessageConverter<T> {
3535

3636
StorageApiWritePayload toMessage(T element) throws Exception;
3737

38+
StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) throws Exception;
39+
3840
TableRow toTableRow(T element);
3941
}
4042

0 commit comments

Comments
 (0)