Skip to content

[C++][Dataset] Support reading date/time-partitioned datasets accounting for URL encoding (Spark) #28395

@asfimport

Description

@asfimport

I'm using Spark (3.1.1) to write a dataframe to a partitioned parquet dataset (using delta.io) which is partitioned by a timestamp field.

The relevant Spark code:

// code placeholder
(
  df.withColumn(
                "Date",
                sf.date_trunc(
                    "DAY",
                    sf.from_unixtime(
                        (sf.col("MyEpochField")),
                    ),
                ),
            )
    .write.format("delta")
    .mode("append")
    .partitionBy("Date")
    .save("...")

This gives a structure like following:

// code placeholder
/tip
/tip/Date=2021-05-04 00%3A00%3A00
/tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00
/tip/Date=2021-05-04 00%3A00%3A00/Time=2021-05-04 07%3A27%3A00/part-00000-8846eb80-a369-43f6-a715-fec9cf1adf95.c000.snappy.parquet

Notice the : character is (url?) encoded because of fs protocol violation.

When i try to open this dataset using delta-rs (https://github.com/delta-io/delta-rs) which uses Arrow below the hood, then an error is raised trying to parse the Date (folder) value.

// code placeholder
pyarrow.lib.ArrowInvalid: error parsing '2021-05-03 00%3A00%3A00' as scalar of type timestamp[ns]

It seems this error is raised in ScalarParseImpl => ParseValue => StringConverter::Convert => ParseTimestampISO8601

The mentioned parse method does support for format:

// code placeholder
static inline bool ParseTimestampISO8601(const char* s, size_t length,
                                         TimeUnit::type unit,
                                         TimestampType::c_type* out) {
  using seconds_type = std::chrono::duration<TimestampType::c_type>;  // We allow the following formats for all units:
  // - "YYYY-MM-DD"
  // - "YYYY-MM-DD[ T]hhZ?"
  // - "YYYY-MM-DD[ T]hh:mmZ?"
  // - "YYYY-MM-DD[ T]hh:mm:ssZ?"
<...>

But may not support (url?) decoding the value upfront?

Questions we have:

  • Should Arrow support timestamp fields when used as partitioned field?

  • Where to decode?

     

    Some more information from the writing side.

    The writing is initiated using FileFormatWriter.write that eventually uses a DynamicPartitionDataWriter (passing in the partitionColumns through the job description).

    Here the actual "value" is rendered and concatennated.

    // code placeholder
      /** Expression that given partition columns builds a path string like: col1=val/col2=val/... */
      private lazy val partitionPathExpression: Expression = Concat(
        description.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
          val partitionName = ScalaUDF(
            ExternalCatalogUtils.getPartitionPathString _,
            StringType,
            Seq(Literal(c.name), Cast(c, StringType, Option(description.timeZoneId))))
          if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
        })

    Where the encoding is done in:

    https://github.com/apache/spark/blob/v3.0.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala#L66

    If i understand correct, then Arrow should provide the equivalent of unescapePathName for fields used as partitioned columns.

     

Reporter: Paul Bormans
Assignee: David Li / @lidavidm

Related issues:

PRs and other links:

Note: This issue was originally created as ARROW-12644. Please see the migration documentation for further details.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions