@@ -25,11 +25,12 @@ import org.apache.spark.SparkConf
2525import org .apache .spark .internal .Logging
2626import org .apache .spark .network .util .ByteUnit
2727import org .apache .spark .sql .{SparkSession , SparkSessionExtensions }
28- import org .apache .spark .sql .catalyst .expressions .{Divide , DoubleLiteral , EqualNullSafe , EqualTo , Expression , FloatLiteral , GreaterThan , GreaterThanOrEqual , KnownFloatingPointNormalized , LessThan , LessThanOrEqual , NamedExpression , PlanExpression , Remainder }
28+ import org .apache .spark .sql .catalyst .expressions .{Attribute , Divide , DoubleLiteral , EqualNullSafe , EqualTo , Expression , FloatLiteral , GreaterThan , GreaterThanOrEqual , KnownFloatingPointNormalized , LessThan , LessThanOrEqual , NamedExpression , PlanExpression , Remainder }
2929import org .apache .spark .sql .catalyst .expressions .aggregate .{Final , Partial }
3030import org .apache .spark .sql .catalyst .optimizer .NormalizeNaNAndZero
3131import org .apache .spark .sql .catalyst .rules .Rule
3232import org .apache .spark .sql .catalyst .trees .TreeNode
33+ import org .apache .spark .sql .catalyst .util .MetadataColumnHelper
3334import org .apache .spark .sql .comet ._
3435import org .apache .spark .sql .comet .execution .shuffle .{CometColumnarShuffle , CometNativeShuffle , CometShuffleExchangeExec , CometShuffleManager }
3536import org .apache .spark .sql .comet .util .Utils
@@ -101,7 +102,19 @@ class CometSparkSessionExtensions
101102 def isDynamicPruningFilter (e : Expression ): Boolean =
102103 e.exists(_.isInstanceOf [PlanExpression [_]])
103104
105+ def hasMetadataCol (plan : SparkPlan ): Boolean = {
106+ plan.expressions.exists(_.exists {
107+ case a : Attribute =>
108+ a.isMetadataCol
109+ case _ => false
110+ })
111+ }
112+
104113 plan.transform {
114+ case scan if hasMetadataCol(scan) =>
115+ withInfo(scan, " Metadata column is not supported" )
116+ scan
117+
105118 case scanExec : FileSourceScanExec
106119 if COMET_DPP_FALLBACK_ENABLED .get() &&
107120 scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
0 commit comments