1919
2020import static org .apache .beam .sdk .io .gcp .bigquery .BigQueryHelpers .resolveTempLocation ;
2121import static org .apache .beam .sdk .io .gcp .bigquery .BigQueryResourceNaming .createTempTableReference ;
22+ import static org .apache .beam .sdk .util .Preconditions .checkStateNotNull ;
2223import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkArgument ;
2324import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
2425
4950import com .google .protobuf .DynamicMessage ;
5051import com .google .protobuf .Message ;
5152import java .io .IOException ;
52- import java .io .Serializable ;
5353import java .lang .reflect .InvocationTargetException ;
5454import java .util .Collections ;
5555import java .util .List ;
132132import org .apache .beam .sdk .values .TypeDescriptors ;
133133import org .apache .beam .sdk .values .ValueInSingleWindow ;
134134import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .annotations .VisibleForTesting ;
135- import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Function ;
136135import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .MoreObjects ;
137136import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions ;
138137import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Predicates ;
139138import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Strings ;
140- import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Supplier ;
141- import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Suppliers ;
142139import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableList ;
143140import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Iterables ;
144141import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Lists ;
@@ -649,29 +646,19 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
649646 BigQueryUtils .tableRowFromBeamRow ());
650647 }
651648
652- private static class TableSchemaFunction
653- implements Serializable , Function <@ Nullable String , @ Nullable TableSchema > {
654- @ Override
655- public @ Nullable TableSchema apply (@ Nullable String input ) {
656- return BigQueryHelpers .fromJsonString (input , TableSchema .class );
657- }
658- }
659-
660649 @ VisibleForTesting
661650 static class GenericDatumTransformer <T > implements DatumReader <T > {
662651 private final SerializableFunction <SchemaAndRecord , T > parseFn ;
663- private final Supplier < TableSchema > tableSchema ;
652+ private final TableSchema tableSchema ;
664653 private GenericDatumReader <T > reader ;
665654 private org .apache .avro .Schema writerSchema ;
666655
667656 public GenericDatumTransformer (
668657 SerializableFunction <SchemaAndRecord , T > parseFn ,
669- String tableSchema ,
658+ TableSchema tableSchema ,
670659 org .apache .avro .Schema writer ) {
671660 this .parseFn = parseFn ;
672- this .tableSchema =
673- Suppliers .memoize (
674- Suppliers .compose (new TableSchemaFunction (), Suppliers .ofInstance (tableSchema )));
661+ this .tableSchema = tableSchema ;
675662 this .writerSchema = writer ;
676663 this .reader = new GenericDatumReader <>(this .writerSchema );
677664 }
@@ -689,7 +676,7 @@ public void setSchema(org.apache.avro.Schema schema) {
689676 @ Override
690677 public T read (T reuse , Decoder in ) throws IOException {
691678 GenericRecord record = (GenericRecord ) this .reader .read (reuse , in );
692- return parseFn .apply (new SchemaAndRecord (record , this .tableSchema . get () ));
679+ return parseFn .apply (new SchemaAndRecord (record , this .tableSchema ));
693680 }
694681 }
695682
@@ -721,16 +708,9 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
721708 .setDatumReaderFactory (
722709 (SerializableFunction <TableSchema , AvroSource .DatumReaderFactory <T >>)
723710 input -> {
724- try {
725- String jsonTableSchema = BigQueryIO .JSON_FACTORY .toString (input );
726- return (AvroSource .DatumReaderFactory <T >)
727- (writer , reader ) ->
728- new GenericDatumTransformer <>(parseFn , jsonTableSchema , writer );
729- } catch (IOException e ) {
730- LOG .warn (
731- String .format ("Error while converting table schema %s to JSON!" , input ), e );
732- return null ;
733- }
711+ TableSchema safeInput = checkStateNotNull (input );
712+ return (AvroSource .DatumReaderFactory <T >)
713+ (writer , reader ) -> new GenericDatumTransformer <>(parseFn , safeInput , writer );
734714 })
735715 // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
736716 .setParseFn (parseFn )
@@ -3386,9 +3366,7 @@ private <DestinationT> WriteResult expandTyped(
33863366 @ SuppressWarnings ({"unchecked" , "nullness" })
33873367 Descriptors .Descriptor descriptor =
33883368 (Descriptors .Descriptor )
3389- org .apache .beam .sdk .util .Preconditions .checkStateNotNull (
3390- writeProtoClass .getMethod ("getDescriptor" ))
3391- .invoke (null );
3369+ checkStateNotNull (writeProtoClass .getMethod ("getDescriptor" )).invoke (null );
33923370 TableSchema tableSchema =
33933371 TableRowToStorageApiProto .protoSchemaToTableSchema (
33943372 TableRowToStorageApiProto .tableSchemaFromDescriptor (descriptor ));
0 commit comments