Skip to content

Commit 4777bef

Browse files
claudevdmClaude
andauthored
Add nanosecond timestamp support for Storage write when writing GenericRecords and Beam Rows (#37235)
* Beam row. * Avro * fixes. * tests. * fix tests. * Update tests. * comments. * fix test. --------- Co-authored-by: Claude <cvandermerwe@google.com>
1 parent 9d2abcf commit 4777bef

9 files changed

Lines changed: 462 additions & 50 deletions

File tree

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java

Lines changed: 77 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.protobuf.Descriptors.Descriptor;
2424
import com.google.protobuf.Descriptors.FieldDescriptor;
2525
import com.google.protobuf.DynamicMessage;
26+
import com.google.protobuf.Int64Value;
2627
import java.math.BigDecimal;
2728
import java.nio.ByteBuffer;
2829
import java.util.Map;
@@ -55,6 +56,11 @@ public class AvroGenericRecordToStorageApiProto {
5556

5657
private static final org.joda.time.LocalDate EPOCH_DATE = new org.joda.time.LocalDate(1970, 1, 1);
5758

59+
private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
60+
private static final long PICOSECOND_PRECISION = 12L;
61+
private static final long NANOS_PER_SECOND = 1_000_000_000L;
62+
private static final long PICOS_PER_NANO = 1000L;
63+
5864
static final Map<Schema.Type, TableFieldSchema.Type> PRIMITIVE_TYPES =
5965
ImmutableMap.<Schema.Type, TableFieldSchema.Type>builder()
6066
.put(Schema.Type.INT, TableFieldSchema.Type.INT64)
@@ -314,6 +320,7 @@ public static DynamicMessage messageFromGenericRecord(
314320
@SuppressWarnings("nullness")
315321
private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
316322
@Nullable Schema schema = field.schema();
323+
317324
Preconditions.checkNotNull(schema, "Unexpected null schema!");
318325
if (StorageApiCDC.COLUMNS.contains(field.name())) {
319326
throw new RuntimeException("Reserved field name " + field.name() + " in user schema.");
@@ -380,34 +387,45 @@ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Sch
380387
.setType(unionFieldSchema.getType())
381388
.setMode(unionFieldSchema.getMode())
382389
.addAllFields(unionFieldSchema.getFieldsList());
390+
391+
if (unionFieldSchema.hasTimestampPrecision()) {
392+
builder.setTimestampPrecision(unionFieldSchema.getTimestampPrecision());
393+
}
383394
break;
384395
default:
385396
elementType = TypeWithNullability.create(schema).getType();
386-
Optional<LogicalType> logicalType =
387-
Optional.ofNullable(LogicalTypes.fromSchema(elementType));
388-
@Nullable
389-
TableFieldSchema.Type primitiveType =
390-
logicalType
391-
.flatMap(AvroGenericRecordToStorageApiProto::logicalTypes)
392-
.orElse(PRIMITIVE_TYPES.get(elementType.getType()));
393-
if (primitiveType == null) {
394-
throw new RuntimeException("Unsupported type " + elementType.getType());
395-
}
396-
// a scalar will be required by default, if defined as part of union then
397-
// caller will set nullability requirements
398-
builder = builder.setType(primitiveType);
399-
// parametrized types
400-
if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) {
401-
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get();
402-
int precision = decimal.getPrecision();
403-
int scale = decimal.getScale();
404-
if (!(precision == 38 && scale == 9) // NUMERIC
405-
&& !(precision == 77 && scale == 38) // BIGNUMERIC
406-
) {
407-
// parametrized type
408-
builder = builder.setPrecision(precision);
409-
if (scale != 0) {
410-
builder = builder.setScale(scale);
397+
if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(elementType.getProp("logicalType"))) {
398+
builder = builder.setType(TableFieldSchema.Type.TIMESTAMP);
399+
builder.setTimestampPrecision(
400+
Int64Value.newBuilder().setValue(PICOSECOND_PRECISION).build());
401+
break;
402+
} else {
403+
Optional<LogicalType> logicalType =
404+
Optional.ofNullable(LogicalTypes.fromSchema(elementType));
405+
@Nullable
406+
TableFieldSchema.Type primitiveType =
407+
logicalType
408+
.flatMap(AvroGenericRecordToStorageApiProto::logicalTypes)
409+
.orElse(PRIMITIVE_TYPES.get(elementType.getType()));
410+
if (primitiveType == null) {
411+
throw new RuntimeException("Unsupported type " + elementType.getType());
412+
}
413+
// a scalar will be required by default, if defined as part of union then
414+
// caller will set nullability requirements
415+
builder = builder.setType(primitiveType);
416+
// parametrized types
417+
if (logicalType.isPresent() && logicalType.get().getName().equals("decimal")) {
418+
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType.get();
419+
int precision = decimal.getPrecision();
420+
int scale = decimal.getScale();
421+
if (!(precision == 38 && scale == 9) // NUMERIC
422+
&& !(precision == 77 && scale == 38) // BIGNUMERIC
423+
) {
424+
// parametrized type
425+
builder = builder.setPrecision(precision);
426+
if (scale != 0) {
427+
builder = builder.setScale(scale);
428+
}
411429
}
412430
}
413431
}
@@ -476,7 +494,7 @@ private static Object toProtoValue(
476494
mapEntryToProtoValue(fieldDescriptor.getMessageType(), valueType, entry))
477495
.collect(Collectors.toList());
478496
default:
479-
return scalarToProtoValue(avroSchema, value);
497+
return scalarToProtoValue(fieldDescriptor, avroSchema, value);
480498
}
481499
}
482500

@@ -502,10 +520,42 @@ static Object mapEntryToProtoValue(
502520
return builder.build();
503521
}
504522

523+
private static DynamicMessage buildTimestampPicosMessage(
524+
Descriptor timestampPicosDescriptor, long seconds, long picoseconds) {
525+
return DynamicMessage.newBuilder(timestampPicosDescriptor)
526+
.setField(
527+
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")),
528+
seconds)
529+
.setField(
530+
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")),
531+
picoseconds)
532+
.build();
533+
}
534+
505535
@VisibleForTesting
506-
static Object scalarToProtoValue(Schema fieldSchema, Object value) {
536+
static Object scalarToProtoValue(
537+
@Nullable FieldDescriptor descriptor, Schema fieldSchema, Object value) {
507538
TypeWithNullability type = TypeWithNullability.create(fieldSchema);
539+
if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getType().getProp("logicalType"))) {
540+
Preconditions.checkArgument(
541+
value instanceof Long, "Expecting a value as Long type (timestamp-nanos).");
542+
long nanos = (Long) value;
543+
544+
long seconds = nanos / NANOS_PER_SECOND;
545+
long nanoAdjustment = nanos % NANOS_PER_SECOND;
546+
547+
// Handle negative timestamps (before epoch)
548+
if (nanos < 0 && nanoAdjustment != 0) {
549+
seconds -= 1;
550+
nanoAdjustment += NANOS_PER_SECOND;
551+
}
552+
553+
long picoseconds = nanoAdjustment * PICOS_PER_NANO;
554+
return buildTimestampPicosMessage(
555+
Preconditions.checkNotNull(descriptor).getMessageType(), seconds, picoseconds);
556+
}
508557
LogicalType logicalType = LogicalTypes.fromSchema(type.getType());
558+
509559
if (logicalType != null) {
510560
@Nullable
511561
BiFunction<LogicalType, Object, Object> logicalTypeEncoder =

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.protobuf.Descriptors.Descriptor;
2424
import com.google.protobuf.Descriptors.FieldDescriptor;
2525
import com.google.protobuf.DynamicMessage;
26+
import com.google.protobuf.Int64Value;
2627
import java.math.BigDecimal;
2728
import java.nio.ByteBuffer;
2829
import java.time.Instant;
@@ -44,6 +45,7 @@
4445
import org.apache.beam.sdk.schemas.Schema.TypeName;
4546
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
4647
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
48+
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
4749
import org.apache.beam.sdk.values.Row;
4850
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
4951
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Functions;
@@ -243,9 +245,24 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
243245
if (logicalType == null) {
244246
throw new RuntimeException("Unexpected null logical type " + field.getType());
245247
}
246-
@Nullable TableFieldSchema.Type type = LOGICAL_TYPES.get(logicalType.getIdentifier());
247-
if (type == null) {
248-
throw new RuntimeException("Unsupported logical type " + field.getType());
248+
@Nullable TableFieldSchema.Type type;
249+
if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) {
250+
int precision =
251+
Preconditions.checkNotNull(
252+
logicalType.getArgument(),
253+
"Expected logical type argument for timestamp precision.");
254+
if (precision != 9) {
255+
throw new RuntimeException(
256+
"Unsupported precision for Timestamp logical type " + precision);
257+
}
258+
// Map Timestamp.NANOS logical type to BigQuery TIMESTAMP(12) for nanosecond precision
259+
type = TableFieldSchema.Type.TIMESTAMP;
260+
builder.setTimestampPrecision(Int64Value.newBuilder().setValue(12L).build());
261+
} else {
262+
type = LOGICAL_TYPES.get(logicalType.getIdentifier());
263+
if (type == null) {
264+
throw new RuntimeException("Unsupported logical type " + field.getType());
265+
}
249266
}
250267
builder = builder.setType(type);
251268
break;
@@ -341,17 +358,39 @@ private static Object toProtoValue(
341358
fieldDescriptor.getMessageType(), keyType, valueType, entry))
342359
.collect(Collectors.toList());
343360
default:
344-
return scalarToProtoValue(beamFieldType, value);
361+
return scalarToProtoValue(fieldDescriptor, beamFieldType, value);
345362
}
346363
}
347364

365+
private static DynamicMessage buildTimestampPicosMessage(
366+
Descriptor timestampPicosDescriptor, Instant instant) {
367+
long seconds = instant.getEpochSecond();
368+
long picoseconds = instant.getNano() * 1000L; // nanos → picos
369+
370+
return DynamicMessage.newBuilder(timestampPicosDescriptor)
371+
.setField(
372+
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("seconds")),
373+
seconds)
374+
.setField(
375+
Preconditions.checkNotNull(timestampPicosDescriptor.findFieldByName("picoseconds")),
376+
picoseconds)
377+
.build();
378+
}
379+
348380
@VisibleForTesting
349-
static Object scalarToProtoValue(FieldType beamFieldType, Object value) {
381+
static Object scalarToProtoValue(
382+
@Nullable FieldDescriptor fieldDescriptor, FieldType beamFieldType, Object value) {
350383
if (beamFieldType.getTypeName() == TypeName.LOGICAL_TYPE) {
351384
@Nullable LogicalType<?, ?> logicalType = beamFieldType.getLogicalType();
352385
if (logicalType == null) {
353386
throw new RuntimeException("Unexpectedly null logical type " + beamFieldType);
354387
}
388+
if (logicalType.getIdentifier().equals(Timestamp.IDENTIFIER)) {
389+
Instant instant = (Instant) value;
390+
Descriptor timestampPicosDescriptor =
391+
Preconditions.checkNotNull(fieldDescriptor).getMessageType();
392+
return buildTimestampPicosMessage(timestampPicosDescriptor, instant);
393+
}
355394
@Nullable
356395
BiFunction<LogicalType<?, ?>, Object, Object> logicalTypeEncoder =
357396
LOGICAL_TYPE_ENCODERS.get(logicalType.getIdentifier());

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv
644644
// TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
645645
if (useAvroLogicalTypes
646646
&& (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
647-
return fieldSchema.setType("TIMESTAMP");
647+
return fieldSchema.setType("TIMESTAMP").setTimestampPrecision(12L);
648648
}
649649
if (logicalType instanceof LogicalTypes.TimeMicros) {
650650
return fieldSchema.setType("TIME");

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,17 @@ private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
587587
field.setFields(toTableFieldSchema(mapSchema));
588588
field.setMode(Mode.REPEATED.toString());
589589
}
590-
field.setType(toStandardSQLTypeName(type).toString());
590+
Schema.LogicalType<?, ?> logicalType = type.getLogicalType();
591+
if (logicalType != null && Timestamp.IDENTIFIER.equals(logicalType.getIdentifier())) {
592+
int precision = Preconditions.checkArgumentNotNull(logicalType.getArgument());
593+
if (precision != 9) {
594+
throw new IllegalArgumentException(
595+
"Unsupported precision for Timestamp logical type " + precision);
596+
}
597+
field.setType(StandardSQLTypeName.TIMESTAMP.toString()).setTimestampPrecision(12L);
598+
} else {
599+
field.setType(toStandardSQLTypeName(type).toString());
600+
}
591601

592602
fields.add(field);
593603
}

0 commit comments

Comments
 (0)