-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Support User Defined Window Functions #6703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
18a034c to
6f87009
Compare
6f87009 to
5fe955b
Compare
5fe955b to
1427b1c
Compare
|
Ok, I think this PR is basically ready for review (once #6690 is merged). |
| PartitionEvaluator, ReturnTypeFunction, Signature, Volatility, WindowUDF, | ||
| }; | ||
|
|
||
| /// A query with a window function evaluated over the entire partition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are the new tests
|
|
||
| /// Basic stateful user defined window function | ||
| #[tokio::test] | ||
| async fn test_stateful_udwf() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't think of anything else to test with a stateful function other than update_state is called. I would be interested in anyone else's opinions in this matter
1427b1c to
fa0e40f
Compare
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| /// Run all tests that are found in the `user_defined` directory |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the user defined tests now run as part of the same test binaries user_defined_integration
| @@ -0,0 +1,213 @@ | |||
| // Licensed to the Apache Software Foundation (ASF) under one | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a user facing example of how to use User Defined Window Functions
| }) | ||
| } | ||
|
|
||
| /// Creates a `BuiltInWindowFunctionExpr` suitable for a user defined window function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mirrors the code and structure for AggregateUDF
| values: &[ArrayRef], | ||
| range: &std::ops::Range<usize>, | ||
| ) -> Result<ScalarValue> { | ||
| //println!("evaluate_inside_range(). range: {range:#?}, values: {values:#?}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is leftover
| WindowUDF { | ||
| name: String::from("smooth_it"), | ||
| // it will take 1 arguments -- the column to smooth | ||
| signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't seem to be effecting result, but I don't understand why signature takes DataType::Int32?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it was the default type that the CSV parser picked, but apparently not:
❯ describe './datafusion/core/tests/data/cars.csv';
describe './datafusion/core/tests/data/cars.csv';
+-------------+-----------------------------+-------------+
| column_name | data_type | is_nullable |
+-------------+-----------------------------+-------------+
| car | Utf8 | YES |
| speed | Float64 | YES |
| time | Timestamp(Nanosecond, None) | YES |
+-------------+-----------------------------+-------------+
3 rows in set. Query took 0.031 seconds.I'lll update this to take a Float instead.
| } | ||
|
|
||
| /// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema | ||
| pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can add an example usage to the docstring. such as in the pub fn aggregate. We can do so in following PRs also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call -- I tried to make one, and it turns out to be non trivial. I will do it in a follow on PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is what I was thinking: #6746 if you like it I will polish it up
datafusion/core/tests/user_defined/user_defined_window_functions.rs
Outdated
Show resolved
Hide resolved
| config.set(&k, &v)?; | ||
| } | ||
| let session_config = SessionConfig::from(config); | ||
| let window_functions = HashMap::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess, since this API is deprecated anyway, you didn't update its arguments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly. I also tried to lead people to SessionContext::task_ctx with another doc comment above.
mustafasrepo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alamb. I left some inline comments, related to comments, etc. Other than these, This PR is LGTM!. Thanks for this feature.
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
…on into alamb/udfw_for_real
Co-authored-by: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com>
…on into alamb/udfw_for_real
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the thorough review @mustafasrepo.
| WindowUDF { | ||
| name: String::from("smooth_it"), | ||
| // it will take 1 arguments -- the column to smooth | ||
| signature: Signature::exact(vec![DataType::Int32], Volatility::Immutable), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it was the default type that the CSV parser picked, but apparently not:
❯ describe './datafusion/core/tests/data/cars.csv';
describe './datafusion/core/tests/data/cars.csv';
+-------------+-----------------------------+-------------+
| column_name | data_type | is_nullable |
+-------------+-----------------------------+-------------+
| car | Utf8 | YES |
| speed | Float64 | YES |
| time | Timestamp(Nanosecond, None) | YES |
+-------------+-----------------------------+-------------+
3 rows in set. Query took 0.031 seconds.I'lll update this to take a Float instead.
| config.set(&k, &v)?; | ||
| } | ||
| let session_config = SessionConfig::from(config); | ||
| let window_functions = HashMap::new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly. I also tried to lead people to SessionContext::task_ctx with another doc comment above.
| } | ||
|
|
||
| /// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema | ||
| pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call -- I tried to make one, and it turns out to be non trivial. I will do it in a follow on PR
stuartcarnie
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great stuff, @alamb – apologies, I didn't realise my comments were tied to a review, so they are late, but all just giving praise 👍🏻
| - [`rewrite_expr.rs`](examples/rewrite_expr.rs): Define and invoke a custom Query Optimizer pass | ||
| - [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User Defined Aggregate Function (UDAF) | ||
| - [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined (scalar) Function (UDF) | ||
| - [`simple_udfw.rs`](examples/simple_udwf.rs): Define and invoke a User Defined Window Function (UDWF) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These examples are 💯
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation in this example is excellent 👏🏻
| /// Apply one or more window functions ([`Expr::WindowFunction`]) to extend the schema | ||
| pub fn window(self, window_exprs: Vec<Expr>) -> Result<DataFrame> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was just looking for a similar API today – great to know it is coming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stuartcarnie I wonder if you have any thoughts about the API here for making window functions: #6746 (it is somewhat complex at the moment)
Which issue does this PR close?
Closes #5781
Closes #6617
Rationale for this change
See #5781
This is the final PR that integrates all the work we have done this week with @mustafarepo and others to support User Defined Window Functions
It includes the parts of #6617 that have not been merged already (connecting the window functions to the various registries), and adds tests.
I tried to keep the size of this PR as small as possible -- 2/3 of the change is tests or examples -- but it is still large and I apologize to reviewers.
What changes are included in this PR?
WindowUDFand connect it through datafusion_expr and contextssimple_udwf.rsexampleAre these changes tested?
yes
Are there any user-facing changes?
yes, new feature