Allow struct field access projections to be pushed down into scans #19538
Conversation
// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
fn is_expr_trivial(expr: &Expr) -> bool {
    matches!(expr, Expr::Column(_) | Expr::Literal(_, _))
}
As evidenced by the existing functions for both logical and physical expressions this was already a concept and implementation within the codebase, so all this PR is really doing is allowing arbitrary functions / expressions to declare themselves as trivial.
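As a self-contained sketch of that idea (the `Expr` variants below are simplified, hypothetical stand-ins for DataFusion's logical `Expr`, not its real API), letting a function declare itself trivial and recursing over arguments might look like:

```rust
// Simplified stand-in for DataFusion's logical `Expr` (hypothetical variants).
enum Expr {
    Column(String),
    Literal(i64),
    // A scalar function call; `trivial` models the function declaring itself
    // trivial (as this PR allows functions to do).
    ScalarFunction { trivial: bool, args: Vec<Expr> },
}

// Trivial = implies no real computation: columns, literals, and calls to
// self-declared-trivial functions whose arguments are themselves trivial.
fn is_expr_trivial(expr: &Expr) -> bool {
    match expr {
        Expr::Column(_) | Expr::Literal(_) => true,
        Expr::ScalarFunction { trivial, args } => {
            *trivial && args.iter().all(is_expr_trivial)
        }
    }
}

fn main() {
    // `s['foo']` modeled as a trivial function over a column and a literal key.
    let access = Expr::ScalarFunction {
        trivial: true,
        args: vec![Expr::Column("s".to_string()), Expr::Literal(0)],
    };
    assert!(is_expr_trivial(&access));
    // A non-trivial function (think `upper`) stays non-trivial even over a column.
    let upper = Expr::ScalarFunction {
        trivial: false,
        args: vec![Expr::Column("a".to_string())],
    };
    assert!(!is_expr_trivial(&upper));
}
```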
alamb
left a comment
Thank you @adriangb -- I think this PR is really nice
I don't think we should push constants all the way down into the scan -- doing so will require passing (and filtering) arrays of constant values through the plan, which will be expensive.
Put another way, I don't think Literal is a trivial physical expression as we have defined it. Maybe we can add special literal handling for the ordering optimizations.
The only thing I think this PR needs is some more slt / explain tests that show it working (I added comments below). I think it would be ok to add these tests as a follow-on PR too.
I think your formalization of the is_trivial API is a beautiful way to encapsulate the existing concept that was in the code
None
}

/// Returns true if this function is trivial (cheap to evaluate).
I suggest that a good rule of thumb here is that the function takes constant time per RecordBatch (aka it doesn't depend on the number of rows in the batch). Struct field access and column have this property but other functions don't
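A self-contained mock of why struct field access fits this rule of thumb (this is deliberately not Arrow's actual API; `StructArray` here is a hypothetical stand-in whose children are already-materialized arrays):

```rust
// Hypothetical mock of a struct column: the struct's children are
// already-materialized arrays, keyed by field name.
struct StructArray {
    fields: Vec<(String, Vec<i64>)>,
}

impl StructArray {
    // Struct field access is constant time per batch: it hands back a
    // reference to an existing child array, regardless of row count.
    fn field(&self, name: &str) -> Option<&Vec<i64>> {
        self.fields.iter().find(|(n, _)| n == name).map(|(_, v)| v)
    }
}

// A typical non-trivial function, by contrast, does work proportional to the
// number of rows in the batch.
fn add_one(col: &[i64]) -> Vec<i64> {
    col.iter().map(|v| v + 1).collect()
}

fn main() {
    let batch = StructArray {
        fields: vec![("foo".to_string(), vec![1, 2, 3])],
    };
    // O(1) per batch: no per-row work happens here.
    assert_eq!(batch.field("foo"), Some(&vec![1, 2, 3]));
    // O(rows): every element is touched.
    assert_eq!(add_one(&[1, 2]), vec![2, 3]);
}
```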
self.doc()
}

fn is_trivial(&self) -> bool {
I recommend some comments explaining the rationale -- namely to allow these accesses to be pushed down into scans
/// - Nested combinations of field accessors (e.g., `col['a']['b']`)
///
/// This is used to identify expressions that are cheap to duplicate or
/// don't benefit from caching/partitioning optimizations.
Maybe also include that they will be pushed below filters so if they do per-row work, setting is_trivial may slow things down
// If expressions are all trivial (columns, literals, or field accessors),
// then all computations in this projection are reorders or renames,
// and the projection would not benefit from the repartition.
vec![!self.projection_expr().is_trivial()]
this is a very nice simplification and a good illustration of the power of the is_trivial API
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
FilterExec: id@0 = level@1
RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1
DataSourceExec: file_groups={1 group: [[TMP_DIR/hierarchy.parquet]]}, projection=[id, 1 as level], file_type=parquet, predicate=id@0 = 1, pruning_predicate=id_null_count@2 != row_count@3 AND id_min@0 <= 1 AND 1 <= id_max@1, required_guarantees=[id in (1)]
this is somewhat interesting that it materializes the constant in the scan. This is probably ok, but it does mean that constant may now get carried as a constant record batch up through the plan many times 🤔
04)------ProjectionExec: expr=[get_field(__unnest_placeholder(recursive_unnest_table.column3,depth=1)@0, c1) as __unnest_placeholder(UNNEST(recursive_unnest_table.column3)[c1]), column3@1 as column3]
05)--------UnnestExec
06)----------ProjectionExec: expr=[column3@0 as __unnest_placeholder(recursive_unnest_table.column3), column3@0 as column3]
07)------------DataSourceExec: partitions=1, partition_sizes=[1]
I think we should add some more specific testing for pushdown into parquet scans
Perhaps in parquet_pushdown.slt
Here are some suggestions:
setup
> copy (select column1 as a, column2 as b, column3 as s from values (1, 2, {foo: 'bar'}), (10, 20, {foo: 'baz'} )) to 'foo.parquet';
+-------+
| count |
+-------+
| 2 |
+-------+
1 row(s) fetched.
Elapsed 0.019 seconds.
> select * from 'foo.parquet';
+----+----+------------+
| a | b | s |
+----+----+------------+
| 1 | 2 | {foo: bar} |
| 10 | 20 | {foo: baz} |
+----+----+------------+
2 row(s) fetched.
Elapsed 0.014 seconds.

Then demonstrate constant pushdown:
> explain format indent select a, 1 from 'foo.parquet';
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: foo.parquet.a, Int64(1) |
| | TableScan: foo.parquet projection=[a] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/andrewlamb/Software/datafusion/foo.parquet]]}, projection=[a, 1 as Int64(1)], file_type=parquet |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.008 seconds.

Also show get_field pushdown:
> explain format indent select a, s['foo'] from 'foo.parquet';
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: foo.parquet.a, get_field(foo.parquet.s, Utf8("foo")) |
| | TableScan: foo.parquet projection=[a, s] |
| physical_plan | DataSourceExec: file_groups={1 group: [[Users/andrewlamb/Software/datafusion/foo.parquet]]}, projection=[a, get_field(s@2, foo) as foo.parquet.s[foo]], file_type=parquet |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.013 seconds.
Add `should_push_through_operator` method to `ProjectionExprs` that checks whether a projection provides actual benefit when pushed through operators like Filter, Sort, Repartition, etc.

A projection should be pushed through when it is:
1. Trivial (no expensive computations to duplicate)
2. AND provides benefit via one of:
   - Narrowing the schema (fewer output columns than input)
   - Having field accessors that reduce data size
   - Having literals that can be absorbed by the datasource

Column-only projections that just rename without narrowing the schema are NOT pushed through, as they provide no benefit. This fixes test failures where column-only renaming projections were incorrectly being pushed through filters and other operators.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
01)ProjectionExec: expr=[foo@0 as foo]
02)--SortPreservingMergeExec: [part_key@1 ASC NULLS LAST], fetch=1
03)----SortExec: TopK(fetch=1), expr=[part_key@1 ASC NULLS LAST], preserve_partitioning=[true]
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/test_limit_with_partitions/part-2.parquet]]}, projection=[1 as foo, part_key], file_type=parquet, predicate=DynamicFilter [ empty ]
@alamb one issue I've run into is that it's hard to encode the logic of "don't push down literals if they're on their own or with just columns, but do push them down if they're arguments to a scalar function that will not create a RecordBatch from them". I.e. `select 1 as foo, get_field(struct, 'another literal') ...`.
I think the right thing to do here is to let get_field handle the "recursion". I.e. instead of our current logic of:
// In ScalarFunctionExpr
fn is_trivial(&self) -> bool {
    if !self.fun.is_trivial() {
        return false;
    }
    self.args.iter().all(|arg| arg.is_trivial())
}

Into:
fn is_trivial(&self) -> bool {
    self.fun.is_trivial(&self.args)
}

Or something like that. Realistically only the function knows whether what it's going to do with its arguments is efficient or not.
But there are two issues with this:
- We need two methods on `ScalarFunctionUDFImpl`, one for the logical layer and one for the physical layer (`is_trivial_logical(args: &[Expr])` and `is_trivial_physical(args: &[Arc<dyn PhysicalExpr>])`, or something like that).
- `ScalarFunctionUDFImpl` can't even reference `PhysicalExpr` because of crate dependency cycles 😢
Any thoughts?
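A self-contained sketch of that arg-aware shape (the `Expr` enum and `ScalarUdf` trait below are simplified, hypothetical stand-ins, not DataFusion's actual types): the function receives its concrete arguments and decides for itself whether the call is trivial.

```rust
// Hypothetical stand-ins for illustration only.
enum Expr {
    Column(String),
    Literal(String),
}

trait ScalarUdf {
    // The function itself decides, given its concrete arguments, whether the
    // call is trivial; the default is conservatively "no".
    fn is_trivial(&self, _args: &[Expr]) -> bool {
        false
    }
}

struct GetField;

impl ScalarUdf for GetField {
    fn is_trivial(&self, args: &[Expr]) -> bool {
        // get_field(col, 'name') is trivial: the literal key never becomes a
        // per-row array, it only selects which child column to return.
        matches!(args, [Expr::Column(_), Expr::Literal(_)])
    }
}

fn main() {
    let f = GetField;
    let ok = [Expr::Column("s".to_string()), Expr::Literal("foo".to_string())];
    assert!(f.is_trivial(&ok));
    // A bare literal argument would have to be materialized per row, so the
    // function refuses to call itself trivial here.
    assert!(!f.is_trivial(&[Expr::Literal("1".to_string())]));
}
```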
Closes #19387
The idea is to elevate the existing concept of a "trivial" expression from hardcoded in various places around the codebase to dynamically definable by expressions / functions. By default columns, literals and field accessor functions are considered trivial, but users can define their own (e.g. for variant).
This solves #19387 because then field accessors are treated the same as columns and thus get pushed down through projections, with the caveat of #19550
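As a toy illustration of the end effect (a deliberately simplified plan model, not DataFusion's optimizer; it ignores the filter's own column requirements), a trivial projection can slide below a filter and be absorbed into the scan, since re-ordering it cannot duplicate expensive work:

```rust
// Toy plan model (not DataFusion's API) for the pushdown idea.
#[derive(Debug, PartialEq)]
enum Plan {
    Scan { projection: Vec<String> },
    Filter(Box<Plan>),
    Projection { exprs: Vec<String>, trivial: bool, input: Box<Plan> },
}

fn push_down(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { exprs, trivial: true, input } => match *input {
            // Slide the trivial projection under the filter and keep pushing.
            Plan::Filter(child) => Plan::Filter(Box::new(push_down(Plan::Projection {
                exprs,
                trivial: true,
                input: child,
            }))),
            // Absorb it into the scan's projection.
            Plan::Scan { .. } => Plan::Scan { projection: exprs },
            other => Plan::Projection { exprs, trivial: true, input: Box::new(other) },
        },
        // Non-trivial projections (and everything else) stay where they are.
        other => other,
    }
}

fn main() {
    // Projection(s[foo]) over Filter over Scan(s) ...
    let plan = Plan::Projection {
        exprs: vec!["s[foo]".to_string()],
        trivial: true,
        input: Box::new(Plan::Filter(Box::new(Plan::Scan {
            projection: vec!["s".to_string()],
        }))),
    };
    // ... becomes Filter over Scan(s[foo]): the field access reached the scan.
    let expected = Plan::Filter(Box::new(Plan::Scan {
        projection: vec!["s[foo]".to_string()],
    }));
    assert_eq!(push_down(plan), expected);
}
```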