
[GLUTEN-11088][CORE] Fix Spark 4.0 GlutenDynamicPartitionPruningV1SuiteAEOn#11212

Merged
jinchengchenghh merged 1 commit into apache:main from jinchengchenghh:dpp on Dec 1, 2025

Conversation

@jinchengchenghh
Contributor

@jinchengchenghh jinchengchenghh commented Nov 27, 2025

Support FileSourceScanExecTransformer stream.

FileSourceScanExecTransformer instances compare as unequal because their dataFilters differ: (cast(x#688 as double) = ReusedSubquery Subquery subquery#683, [id=#2324]) and (cast(x#688 as double) = Subquery subquery#683, [id=#2324]) should be treated as the same filter, so we must compare canonicalized expressions when deduplicating.
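To illustrate the deduplication problem, here is a hypothetical, self-contained Scala sketch (not code from this PR): expressions that differ only in cosmetic details, such as a different exprId or a ReusedSubquery wrapper, are semantically the same filter, so a plain Set keeps both while comparison by canonical form collapses them to one. The `Attr` case class and its `canonicalized` method below are stand-ins for Catalyst's real `Expression.canonicalized`.

```scala
// Hypothetical stand-in for a Catalyst expression: exprId is a
// cosmetic detail that canonicalization strips away.
case class Attr(name: String, exprId: Long) {
  def canonicalized: Attr = copy(exprId = 0L)
}

// Deduplicate by canonical form while keeping the first original
// (non-canonicalized) expression for each canonical class.
def dedupByCanonical(filters: Seq[Attr]): Seq[Attr] = {
  val seen = scala.collection.mutable.Set.empty[Attr]
  // mutable.Set.add returns true only when the element was absent,
  // so filter keeps exactly the first occurrence per canonical form.
  filters.filter(f => seen.add(f.canonicalized))
}

val filters = Seq(Attr("x", 688L), Attr("x", 690L))
assert(filters.toSet.size == 2)            // plain Set: both survive
assert(dedupByCanonical(filters).size == 1) // canonical dedup: one survives
```

The same idea carries over to real Catalyst expressions: deduplicate on `expr.canonicalized` but keep the original `expr`, so downstream consumers still see the un-normalized plan nodes.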

Fix getPartitions for dynamic partition pruning in FileSourceScanExec, which broke because Spark 4.0 added the new ScanFileListing class and refactored inputRDD.

Related issue: #11088

@github-actions github-actions bot added the CORE works for Gluten Core label Nov 27, 2025
@jinchengchenghh
Contributor Author

Run Gluten ClickHouse CI on x86

@github-actions

Run Gluten Clickhouse CI on x86

1 similar comment

@github-actions

Run Gluten Clickhouse CI on x86


def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
-  (left.toSet -- right.toSet).toSeq
+  val scanSet = left.map(_.canonicalized).toSet
Contributor Author

Hi @Zouxxyy, I found this causes a ReuseExchange issue. Since you added combineFilters, do you have any suggestions? I think it may hit the same issue.

Contributor

Oh, I checked the commit history, and this PR (#6296) changed ExpressionSet to a regular Set. The original ExpressionSet compared expressions based on both determinism and canonical representation, whereas a regular Set only compares expressions directly (using their default equals/hashCode).

It seems what we actually need is comparison based on canonicalized forms. Perhaps we could modify the code like this:

def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
  // Compare by canonical form so cosmetic differences (e.g. ExprIds,
  // ReusedSubquery wrappers) do not defeat the subtraction.
  val rightCanonicalSet = right.map(_.canonicalized).toSet
  left.filterNot(expr => rightCanonicalSet.contains(expr.canonicalized))
}

def combineFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
  // Deduplicate by canonical form while preserving the original
  // (non-canonicalized) expressions, keeping first occurrences.
  val seen = scala.collection.mutable.Set[Expression]()
  val result = scala.collection.mutable.ListBuffer[Expression]()

  def tryAdd(expr: Expression): Unit = {
    val canon = expr.canonicalized
    if (!seen.contains(canon)) {
      seen += canon
      result += expr
    }
  }

  (left ++ right).foreach(tryAdd)
  result.toList
}

Contributor Author

Maybe we cannot use canonicalized expressions here; otherwise a "subquery is not finished" exception is thrown with the following code:

val scanSet = left.map(_.canonicalized).toSet
scanSet.toSeq ++ right.filter(f => !scanSet.contains(f.canonicalized))

Keeping the current code for now.

@github-actions

Run Gluten Clickhouse CI on x86

5 similar comments

@github-actions

Run Gluten Clickhouse CI on x86

2 similar comments

@jinchengchenghh jinchengchenghh marked this pull request as ready for review November 29, 2025 23:21
//
// See also org.apache.gluten.execution.FilterHandler#applyFilterPushdownToScan
// See also DynamicPartitionPruningSuite.scala:1362
assert(subqueryIds.size == 3, "Whole plan subquery reusing not working correctly")
Contributor Author

@jinchengchenghh Nov 29, 2025

Maybe the previous version had an extra subquery; the filter should not affect ReusedSubquery. Gluten on Spark 4.0 now produces the same result as vanilla JVM Spark.

@github-actions

Run Gluten Clickhouse CI on x86

Member

@zhouyuan zhouyuan left a comment

👍

@jinchengchenghh jinchengchenghh merged commit ba25aec into apache:main Dec 1, 2025
60 checks passed

Labels: CORE (works for Gluten Core), DATA_LAKE

3 participants