Skip to content

Commit 0028f1e

Browse files
authored
fix: Fallback to Spark if scan has meta columns (apache#997)
1 parent b131cc3 commit 0028f1e

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ import org.apache.spark.SparkConf
2525
import org.apache.spark.internal.Logging
2626
import org.apache.spark.network.util.ByteUnit
2727
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
28-
import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
28+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
2929
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
3030
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
3131
import org.apache.spark.sql.catalyst.rules.Rule
3232
import org.apache.spark.sql.catalyst.trees.TreeNode
33+
import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
3334
import org.apache.spark.sql.comet._
3435
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
3536
import org.apache.spark.sql.comet.util.Utils
@@ -101,7 +102,19 @@ class CometSparkSessionExtensions
101102
def isDynamicPruningFilter(e: Expression): Boolean =
102103
e.exists(_.isInstanceOf[PlanExpression[_]])
103104

105+
def hasMetadataCol(plan: SparkPlan): Boolean = {
106+
plan.expressions.exists(_.exists {
107+
case a: Attribute =>
108+
a.isMetadataCol
109+
case _ => false
110+
})
111+
}
112+
104113
plan.transform {
114+
case scan if hasMetadataCol(scan) =>
115+
withInfo(scan, "Metadata column is not supported")
116+
scan
117+
105118
case scanExec: FileSourceScanExec
106119
if COMET_DPP_FALLBACK_ENABLED.get() &&
107120
scanExec.partitionFilters.exists(isDynamicPruningFilter) =>

0 commit comments

Comments
 (0)