[GLUTEN-11088][CORE] Fix Spark 4.0 GlutenDynamicPartitionPruningV1SuiteAEOn #11212
jinchengchenghh merged 1 commit into apache:main
Conversation
```diff
 def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
-  (left.toSet -- right.toSet).toSeq
+  val scanSet = left.map(_.canonicalized).toSet
```
Hi, I found that this causes a ReuseExchange issue. I see you added combineFilters; it may hit the same issue. Do you have any suggestions? @Zouxxyy
Oh, I checked the commit history: PR #6296 changed ExpressionSet to a regular Set. The original ExpressionSet compared expressions based on both determinism and canonical representation, whereas a regular Set compares expressions directly using their default equals/hashCode.
It seems what we actually need is comparison based on canonicalized forms. Perhaps we could modify the code like this:
```scala
def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
  val rightCanonicalSet = right.map(_.canonicalized).toSet
  left.filterNot(expr => rightCanonicalSet.contains(expr.canonicalized))
}

def combineFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
  val seen = scala.collection.mutable.Set[Expression]()
  val result = scala.collection.mutable.ListBuffer[Expression]()
  def tryAdd(expr: Expression): Unit = {
    val canon = expr.canonicalized
    if (!seen.contains(canon)) {
      seen += canon
      result += expr
    }
  }
  (left ++ right).foreach(tryAdd)
  result.toList
}
```
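Outside Spark, the difference between default equality and canonicalized equality can be illustrated with a minimal, hypothetical stand-in. The `Attr` class below and its `canonicalized` method are sketches for illustration only, not Spark's real `AttributeReference` or `Canonicalize`; the assumption is that equality includes a per-instance exprId while canonicalization normalizes it away.

```scala
// Hypothetical stand-in for a Spark expression: case-class equality includes
// the per-instance exprId, while `canonicalized` zeroes it out, so two
// references to the same column compare equal only in canonical form.
final case class Attr(name: String, exprId: Long) {
  def canonicalized: Attr = copy(exprId = 0L)
}

object CanonicalDedupDemo {
  // Deduplicate by canonical form while keeping the first original
  // expression, mirroring the combineFilters proposal above.
  def combine(left: Seq[Attr], right: Seq[Attr]): Seq[Attr] = {
    val seen = scala.collection.mutable.Set[Attr]()
    val result = scala.collection.mutable.ListBuffer[Attr]()
    (left ++ right).foreach { e =>
      if (seen.add(e.canonicalized)) result += e
    }
    result.toList
  }
}
```

With a plain `Set`, `Attr("x", 688L)` and `Attr("x", 690L)` would both survive because their default equality differs; deduplicating on `canonicalized` collapses them to one filter.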
Maybe we cannot use the canonicalized expression here; otherwise a "subquery is not finished" exception is thrown with the following code:
```scala
val scanSet = left.map(_.canonicalized).toSet
scanSet.toSeq ++ right.filter(f => !scanSet.contains(f.canonicalized))
```
Keeping the current code for now.
```scala
//
// See also org.apache.gluten.execution.FilterHandler#applyFilterPushdownToScan
// See also DynamicPartitionPruningSuite.scala:1362
assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
```
Maybe the previous version had an extra subquery; the filter should not affect ReusedSubquery. On Spark 4.0, Gluten's result is the same as vanilla JVM Spark.
Summary of the fix:
- Support FileSourceScanExecTransformer stream.
- FileSourceScanExecTransformer instances compare as unequal because their dataFilters are unequal: `(cast(x#688 as double) = ReusedSubquery Subquery subquery#683, [id=#2324])` and `(cast(x#688 as double) = Subquery subquery#683, [id=#2324])` should be treated as the same filter, so canonicalized expressions must be used to deduplicate.
- Fix dynamic pruning in FileSourceScanExec getPartitions, because Spark 4.0 added the new class ScanFileListing and refactored inputRDD.
Related issue: #11088