2323import com .google .protobuf .Descriptors .Descriptor ;
2424import com .google .protobuf .Descriptors .FieldDescriptor ;
2525import com .google .protobuf .DynamicMessage ;
26+ import com .google .protobuf .Int64Value ;
2627import java .math .BigDecimal ;
2728import java .nio .ByteBuffer ;
2829import java .util .Map ;
@@ -55,6 +56,11 @@ public class AvroGenericRecordToStorageApiProto {
5556
5657 private static final org .joda .time .LocalDate EPOCH_DATE = new org .joda .time .LocalDate (1970 , 1 , 1 );
5758
59+ private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos" ;
60+ private static final long PICOSECOND_PRECISION = 12L ;
61+ private static final long NANOS_PER_SECOND = 1_000_000_000L ;
62+ private static final long PICOS_PER_NANO = 1000L ;
63+
5864 static final Map <Schema .Type , TableFieldSchema .Type > PRIMITIVE_TYPES =
5965 ImmutableMap .<Schema .Type , TableFieldSchema .Type >builder ()
6066 .put (Schema .Type .INT , TableFieldSchema .Type .INT64 )
@@ -314,6 +320,7 @@ public static DynamicMessage messageFromGenericRecord(
314320 @ SuppressWarnings ("nullness" )
315321 private static TableFieldSchema fieldDescriptorFromAvroField (org .apache .avro .Schema .Field field ) {
316322 @ Nullable Schema schema = field .schema ();
323+
317324 Preconditions .checkNotNull (schema , "Unexpected null schema!" );
318325 if (StorageApiCDC .COLUMNS .contains (field .name ())) {
319326 throw new RuntimeException ("Reserved field name " + field .name () + " in user schema." );
@@ -380,34 +387,45 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch
380387 .setType (unionFieldSchema .getType ())
381388 .setMode (unionFieldSchema .getMode ())
382389 .addAllFields (unionFieldSchema .getFieldsList ());
390+
391+ if (unionFieldSchema .hasTimestampPrecision ()) {
392+ builder .setTimestampPrecision (unionFieldSchema .getTimestampPrecision ());
393+ }
383394 break ;
384395 default :
385396 elementType = TypeWithNullability .create (schema ).getType ();
386- Optional <LogicalType > logicalType =
387- Optional .ofNullable (LogicalTypes .fromSchema (elementType ));
388- @ Nullable
389- TableFieldSchema .Type primitiveType =
390- logicalType
391- .flatMap (AvroGenericRecordToStorageApiProto ::logicalTypes )
392- .orElse (PRIMITIVE_TYPES .get (elementType .getType ()));
393- if (primitiveType == null ) {
394- throw new RuntimeException ("Unsupported type " + elementType .getType ());
395- }
396- // a scalar will be required by default, if defined as part of union then
397- // caller will set nullability requirements
398- builder = builder .setType (primitiveType );
399- // parametrized types
400- if (logicalType .isPresent () && logicalType .get ().getName ().equals ("decimal" )) {
401- LogicalTypes .Decimal decimal = (LogicalTypes .Decimal ) logicalType .get ();
402- int precision = decimal .getPrecision ();
403- int scale = decimal .getScale ();
404- if (!(precision == 38 && scale == 9 ) // NUMERIC
405- && !(precision == 77 && scale == 38 ) // BIGNUMERIC
406- ) {
407- // parametrized type
408- builder = builder .setPrecision (precision );
409- if (scale != 0 ) {
410- builder = builder .setScale (scale );
397+ if (TIMESTAMP_NANOS_LOGICAL_TYPE .equals (elementType .getProp ("logicalType" ))) {
398+ builder = builder .setType (TableFieldSchema .Type .TIMESTAMP );
399+ builder .setTimestampPrecision (
400+ Int64Value .newBuilder ().setValue (PICOSECOND_PRECISION ).build ());
401+ break ;
402+ } else {
403+ Optional <LogicalType > logicalType =
404+ Optional .ofNullable (LogicalTypes .fromSchema (elementType ));
405+ @ Nullable
406+ TableFieldSchema .Type primitiveType =
407+ logicalType
408+ .flatMap (AvroGenericRecordToStorageApiProto ::logicalTypes )
409+ .orElse (PRIMITIVE_TYPES .get (elementType .getType ()));
410+ if (primitiveType == null ) {
411+ throw new RuntimeException ("Unsupported type " + elementType .getType ());
412+ }
413+ // a scalar will be required by default, if defined as part of union then
414+ // caller will set nullability requirements
415+ builder = builder .setType (primitiveType );
416+ // parametrized types
417+ if (logicalType .isPresent () && logicalType .get ().getName ().equals ("decimal" )) {
418+ LogicalTypes .Decimal decimal = (LogicalTypes .Decimal ) logicalType .get ();
419+ int precision = decimal .getPrecision ();
420+ int scale = decimal .getScale ();
421+ if (!(precision == 38 && scale == 9 ) // NUMERIC
422+ && !(precision == 77 && scale == 38 ) // BIGNUMERIC
423+ ) {
424+ // parametrized type
425+ builder = builder .setPrecision (precision );
426+ if (scale != 0 ) {
427+ builder = builder .setScale (scale );
428+ }
411429 }
412430 }
413431 }
@@ -476,7 +494,7 @@ private static Object toProtoValue(
476494 mapEntryToProtoValue (fieldDescriptor .getMessageType (), valueType , entry ))
477495 .collect (Collectors .toList ());
478496 default :
479- return scalarToProtoValue (avroSchema , value );
497+ return scalarToProtoValue (fieldDescriptor , avroSchema , value );
480498 }
481499 }
482500
@@ -502,10 +520,42 @@ static Object mapEntryToProtoValue(
502520 return builder .build ();
503521 }
504522
523+ private static DynamicMessage buildTimestampPicosMessage (
524+ Descriptor timestampPicosDescriptor , long seconds , long picoseconds ) {
525+ return DynamicMessage .newBuilder (timestampPicosDescriptor )
526+ .setField (
527+ Preconditions .checkNotNull (timestampPicosDescriptor .findFieldByName ("seconds" )),
528+ seconds )
529+ .setField (
530+ Preconditions .checkNotNull (timestampPicosDescriptor .findFieldByName ("picoseconds" )),
531+ picoseconds )
532+ .build ();
533+ }
534+
505535 @ VisibleForTesting
506- static Object scalarToProtoValue (Schema fieldSchema , Object value ) {
536+ static Object scalarToProtoValue (
537+ @ Nullable FieldDescriptor descriptor , Schema fieldSchema , Object value ) {
507538 TypeWithNullability type = TypeWithNullability .create (fieldSchema );
539+ if (TIMESTAMP_NANOS_LOGICAL_TYPE .equals (type .getType ().getProp ("logicalType" ))) {
540+ Preconditions .checkArgument (
541+ value instanceof Long , "Expecting a value as Long type (timestamp-nanos)." );
542+ long nanos = (Long ) value ;
543+
544+ long seconds = nanos / NANOS_PER_SECOND ;
545+ long nanoAdjustment = nanos % NANOS_PER_SECOND ;
546+
547+ // Handle negative timestamps (before epoch)
548+ if (nanos < 0 && nanoAdjustment != 0 ) {
549+ seconds -= 1 ;
550+ nanoAdjustment += NANOS_PER_SECOND ;
551+ }
552+
553+ long picoseconds = nanoAdjustment * PICOS_PER_NANO ;
554+ return buildTimestampPicosMessage (
555+ Preconditions .checkNotNull (descriptor ).getMessageType (), seconds , picoseconds );
556+ }
508557 LogicalType logicalType = LogicalTypes .fromSchema (type .getType ());
558+
509559 if (logicalType != null ) {
510560 @ Nullable
511561 BiFunction <LogicalType , Object , Object > logicalTypeEncoder =
0 commit comments