CometColumnarExchange throws exception when reading Delta table #1844

@Kontinuation

Describe the bug

Reading a Delta table with Comet enabled fails with the following error:

25/05/30 09:53:47 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.comet.CometNativeException: StructBuilder (Schema { fields: [Field { name: "provider", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "options", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }) and field_builder with index 0 (Utf8) are of unequal lengths: (1 != 0).
        at std::backtrace::Backtrace::create(__internal__:0)
        at comet::errors::init::{{closure}}(__internal__:0)
        at std::panicking::rust_panic_with_hook(__internal__:0)
        at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
        at std::sys::backtrace::__rust_end_short_backtrace(__internal__:0)
        at _rust_begin_unwind(__internal__:0)
        at core::panicking::panic_fmt(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::validate_content::{{closure}}::panic_cold_display(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::validate_content(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::finish(__internal__:0)
        at <arrow_array::builder::struct_builder::StructBuilder as arrow_array::builder::ArrayBuilder>::finish(__internal__:0)
        at arrow_array::builder::struct_builder::StructBuilder::finish(__internal__:0)
        at <arrow_array::builder::struct_builder::StructBuilder as arrow_array::builder::ArrayBuilder>::finish(__internal__:0)
        at <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next(__internal__:0)
        at comet::execution::shuffle::row::process_sorted_row_partition(__internal__:0)
        at comet::execution::jni_api::Java_org_apache_comet_Native_writeSortedFileNative::{{closure}}::{{closure}}(__internal__:0)
        at _Java_org_apache_comet_Native_writeSortedFileNative(__internal__:0)
	at org.apache.comet.Native.writeSortedFileNative(Native Method)
	at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.doSpilling(SpillWriter.java:187)
	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter$ArrowIPCWriter.doSpilling(CometDiskBlockWriter.java:401)
	at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.close(CometDiskBlockWriter.java:304)
	at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:244)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
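
The schema in the panic message is a struct holding a provider string and an options map, which appears to match the format field of Delta's metaData log action, so the failure seems to be triggered when Comet's native shuffle writer materializes a struct column that contains a map. Assuming a session configured as in the reproduction below, a narrower sketch that pushes the same shape through a shuffle might look like this (untested; the column name and data are made up for illustration):

from pyspark.sql.types import MapType, StringType, StructField, StructType

# Hypothetical minimal shape: a struct column containing a map, mirroring the
# {provider, options} struct from the panic message above.
schema = StructType([
    StructField("format", StructType([
        StructField("provider", StringType(), True),
        StructField("options", MapType(StringType(), StringType()), True),
    ]), True),
])

df = spark.createDataFrame([(("parquet", {"k": "v"}),), ((None, None),)], schema)
df.repartition(4).collect()  # round-robin repartition forces a shuffle of the struct-of-map column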

Steps to reproduce

The following PySpark code reproduces this issue locally.

from pyspark.sql import SparkSession

COMET_JAR = "path/to/comet/jar"

spark = (SparkSession.builder
    .master("local[*]")
    .config("spark.jars", COMET_JAR)
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension,org.apache.comet.CometSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.driver.extraClassPath", COMET_JAR)
    .config("spark.executor.extraClassPath", COMET_JAR)
    .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
    .config("spark.comet.enabled", "true")
    .config("spark.comet.exec.shuffle.enabled", "true")
    .config("spark.comet.exec.shuffle.mode", "auto")
    .config("spark.comet.exec.shuffle.fallbackToColumnar", "true")
    .getOrCreate()
)

spark.range(0, 1000).write.format("delta").save("delta_table")
spark.read.format("delta").load("delta_table").show() # this line throws the aforementioned exception.

Expected behavior

The Delta table should be read successfully.

Additional context

Spark version: 3.5.4
Comet version: commit 7cf2e9d

Labels: bug