[Bug]: IcebergIO cannot write Timestamp columns  #32680

@DanielMorales9

Description

What happened?

When trying to write timestamp data into an Iceberg table, I get the following exception:

Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.OffsetDateTime (java.lang.Long and java.time.OffsetDateTime are in module java.base of loader 'bootstrap')
        at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
        at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:356)
        at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
        at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
        at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
        at org.apache.beam.sdk.io.iceberg.RecordWriter.write(RecordWriter.java:105)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:144)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:234)
        at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:224)
        Suppressed: java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
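
The cast fails because a Beam `Row` stores a DATETIME field internally as epoch milliseconds (a boxed `Long`), while Iceberg's `TimestamptzWriter` expects a `java.time.OffsetDateTime`. A minimal standalone sketch of the mismatch and of the conversion that would be needed (plain `java.time`, illustrative only — not the connector's actual code):

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TimestampCastDemo {
    public static void main(String[] args) {
        // Epoch millis for 2024-10-01T00:00:00Z
        long epochMillis = 1727740800000L;

        // This is effectively what the writer attempts, and why it throws:
        // Object boxed = epochMillis;
        // OffsetDateTime ts = (OffsetDateTime) boxed; // ClassCastException

        // The explicit conversion that would be required instead:
        OffsetDateTime ts = Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC);
        System.out.println(ts); // 2024-10-01T00:00Z
    }
}
```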

Source code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HardcodedDataToIcebergPipeline {

    public static void main(String[] args) {
        // Define Beam pipeline options
        PipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // Define Beam schema with a timestamp field
        Schema beamSchema = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_timestamp")
                .build();
        
        Map<String, Object> catalogConfig = new HashMap<String, Object>() {{
            put("warehouse", "<warehouse-dir>");
            put("uri", "<thrift-uri>");
            put("type", "iceberg");
            put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
            put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
        }};

        Map<String, Object> icebergConfig = new HashMap<String, Object>() {{
            put("catalog_name", "<catalog-name>");
            put("catalog_properties", catalogConfig);
        }};
        String table = String.format("%s.%s", "myschema", "test");
        icebergConfig.put("table", table);

        // Create hardcoded data with timestamp
        List<Row> hardcodedData = Arrays.asList(
                Row.withSchema(beamSchema)
                        .addValues("record-1", Instant.now()) // Use Joda-Time Instant
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-2", Instant.now().minus(3600 * 1000)) // 1 hour ago
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-3", Instant.now().minus(24 * 3600 * 1000)) // 1 day ago
                        .build()
        );

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(hardcodedData).withRowSchema(beamSchema)) // Hardcoded 
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergConfig));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
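
As a workaround sketch (not a fix for the connector itself), the timestamp can be carried as an ISO-8601 string field instead of a DATETIME field, which sidesteps the `Long` → `OffsetDateTime` cast; the Iceberg column would then be declared as a string. This assumes the schema above changes `addDateTimeField("event_timestamp")` to `addStringField("event_timestamp")`, with everything else unchanged:

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class TimestampAsString {
    // Format an epoch-millis value (e.g. from a Joda-Time Instant's getMillis())
    // as an ISO-8601 UTC string, suitable for a string-typed schema field.
    static String toIsoString(long epochMillis) {
        return DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        System.out.println(toIsoString(0L)); // 1970-01-01T00:00:00Z
    }
}
```

Each `Row` would then use `toIsoString(Instant.now().getMillis())` in place of the raw Joda `Instant`.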

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner