Skip to content

Conversation

@dqkqd
Copy link
Contributor

@dqkqd dqkqd commented Sep 30, 2025

Which issue does this PR close?

Rationale for this change

The optimizer fails on this query:

WITH suppliers AS (
  SELECT *
  FROM (VALUES (1, 10.0), (1, 20.0)) AS t(nation, acctbal)
)
SELECT
  ROW_NUMBER() OVER (PARTITION BY nation ORDER BY acctbal DESC) AS rn
FROM suppliers AS s
WHERE acctbal > (
  SELECT AVG(acctbal) FROM suppliers
);

#17770 (comment):
scalar_subquery_to_join rewrites the input schema fed to the window function, creating an invalid plan.

What changes are included in this PR?

In ScalarSubqueryToJoin, add a projection on top of the filter after converting the subquery to a join, similar to EliminateGroupByConstant:

/// Optimizer rule that removes constant expressions from `GROUP BY` clause
/// and places additional projection on top of aggregation, to preserve
/// original schema
#[derive(Default, Debug)]
pub struct EliminateGroupByConstant {}

Are these changes tested?

Yes, using the existing test suite.

I verified using the query noted in the issue:

DataFusion CLI v50.1.0
> WITH suppliers AS (
  SELECT *
  FROM (VALUES (1, 10.0), (1, 20.0)) AS t(nation, acctbal)
)
SELECT
  ROW_NUMBER() OVER (PARTITION BY nation ORDER BY acctbal DESC) AS rn
FROM suppliers AS s
WHERE acctbal > (
  SELECT AVG(acctbal) FROM suppliers
);
+----+
| rn |
+----+
| 1  |
+----+
1 row(s) fetched.
Elapsed 0.031 seconds.

>

Are there any user-facing changes?

No.

@github-actions github-actions bot added the optimizer Optimizer rules label Sep 30, 2025
@dqkqd dqkqd changed the title fix: use Window::try_new because input schema has changed fix: 'common_sub_expression_eliminate' fails in a window function Oct 1, 2025
@dqkqd
Copy link
Contributor Author

dqkqd commented Oct 1, 2025

From #17770 (comment)
The scalar_subquery_to_join changes window aggregate function input schema, causing the optimizer to fail.

My initial thought was changing Window::try_new_with_schema to Window::try_new and let it recompute the output schema.
However, this branch runs when nothing should be optimized ( FoundCommonNodes::No), using try_new here might be costly.

Window::try_new_with_schema(
new_window_expr,
Arc::new(plan),
schema,
)

This change worked:

> WITH suppliers AS (
  SELECT *
  FROM (VALUES (1, 10.0), (1, 20.0)) AS t(nation, acctbal)
)
SELECT
  ROW_NUMBER() OVER (PARTITION BY nation ORDER BY acctbal DESC) AS rn
FROM suppliers AS s
WHERE acctbal > (
  SELECT AVG(acctbal) FROM suppliers
);
+----+
| rn |
+----+
| 1  |
+----+
1 row(s) fetched.
Elapsed 0.029 seconds.

