Skip to content

Commit 7e83059

Browse files
authored
Remove TableSchema to JSON conversion. (#28274)
* Rethrow error converting TableSchema to JSON
* Remove need to parse TableSchema to/from JSON
* Remove GenericDatumTransformer's JSON string param
* Remove unused TableSchemaFunction
1 parent de10fbd commit 7e83059

File tree

2 files changed

+15
-43
lines changed

2 files changed

+15
-43
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
2121
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
22+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2223
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2324
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2425

@@ -49,7 +50,6 @@
4950
import com.google.protobuf.DynamicMessage;
5051
import com.google.protobuf.Message;
5152
import java.io.IOException;
52-
import java.io.Serializable;
5353
import java.lang.reflect.InvocationTargetException;
5454
import java.util.Collections;
5555
import java.util.List;
@@ -132,13 +132,10 @@
132132
import org.apache.beam.sdk.values.TypeDescriptors;
133133
import org.apache.beam.sdk.values.ValueInSingleWindow;
134134
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
135-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
136135
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
137136
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
138137
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
139138
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
140-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
141-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
142139
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
143140
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
144141
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -649,29 +646,19 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
649646
BigQueryUtils.tableRowFromBeamRow());
650647
}
651648

652-
private static class TableSchemaFunction
653-
implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
654-
@Override
655-
public @Nullable TableSchema apply(@Nullable String input) {
656-
return BigQueryHelpers.fromJsonString(input, TableSchema.class);
657-
}
658-
}
659-
660649
@VisibleForTesting
661650
static class GenericDatumTransformer<T> implements DatumReader<T> {
662651
private final SerializableFunction<SchemaAndRecord, T> parseFn;
663-
private final Supplier<TableSchema> tableSchema;
652+
private final TableSchema tableSchema;
664653
private GenericDatumReader<T> reader;
665654
private org.apache.avro.Schema writerSchema;
666655

667656
public GenericDatumTransformer(
668657
SerializableFunction<SchemaAndRecord, T> parseFn,
669-
String tableSchema,
658+
TableSchema tableSchema,
670659
org.apache.avro.Schema writer) {
671660
this.parseFn = parseFn;
672-
this.tableSchema =
673-
Suppliers.memoize(
674-
Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
661+
this.tableSchema = tableSchema;
675662
this.writerSchema = writer;
676663
this.reader = new GenericDatumReader<>(this.writerSchema);
677664
}
@@ -689,7 +676,7 @@ public void setSchema(org.apache.avro.Schema schema) {
689676
@Override
690677
public T read(T reuse, Decoder in) throws IOException {
691678
GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
692-
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
679+
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
693680
}
694681
}
695682

@@ -721,16 +708,9 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
721708
.setDatumReaderFactory(
722709
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
723710
input -> {
724-
try {
725-
String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
726-
return (AvroSource.DatumReaderFactory<T>)
727-
(writer, reader) ->
728-
new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer);
729-
} catch (IOException e) {
730-
LOG.warn(
731-
String.format("Error while converting table schema %s to JSON!", input), e);
732-
return null;
733-
}
711+
TableSchema safeInput = checkStateNotNull(input);
712+
return (AvroSource.DatumReaderFactory<T>)
713+
(writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer);
734714
})
735715
// TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
736716
.setParseFn(parseFn)
@@ -3386,9 +3366,7 @@ private <DestinationT> WriteResult expandTyped(
33863366
@SuppressWarnings({"unchecked", "nullness"})
33873367
Descriptors.Descriptor descriptor =
33883368
(Descriptors.Descriptor)
3389-
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
3390-
writeProtoClass.getMethod("getDescriptor"))
3391-
.invoke(null);
3369+
checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
33923370
TableSchema tableSchema =
33933371
TableRowToStorageApiProto.protoSchemaToTableSchema(
33943372
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
2121
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
22+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2223
import static org.hamcrest.MatcherAssert.assertThat;
2324
import static org.hamcrest.Matchers.containsInAnyOrder;
2425
import static org.junit.Assert.assertEquals;
@@ -143,18 +144,11 @@ public void evaluate() throws Throwable {
143144

144145
private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
145146
datumReaderFactoryFn =
146-
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
147-
input -> {
148-
try {
149-
String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
150-
return (AvroSource.DatumReaderFactory<TableRow>)
151-
(writer, reader) ->
152-
new BigQueryIO.GenericDatumTransformer<>(
153-
BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer);
154-
} catch (IOException e) {
155-
return null;
156-
}
157-
};
147+
input ->
148+
(AvroSource.DatumReaderFactory<TableRow>)
149+
(writer, reader) ->
150+
new BigQueryIO.GenericDatumTransformer<>(
151+
BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer);
158152

159153
private static class MyData implements Serializable {
160154
private String name;

0 commit comments

Comments (0)