[SPARK-45112][SQL] Use UnresolvedFunction based resolution in SQL Dataset functions #42864
Conversation
The doctest's expected output changed because the result column is now named `substr(l, 1, 3)` instead of `substring(l, 1, 3)`:

```
+---------------+------+
|substr(l, 1, 3)|d[key]|
+---------------+------+
|            abc| value|
+---------------+------+
```
To avoid this change, I think we can call `self.substring` in this method. `df.l[slice(1, 3)]` does not require a certain function name, so keeping it as it was is better.
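To illustrate the point above, here is a hypothetical sketch (toy class, not PySpark's actual `Column`) of slice access delegating to a single `substr` implementation, so the resulting column name does not depend on which SQL function name the backend registers:

```python
# Toy Column whose __getitem__ delegates slices to substr() directly,
# so df.l[slice(1, 3)] yields one canonical column name regardless of
# whether the backend calls the function "substr" or "substring".
class ToyColumn:
    def __init__(self, name):
        self.name = name

    def substr(self, start, length):
        # single delegate point: one implementation, one display name
        return ToyColumn(f"substr({self.name}, {start}, {length})")

    def __getitem__(self, key):
        if isinstance(key, slice):
            if key.step is not None:
                raise ValueError("slice with step is not supported")
            # reuse substr instead of naming a SQL function here
            return self.substr(key.start, key.stop)
        return ToyColumn(f"{self.name}[{key!r}]")

col = ToyColumn("l")
print(col[1:3].name)    # substr(l, 1, 3)
print(col["key"].name)  # l['key']
```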
```scala
// scalastyle:on line.size.limit
case class In(value: Expression, list: Seq[Expression]) extends Predicate {

  def this(valueAndList: Seq[Expression]) = {
```
Does this mean we support `SELECT in(a, b, c)` now?
Yes, but it was not a goal; it is just a useful side effect.
I'm not sure if this is useful. `in(a, b, c)` looks pretty weird to me. Can we revert it and treat `def in` as a special case? The SQL parser also treats `IN` as a special case and has dedicated syntax for it.
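The side effect being discussed can be sketched with a toy function registry (illustrative names, not Spark's `FunctionRegistry`): once `In` gains a constructor taking a flat `Seq[Expression]`, it becomes registrable and callable by name like any other function, even though SQL normally parses `IN` via dedicated syntax:

```python
# Toy registry: a name maps to a builder taking a flat argument list.
# The auxiliary constructor `def this(valueAndList: Seq[Expression])`
# plays exactly this role, splitting the list into value and candidates.
def make_in(args):
    if len(args) < 2:
        raise ValueError("in() needs a value and at least one list element")
    value, candidates = args[0], args[1:]
    return ("In", value, list(candidates))

REGISTRY = {"in": make_in}

# what `SELECT in(a, b, c)` would resolve to in this toy model
expr = REGISTRY["in"](["a", "b", "c"])
print(expr)  # ('In', 'a', ['b', 'c'])
```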
```diff
-  def count(e: Column): Column = withAggregateFunction {
-    e.expr match {
+  def count(e: Column): Column = {
+    val withoutStar = e.expr match {
       // Turn count(*) into count(1)
```
Unrelated issue: this is not the right place to do this conversion; it should be done in the analyzer. cc @zhengruifeng, does Spark Connect hit the same issue?
No, Spark Connect doesn't hit this issue.
See `python/pyspark/sql/connect/functions.py`, lines 1014 to 1015 at 89041a4.
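The `count(*)` rewrite under discussion can be restated on a toy expression tree (plain tuples, not Spark's classes): `*` has no single column to count, so counting rows is expressed as counting the literal `1`:

```python
# Toy model of the "Turn count(*) into count(1)" conversion.
STAR = ("Star",)

def count(arg):
    # count(*) has no column argument; substitute the literal 1 so the
    # aggregate simply counts rows
    if arg == STAR:
        arg = ("Literal", 1)
    return ("UnresolvedFunction", "count", [arg])

print(count(STAR))             # ('UnresolvedFunction', 'count', [('Literal', 1)])
print(count(("Column", "v")))  # ('UnresolvedFunction', 'count', [('Column', 'v')])
```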
```diff
-  def schema_of_json(json: Column, options: java.util.Map[String, String]): Column = {
-    withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
-  }
+  def schema_of_json(json: Column, options: java.util.Map[String, String]): Column =
```
I think these should be among the few exceptions that we keep as they were, because the expression (`SchemaOfJson` in this case) takes non-expression inputs, and it's tricky to define a protocol that can pass non-expression inputs as expressions.
Not sure I get your point. `SchemaOfJson` seems to accept options as expressions: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L776-L778. We just follow Connect here.
Oh I see, then I'm fine with it. It's inevitable that we need to create a map expression from language-native (Scala or Python) map values.
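The map-from-native-values conversion mentioned above can be sketched as follows (toy tuples, not Spark's `CreateMap`): flatten the native dict into alternating key/value literals, which is how a map-building expression typically receives its children:

```python
# Hedged sketch: build a CreateMap-like expression node from a
# language-native dict by emitting alternating key and value literals.
def literal(v):
    return ("Literal", v)

def create_map(options):
    children = []
    for k, v in options.items():
        children.append(literal(k))
        children.append(literal(v))
    return ("CreateMap", children)

print(create_map({"allowComments": "true"}))
# ('CreateMap', [('Literal', 'allowComments'), ('Literal', 'true')])
```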
```scala
case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
  UnresolvedAlias(a, Some(Column.generateAlias))
case u: UnresolvedFunction => UnresolvedAlias(expr, None)
case e if !e.resolved => UnresolvedAlias(expr, None)
```
Without this change, some Python tests fail, like `sum_udf(col("v2")) + 5` here: https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py#L404C17-L404C40.

I debugged this today and it seems these are `AggregateExpression`s, but the aggregate function is `PythonUDAF`, so they don't match the previous `case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression]` branch. Shall we remove the `if` condition? `Column.generateAlias` seems to take care of it: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L44-L50
I changed the `AggregateExpression` path to `case a: AggregateExpression => UnresolvedAlias(a, Some(Column.generateAlias))` but still kept the `case _ if !expr.resolved => UnresolvedAlias(expr, None)` branch, because in the `plus_one(sum_udf(col("v1")))` case an unresolved `PythonUDF` is passed here.
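The resulting dispatch can be restated as a toy (illustrative Python classes, not Spark's): aggregates get a generated alias function, while any other unresolved expression is wrapped with no alias so the analyzer can name it after resolution:

```python
# Toy restatement of the alias-wrapping logic described above.
class Expr:
    resolved = True

class AggregateExpression(Expr):
    pass

class UnresolvedFunction(Expr):
    resolved = False

def generate_alias(e):
    # stand-in for Column.generateAlias
    return f"alias_of_{type(e).__name__}"

def wrap(expr):
    if isinstance(expr, AggregateExpression):
        # case a: AggregateExpression => UnresolvedAlias(a, Some(generateAlias))
        return ("UnresolvedAlias", expr, generate_alias)
    if not expr.resolved:
        # case _ if !expr.resolved => UnresolvedAlias(expr, None)
        return ("UnresolvedAlias", expr, None)
    return expr  # resolved non-aggregates pass through untouched

print(wrap(AggregateExpression())[0])  # UnresolvedAlias
print(wrap(UnresolvedFunction())[2])   # None
```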
also cc @beliefer @panbingkun

@zhengruifeng Thank you for your ping. The ultimate goal is to move Dataset and Connect functions into the `sql-api` module.
To add more color to @peter-toth's comment: think about a base trait to define these function APIs. Then, on the Spark Connect side, we override them.
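The base-trait idea sketched above could look like this (hedged, hypothetical names only, and in Python rather than Scala for brevity): the shared API surface is written once against an abstract builder, and each backend overrides only how an unresolved function call is materialized:

```python
# Hypothetical sketch of a shared function-API base with per-backend builders.
from abc import ABC, abstractmethod

class FunctionsBase(ABC):
    @abstractmethod
    def fn(self, name, *args):
        """Build a backend-specific representation of a function call."""

    # shared API surface, written once against fn()
    def upper(self, col):
        return self.fn("upper", col)

    def substr(self, col, pos, length):
        return self.fn("substr", col, pos, length)

class SqlFunctions(FunctionsBase):
    def fn(self, name, *args):
        # classic path: an UnresolvedFunction resolved by the analyzer
        return ("UnresolvedFunction", name, list(args))

class ConnectFunctions(FunctionsBase):
    def fn(self, name, *args):
        # Connect path: a protocol message resolved server-side
        return {"proto": "UnresolvedFunction", "name": name, "args": list(args)}

print(SqlFunctions().upper("c"))      # ('UnresolvedFunction', 'upper', ['c'])
print(ConnectFunctions().upper("c")["name"])  # upper
```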
This reverts commit d611fd5.
```scala
override def builder(e: Seq[Expression]): Expression = {
  assert(e.length == 1, "Defined UDF only has one column")
  val expr = e.head
  assert(expr.resolved, "column should be resolved to use the same type " +
```
I'm a bit confused about this change. IIUC we always call `builder` with resolved expressions.
```scala
And(
  LessThan(lit(1).expr, lit(5).expr),
  LessThan(lit(6).expr, lit(7).expr)),
EqualTo(lit(0).expr, lit(-1).expr))
```
Shall we use the DSL to build expressions?
thanks, merging to master!

Thanks for the review!
### What changes were proposed in this pull request?

This PR proposes bottom-up resolution in `ResolveFunctions`, which is much faster (requires fewer resolution rounds) when there are deeply nested `UnresolvedFunction`s. These structures are more likely to occur after #42864.

### Why are the changes needed?

Performance optimization.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43146 from peter-toth/SPARK-45354-resolve-functions-bottom-up.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Peter Toth <peter.toth@gmail.com>
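Why bottom-up needs fewer rounds can be shown on a toy tree model (illustrative, not Spark's `ResolveFunctions`): a top-down rule can only resolve a node once its children are resolved, so a chain of depth n takes n passes, while a single post-order (bottom-up) pass resolves the whole chain:

```python
# Toy model: a node is (name, children); "Unresolved*" nodes resolve
# only when every child is already resolved.
def resolve_top_down_once(node):
    name, children = node
    if name.startswith("Unresolved") and all(c[0] == "Resolved" for c in children):
        return ("Resolved", children)
    return (name, [resolve_top_down_once(c) for c in children])

def resolve_bottom_up(node):
    name, children = node
    children = [resolve_bottom_up(c) for c in children]  # post-order
    if name.startswith("Unresolved") and all(c[0] == "Resolved" for c in children):
        return ("Resolved", children)
    return (name, children)

# a chain of 5 nested unresolved functions
tree = ("Resolved", [])
for _ in range(5):
    tree = ("UnresolvedFunction", [tree])

rounds = 0
t = tree
while t[0] != "Resolved":
    t = resolve_top_down_once(t)
    rounds += 1
print(rounds)                       # 5 top-down passes
print(resolve_bottom_up(tree)[0])   # Resolved (in a single pass)
```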
…EvalPythonExecSuite Following apache#11288: Fix error in case "arrow_udf test: with preprojection" due to Spark 4.0 change in apache/spark#42864.
### What changes were proposed in this pull request?

This is the first cleanup PR of the ticket to use `UnresolvedFunction`/`FunctionRegistry` based resolution in SQL Dataset functions, similar to what Spark Connect does.

### Why are the changes needed?

If we can make the SQL and Connect Dataset functions similar, then we can move the functions to the `sql-api` module.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.