The generated plan:

    Projection: row_number() PARTITION BY [s.nation] ORDER BY [s.acctbal DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS rn [rn:UInt64]
      WindowAggr: windowExpr=[[row_number() PARTITION BY [s.nation] ORDER BY [s.acctbal DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [nation:Int64;N, acctbal:Float64;N, row_number() PARTITION BY [s.nation] ORDER BY [s.acctbal DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
        Projection: s.nation, s.acctbal [nation:Int64;N, acctbal:Float64;N]
          Inner Join:  Filter: s.acctbal > __scalar_sq_1.avg(suppliers.acctbal) [nation:Int64;N, acctbal:Float64;N, avg(suppliers.acctbal):Float64;N]
            SubqueryAlias: s [nation:Int64;N, acctbal:Float64;N]
              SubqueryAlias: suppliers [nation:Int64;N, acctbal:Float64;N]
                SubqueryAlias: t [nation:Int64;N, acctbal:Float64;N]
                  Projection: column1 AS nation, column2 AS acctbal [nation:Int64;N, acctbal:Float64;N]
                    Values: (Int64(1), Float64(10)), (Int64(1), Float64(20)) [column1:Int64;N, column2:Float64;N]
            SubqueryAlias: __scalar_sq_1 [avg(suppliers.acctbal):Float64;N]
              Aggregate: groupBy=[[]], aggr=[[avg(suppliers.acctbal)]] [avg(suppliers.acctbal):Float64;N]
                SubqueryAlias: suppliers [acctbal:Float64;N]
                  SubqueryAlias: t [acctbal:Float64;N]
                    Projection: column2 AS acctbal [acctbal:Float64;N]
                      Values: (Int64(1), Float64(10)), (Int64(1), Float64(20)) [column1:Int64;N, column2:Float64;N]

@dqkqd
Copy link
Contributor Author

dqkqd commented Oct 1, 2025

I tried to return early here if there is no common node:

FoundCommonNodes::No {
original_nodes_list: original_exprs_list,
} => Ok(Transformed::no((original_exprs_list, input, None))),

However, the optimizer failed in 'optimize_projections':

DataFusion CLI v50.0.0
> WITH suppliers AS (
  SELECT *
  FROM (VALUES (1, 10.0), (1, 20.0)) AS t(nation, acctbal)
)
SELECT
  ROW_NUMBER() OVER (PARTITION BY nation ORDER BY acctbal DESC) AS rn
FROM suppliers AS s
WHERE acctbal > (
  SELECT AVG(acctbal) FROM suppliers
);
Optimizer rule 'optimize_projections' failed
caused by
Schema error: No field named "row_number() PARTITION BY [s.nation] ORDER BY [s.acctbal DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW". Valid fields are s.acctbal, __scalar_sq_1."avg(suppliers.acctbal)".
>

If I disable the entire common_sub_expression_eliminate rule, the optimizer fails with the same error.

diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 084152d40..ca4a85692 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -252,7 +252,7 @@ impl Optimizer {
             // The previous optimizations added expressions and projections,
             // that might benefit from the following rules
             Arc::new(EliminateGroupByConstant::new()),
-            Arc::new(CommonSubexprEliminate::new()),
+            // Arc::new(CommonSubexprEliminate::new()),
             Arc::new(OptimizeProjections::new()),
         ];

In fact, the same issue happens if I disable everything in between ScalarSubqueryToJoin and OptimizeProjections. I believe there might be a bug in ScalarSubqueryToJoin.

diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index 084152d40..684b01b88 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -234,25 +234,25 @@ impl Optimizer {
             Arc::new(EliminateJoin::new()),
             Arc::new(DecorrelatePredicateSubquery::new()),
             Arc::new(ScalarSubqueryToJoin::new()),
-            Arc::new(DecorrelateLateralJoin::new()),
-            Arc::new(ExtractEquijoinPredicate::new()),
-            Arc::new(EliminateDuplicatedExpr::new()),
-            Arc::new(EliminateFilter::new()),
-            Arc::new(EliminateCrossJoin::new()),
-            Arc::new(EliminateLimit::new()),
-            Arc::new(PropagateEmptyRelation::new()),
-            // Must be after PropagateEmptyRelation
-            Arc::new(EliminateOneUnion::new()),
-            Arc::new(FilterNullJoinKeys::default()),
-            Arc::new(EliminateOuterJoin::new()),
-            // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
-            Arc::new(PushDownLimit::new()),
-            Arc::new(PushDownFilter::new()),
-            Arc::new(SingleDistinctToGroupBy::new()),
-            // The previous optimizations added expressions and projections,
-            // that might benefit from the following rules
-            Arc::new(EliminateGroupByConstant::new()),
-            Arc::new(CommonSubexprEliminate::new()),
+            // Arc::new(DecorrelateLateralJoin::new()),
+            // Arc::new(ExtractEquijoinPredicate::new()),
+            // Arc::new(EliminateDuplicatedExpr::new()),
+            // Arc::new(EliminateFilter::new()),
+            // Arc::new(EliminateCrossJoin::new()),
+            // Arc::new(EliminateLimit::new()),
+            // Arc::new(PropagateEmptyRelation::new()),
+            // // Must be after PropagateEmptyRelation
+            // Arc::new(EliminateOneUnion::new()),
+            // Arc::new(FilterNullJoinKeys::default()),
+            // Arc::new(EliminateOuterJoin::new()),
+            // // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
+            // Arc::new(PushDownLimit::new()),
+            // Arc::new(PushDownFilter::new()),
+            // Arc::new(SingleDistinctToGroupBy::new()),
+            // // The previous optimizations added expressions and projections,
+            // // that might benefit from the following rules
+            // Arc::new(EliminateGroupByConstant::new()),
+            // Arc::new(CommonSubexprEliminate::new()),
             Arc::new(OptimizeProjections::new()),
         ];

@dqkqd dqkqd force-pushed the common-sub-expression-fails-in-window-function branch from c15a4f6 to 172de54 Compare October 3, 2025 13:31
@dqkqd dqkqd force-pushed the common-sub-expression-fails-in-window-function branch from 172de54 to 86efa98 Compare October 3, 2025 13:34
@dqkqd dqkqd changed the title fix: 'common_sub_expression_eliminate' fails in a window function fix: optimizer common_sub_expression_eliminate fails in a window function Oct 3, 2025
@dqkqd dqkqd marked this pull request as ready for review October 3, 2025 14:02
@alamb alamb mentioned this pull request Oct 3, 2025
37 tasks
@alamb
Copy link
Contributor

alamb commented Oct 3, 2025

Thank you for this @dqkqd

@Jefffrey do you happen to have any time to review this PR?

Copy link
Contributor

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me, thanks for the detailed breakdown in the issue & this PR to help me understand the root cause and fix 👍

I think we should add the original query that highlighted this bug to the test suite, perhaps in the window SLT. Also would be nice if could place a code comment right above that project to explain it (we do have that docstring comment on ScalarSubqueryToJoin struct but it's a bit far removed from the code fix so might be hard to link them together). Perhaps something like:

// Preserve original schema as new Join might have more fields than what Filter & parents expect

Somewhat unrelated thoughts

It does make me wonder why we have this invariant, on Window nodes that requires this schema matching; it feels like a bit of a limitation, but I guess that's what Window::try_new is supposed to fix but apparently isn't as performant. It would be nice if we could uplift the Window::try_new_with_schema function with some documentation explaining when to use it, etc. Especially as the original error message (Window has mismatch between number of expressions (1) and number of fields in schema (0)) is not very descriptive. I'll raise a separate issue for this.

Edit: raised #17912

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Oct 4, 2025
@dqkqd
Copy link
Contributor Author

dqkqd commented Oct 4, 2025

Thank @Jefffrey

I added the tests to window.slt, however, this test passed even without the change.
I verified with datafusion-cli and it failed.
I'm not sure what's happening.

@Jefffrey
Copy link
Contributor

Jefffrey commented Oct 4, 2025

Thank @Jefffrey

I added the tests to window.slt, however, this test passed even without the change. I verified with datafusion-cli and it failed. I'm not sure what's happening.

Oh that's weird, maybe something in the configs used for SLT 🤔

It would be nice to have an explicit test for this somewhere, I'll try look at SLT a bit to see why this might be happening

@Jefffrey
Copy link
Contributor

Jefffrey commented Oct 4, 2025

Oh I think it's this:

# We should remove the type checking in physical plan after we don't skip
# the failed optimizing rules by default.
# (see more in https://github.com/apache/datafusion/issues/4615)
statement ok
set datafusion.optimizer.skip_failed_rules = true
# Error is returned from the physical plan.
query error Cannot cast Utf8\("1 DAY"\) to Int8
SELECT
COUNT(c1) OVER (ORDER BY c2 RANGE BETWEEN '1 DAY' PRECEDING AND '2 DAY' FOLLOWING)
FROM aggregate_test_100;
statement ok
set datafusion.optimizer.skip_failed_rules = true

We never set it back to false so I guess the test at the bottom of the file was affected; a bit concerning we have these set configs across our SLTs but not much guarantee that the tests they were added for will clean them up.

Perhaps try putting this before the test:

statement ok
set datafusion.optimizer.skip_failed_rules = false

Edit: or better yet, fix that above snippet to set back to false for the second set (I assume that was the intention)

@dqkqd
Copy link
Contributor Author

dqkqd commented Oct 4, 2025

Thanks.

I disable the rule and the test failed without the change:

Completed 355 test files in 16 seconds                                                                                                                                                                              External error: 1 errors in file /home/dqk/workspace/datafusion/datafusion/sqllogictest/test_files/window.slt

1. query failed: DataFusion error: Optimizer rule 'common_sub_expression_eliminate' failed
caused by
Error during planning: Window has mismatch between number of expressions (1) and number of fields in schema (0)
[SQL] WITH suppliers AS (
  SELECT *
  FROM (VALUES (1, 10.0), (1, 20.0)) AS t(nation, acctbal)
)
SELECT
  ROW_NUMBER() OVER (PARTITION BY nation ORDER BY acctbal DESC) AS rn
FROM suppliers AS s
WHERE acctbal > (
  SELECT AVG(acctbal) FROM suppliers
);
at /home/dqk/workspace/datafusion/datafusion/sqllogictest/test_files/window.slt:6132

@Jefffrey
Copy link
Contributor

Jefffrey commented Oct 4, 2025

Raised #17914 btw just for tracking (don't know if this is happening in other SLT files, but I assume so)

@Jefffrey Jefffrey added this pull request to the merge queue Oct 6, 2025
Merged via the queue into apache:main with commit 82cd7f3 Oct 6, 2025
28 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Optimizer 'common_sub_expression_eliminate' fails in a window function (SQLStorm)

3 participants