-
Notifications
You must be signed in to change notification settings - Fork 270
Closed
Labels
bug — Something isn't working
Description
Describe the bug
Reading Delta table with Comet enabled fails with the following error:
25/05/30 09:53:47 ERROR Executor: Exception in task 3.0 in stage 0.0 (TID 3)
org.apache.comet.CometNativeException: StructBuilder (Schema { fields: [Field { name: "provider", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "options", data_type: Map(Field { name: "entries", data_type: Struct([Field { name: "key", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "value", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }) and field_builder with index 0 (Utf8) are of unequal lengths: (1 != 0).
at std::backtrace::Backtrace::create(__internal__:0)
at comet::errors::init::{{closure}}(__internal__:0)
at std::panicking::rust_panic_with_hook(__internal__:0)
at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
at std::sys::backtrace::__rust_end_short_backtrace(__internal__:0)
at _rust_begin_unwind(__internal__:0)
at core::panicking::panic_fmt(__internal__:0)
at arrow_array::builder::struct_builder::StructBuilder::validate_content::{{closure}}::panic_cold_display(__internal__:0)
at arrow_array::builder::struct_builder::StructBuilder::validate_content(__internal__:0)
at arrow_array::builder::struct_builder::StructBuilder::finish(__internal__:0)
at <arrow_array::builder::struct_builder::StructBuilder as arrow_array::builder::ArrayBuilder>::finish(__internal__:0)
at arrow_array::builder::struct_builder::StructBuilder::finish(__internal__:0)
at <arrow_array::builder::struct_builder::StructBuilder as arrow_array::builder::ArrayBuilder>::finish(__internal__:0)
at <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::next(__internal__:0)
at comet::execution::shuffle::row::process_sorted_row_partition(__internal__:0)
at comet::execution::jni_api::Java_org_apache_comet_Native_writeSortedFileNative::{{closure}}::{{closure}}(__internal__:0)
at _Java_org_apache_comet_Native_writeSortedFileNative(__internal__:0)
at org.apache.comet.Native.writeSortedFileNative(Native Method)
at org.apache.spark.sql.comet.execution.shuffle.SpillWriter.doSpilling(SpillWriter.java:187)
at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter$ArrowIPCWriter.doSpilling(CometDiskBlockWriter.java:401)
at org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.close(CometDiskBlockWriter.java:304)
at org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:244)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Steps to reproduce
The following PySpark code reproduces this issue locally.
from pyspark.sql import SparkSession
COMET_JAR = "path/to/comet/jar"
spark = (SparkSession.builder
.master("local[*]")
.config("spark.jars", COMET_JAR)
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.1")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension,org.apache.comet.CometSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.driver.extraClassPath", COMET_JAR)
.config("spark.executor.extraClassPath", COMET_JAR)
.config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
.config("spark.comet.enabled", "true")
.config("spark.comet.exec.shuffle.enabled", "true")
.config("spark.comet.exec.shuffle.mode", "auto")
.config("spark.comet.exec.shuffle.fallbackToColumnar", "true")
.getOrCreate()
)
spark.range(0, 1000).write.format("delta").save("delta_table")
spark.read.format("delta").load("delta_table").show() # this line throws the aforementioned exception.
Expected behavior
Delta table should be read successfully.
Additional context
Spark version: Spark 3.5.4
Comet version: commit 7cf2e9d
Metadata
Metadata
Assignees
Labels
bug — Something isn't working