feat: Add constant column extraction and rewriting for projections in ParquetOpener #19136
So quick!
adriangb
left a comment
This will create some conflicts with #19128 but this is small and self contained so I see no reason to not proceed with it now and I can deal with the conflicts later... would help to get a review over there and associated PRs 🎣
    if !constant_columns.is_empty() {
        predicate = predicate
            .map(|expr| {
                if is_dynamic_physical_expr(&expr) {
Why do we need this clause? What breaks if we remove it? I'd think that rewriting the dynamic expression would work: it would try to rewrite its children, which shouldn't cause any issues. Once snapshot is called, the produced expression should have the remapped children.
We need to skip rewriting dynamic predicates. DynamicFilterPhysicalExpr keeps its own remapped children so updates can later rebind to each file’s schema; if we rewrite it with per-file constants here, we stash those constants in remapped_children and any later update turns into that constant (and leaks across files sharing the same dynamic filter), breaking dynamic pruning. Non-dynamic predicates are safe to rewrite because they don’t carry shared mutable state.
Makes sense! I guess there's an opportunity to always apply another rewrite / simplify / replace lits pass after snapshotting the physical expression, but I don't know the reward / risk there (it could be expensive to do it after every record batch emitted, etc.). So will leave alone for now as you had originally.
I restored this behavior in 7126f59 but did it at a more granular level. I'll see if I can fix this issue in DynamicFilterPhysicalExpr itself.
Actually we can do one better (also that was the wrong place to put that logic): in d310161 I show that the children remapping is actually independent of the shared expression reference. Hence it is actually safe to remap children on a dynamic expression.
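The point about children remapping being independent of the shared expression reference can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyDynamicFilter`, `Child`, and the `bound` field are hypothetical stand-ins invented for illustration and are not DataFusion's `DynamicFilterPhysicalExpr` or its internals. The model assumes the mutable part lives behind a shared handle while the children are owned by each instance.

```rust
use std::sync::{Arc, RwLock};

/// Toy child expression; a stand-in, not a DataFusion type.
#[derive(Debug, Clone, PartialEq)]
enum Child {
    Column(String),
    Literal(i64),
}

/// Toy dynamic filter (hypothetical): `bound` is shared and mutable,
/// while `children` belongs to each instance.
struct ToyDynamicFilter {
    bound: Arc<RwLock<i64>>,
    children: Vec<Child>,
}

impl ToyDynamicFilter {
    fn new(children: Vec<Child>, bound: i64) -> Self {
        Self { bound: Arc::new(RwLock::new(bound)), children }
    }

    /// Returns a copy with remapped children that still points at the
    /// same shared state as `self`.
    fn with_new_children(&self, children: Vec<Child>) -> Self {
        Self { bound: Arc::clone(&self.bound), children }
    }

    /// Publishes a new bound to every instance sharing this state.
    fn update(&self, bound: i64) {
        *self.bound.write().unwrap() = bound;
    }

    /// Reads this instance's children together with the current shared bound.
    fn snapshot(&self) -> (Vec<Child>, i64) {
        (self.children.clone(), *self.bound.read().unwrap())
    }
}
```

In this model, replacing a column with a per-file constant on one copy does not change the children of the original, and a later `update` on the original is still visible through the copy. Whether the real expression behaves the same way is what commit d310161 is said to demonstrate.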
        projection.try_map_exprs(|expr| rewrite_physical_expr_with_constants(expr, constants))
    }

    fn rewrite_physical_expr_with_constants(
This looks very similar to https://github.com/apache/datafusion/pull/19128/files#diff-6bad7e4ee6dbc3a498e3fee746f2c3c18bdcf237d7cd12226e392f9b9c3d2fbe, we should be able to use it for partition values as well 😄
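For readers unfamiliar with this kind of rewrite, here is a minimal sketch of replacing column references with known constants in an expression tree. It assumes a toy `Expr` enum with `i64` literals; `Expr` and `rewrite_with_constants` are hypothetical names for illustration and are not the PR's `rewrite_physical_expr_with_constants` or the `replace_columns_with_literals` function from #19128.

```rust
use std::collections::HashMap;

/// Toy expression tree; a stand-in for a physical expression.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Recursively replaces every column found in `constants` with a literal.
/// Columns without a known constant are left untouched.
fn rewrite_with_constants(expr: Expr, constants: &HashMap<String, i64>) -> Expr {
    match expr {
        Expr::Column(name) => match constants.get(&name) {
            Some(value) => Expr::Literal(*value),
            None => Expr::Column(name),
        },
        Expr::Literal(value) => Expr::Literal(value),
        Expr::Eq(left, right) => Expr::Eq(
            Box::new(rewrite_with_constants(*left, constants)),
            Box::new(rewrite_with_constants(*right, constants)),
        ),
        Expr::And(left, right) => Expr::And(
            Box::new(rewrite_with_constants(*left, constants)),
            Box::new(rewrite_with_constants(*right, constants)),
        ),
    }
}
```

The same traversal works whether the constants come from partition values or from file statistics, which is presumably why the two functions look alike. A simplification pass could then fold comparisons between literals, but that step is not shown here.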
@Weijun-H let me know if you want to merge or chat about the feedback, I don't want to merge something that conflicts and then have this good work go stale
This PR does some refactoring of `PhysicalExprAdapter` and `PhysicalExprSimplifier` that I found necessary and/or beneficial while working on #19111.

## Changes made

### Replace `PhysicalExprAdapter::with_partition_values` with `replace_columns_with_literals`

This is a nice improvement because it:

1. Makes the `PhysicalExprAdapter` trait that users might need to implement simpler (less boilerplate for users).
2. Decouples these two transformations so that we can replace partition values and then apply a projection without having to also do the schema mapping (it would be from the logical schema to the logical schema, which is confusing and a waste of compute). I ran into this need in #19111. I think there may be other ways of doing it (e.g. piping in the expected output schema from ParquetSource) but it felt nicer this way and I expect other places may also need the decoupled transformation.
3. I think we can use it in the future to implement #19089 (edit: evidently I was right, see identical function in #19136).
4. It's fewer lines of code 😄

This will require any users calling `PhysicalExprAdapter` directly to change their code; I can add an entry to the upgrade guide.

### Remove partition pruning logic from `FilePruner` and deprecate now unused `PrunableStatistics` and `CompositePruningStatistics`

Since we replace partition values with literals we no longer need these structures; they get handled like any other literals. This is a good chunk of code / complexity that we can bin off.

### Use `TableSchema` instead of `SchemaRef` + `Vec<FieldRef>` in `ParquetOpener`

`TableSchema` is basically `SchemaRef` + `Vec<FieldRef>` already, and since `ParquetSource` has a `TableSchema` it's less code and fewer clones to propagate that into `ParquetOpener`.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
5104e8e to
bfb108f
Maybe as a followup: I think there's an opportunity to refactor into a public function along the lines of: `pub fn infer_static_columns_from_file(file: &PartitionedFile, schema: &Schema) -> HashMap<String, ScalarValue> { ... }` And then use that in other
Let's have this open for a day to allow any other feedback then I think it's ready to merge.
… ParquetOpener Co-authored-by: Weijun-H <huangweijun1001@gmail.com>
c0b8c3f to
ebcf52d
Which issue does this PR close?
Rationale for this change
Use file/group statistics to detect constant (including all-NULL) columns so we can avoid reading/decoding useless data and simplify predicates for earlier pruning.
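As a rough illustration of the idea, the sketch below infers constant columns from per-column statistics. It is not the PR's implementation: `Scalar`, `ColumnStats`, and `infer_constant_columns` are hypothetical toy names, and the sketch assumes the statistics are exact (an inexact min/max or null count could not safely be used this way). A column is treated as constant when every row is NULL, or when it has no NULLs and its minimum equals its maximum.

```rust
use std::collections::HashMap;

/// Toy scalar value; a stand-in, not DataFusion's `ScalarValue`.
#[derive(Debug, Clone, PartialEq)]
enum Scalar {
    Null,
    Int(i64),
}

/// Toy per-column statistics, assumed exact. `None` means "unknown".
#[derive(Debug, Clone)]
struct ColumnStats {
    min: Option<Scalar>,
    max: Option<Scalar>,
    null_count: Option<usize>,
}

/// Returns the columns whose value is provably the same in every row.
fn infer_constant_columns(
    num_rows: Option<usize>,
    stats: &[(&str, ColumnStats)],
) -> HashMap<String, Scalar> {
    let mut constants = HashMap::new();
    for (name, s) in stats {
        // Case 1: every row is NULL, so the column is the constant NULL.
        if let (Some(rows), Some(nulls)) = (num_rows, s.null_count) {
            if rows > 0 && nulls == rows {
                constants.insert(name.to_string(), Scalar::Null);
                continue;
            }
        }
        // Case 2: no NULLs and min == max, so every row holds that value.
        if let (Some(min), Some(max), Some(0)) = (&s.min, &s.max, s.null_count) {
            if min == max {
                constants.insert(name.to_string(), min.clone());
            }
        }
    }
    constants
}
```

Columns with unknown statistics are skipped rather than guessed at, so the result only ever shrinks the amount of work and never changes query results under the exactness assumption.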
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?
No