-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Support user defined ParquetAccessPlan in ParquetExec, validation to ParquetAccessPlan::select
#10813
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ation to ParquetAccessPlan::select
3e839ef to
38fa45a
Compare
ParquetAccessPlan to be passed in to ParquetExec, add validation to ParquetAccessPlan::selectParquetAccessPlan in ParquetExec, validation to ParquetAccessPlan::select
| /// is returned for *all* the rows in the row groups that are not skipped. | ||
| /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`]. | ||
| /// | ||
| /// # Errors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since users can now provide a ParquetAccessPlan it is important to do validation on the contents.
While technically we could avoid doing this validation when the selections came from the page pruning, I think it would be a good check to have to catch future bugs rather than subtle wrong results so I chose to always validate
| // selectors from the second row group | ||
| RowSelector::select(5), | ||
| RowSelector::skip(7) | ||
| RowSelector::skip(7), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
turns out that some of the existing unit tests were actually invalid. However, I think the issues are actually test problem, not actual code problems. All the actual parquet reader tests passed
| let rg_metadata = file_metadata.row_groups(); | ||
| // track which row groups to actually read | ||
| let access_plan = ParquetAccessPlan::new_all(rg_metadata.len()); | ||
| let access_plan = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the required plumbing, which I am quite pleased with -- it is quite straightforward now
| let mut finder = MetricsFinder { metrics: None }; | ||
| accept(physical_plan.as_ref(), &mut finder).unwrap(); | ||
| let parquet_metrics = finder.metrics.unwrap(); | ||
| let parquet_metrics = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulled into a new file so I could reuse it
| /// | ||
| /// For a complete example, see the [`parquet_index_advanced` example]). | ||
| /// | ||
| /// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/parquet_index_advanced.rs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be added in #10701
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW #10701 (example of how to use this API) is ready for review
| } | ||
|
|
||
| // validate all Selections | ||
| for (idx, (rg, rg_meta)) in self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is new checking added as users can pass in ParquetAccessPlan and the semantics are quite subtle
Jefffrey
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very good explanation, makes sense to me coming in with no context on the original issue 👍
datafusion/core/src/datasource/physical_plan/parquet/access_plan.rs
Outdated
Show resolved
Hide resolved
Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
alamb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Jefffrey for your review
|
BTW I am happy to make additional corrections as follow on PRs if anyone has additional notes cc @advancedxy @thinkharderdev @crepererum @NGA-TRAN and @Ted-Jiang @xinlifoobar and @hengfeiyang who reviewed the original PR to create |
advancedxy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @alamb, this is great work. I think the code is pretty good.
Sorry for being late to the party, left some minor style issues comment.
| RowGroupAccess::Scan, | ||
| RowGroupAccess::Selection( | ||
| vec![RowSelector::select(5), RowSelector::skip(7)].into(), | ||
| // select / skip all 20 rows in row group 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: to be consistent with L427 in this file, it would be better to call it as
specifies all 20 rows in row group .
| fn test_invalid_too_few() { | ||
| let access_plan = ParquetAccessPlan::new(vec![ | ||
| RowGroupAccess::Scan, | ||
| // select 12 rows, but row group 1 has 20 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: -> specifies 12 rows?
I think the select is referred to as selection, which the following code also includes a skip.
| fn test_invalid_too_many() { | ||
| let access_plan = ParquetAccessPlan::new(vec![ | ||
| RowGroupAccess::Scan, | ||
| // select 22 rows, but row group 1 has only 20 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
| /// The `ParquetExec` will try and further reduce any provided | ||
| /// `ParquetAccessPlan` further based on the contents of `ParquetMetadata` and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: there are two further in this sentence. How about:
/// The `ParquetExec` will try and reduce any provided
/// `ParquetAccessPlan` further based on the contents ...
|
|
||
| // check row group count matches the plan | ||
| return Ok(access_plan.clone()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: is it better to add a logging in the else branch?
No worries -- thank you for the comments. I will make a PR to address them shortly |
|
PR with comments: #10896 |
… to `ParquetAccessPlan::select` (apache#10813) * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select * Add test for filtering and user supplied access plan * fix on windows * Apply suggestions from code review Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com> --------- Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
… to `ParquetAccessPlan::select` (apache#10813) (#8) * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select * Add test for filtering and user supplied access plan * fix on windows * Apply suggestions from code review --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
… to `ParquetAccessPlan::select` (apache#10813) * Allow `ParquetAccessPlan` to be passed in to `ParquetExec`, add validation to ParquetAccessPlan::select * Add test for filtering and user supplied access plan * fix on windows * Apply suggestions from code review Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com> --------- Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
… external index (#5556) related impl in datafusion for parquet format apache/datafusion#10813, see discussion in #5481 Signed-off-by: Huaijin <haohuaijin@gmail.com> --------- Signed-off-by: Huaijin <haohuaijin@gmail.com> Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
… external index (vortex-data#5556) related impl in datafusion for parquet format apache/datafusion#10813, see discussion in vortex-data#5481 Signed-off-by: Huaijin <haohuaijin@gmail.com> --------- Signed-off-by: Huaijin <haohuaijin@gmail.com> Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
This PR looks big but the 85% of it is tests and documentation
Which issue does this PR close?
Closes #9929
Rationale for this change
Many query engines / use cases have some sort of a specialized index for data stored in parquet. This index can be used to determine which row groups / selections within a file are needed
However, the DataFusion
ParquetExechas no way for users to pass this information in. Instead it tries to prune row groups based on the min/max statistics and other information in the file's metadata.This PR makes it possible for users to pass in a
ParquetAccessPlanadded in #10738 toParquetExecwith a starting plan, which is then further pruned based on the file's metadata.What changes are included in this PR?
ParquetAccessPlanfor eachPartitionedFileread byParquetExecParquetAccessPlannow that it can be specified by usersAre these changes tested?
Yes, new tests are added
Are there any user-facing changes?
a new API
Here is a complete end to end example of using this API: #10701