-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Add ability to specify external sort information for ListingTables #4170
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
👍 |
|
Yes, if the parquet files are already sorted, we should expose the info to the optimizer. |
b753cc7 to
df4209c
Compare
| /// parquet metadata. | ||
| /// | ||
| /// See <https://github.com/apache/arrow-datafusion/issues/4177> | ||
| pub file_sort_order: Option<Vec<Expr>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I was working on this feature, it occurs to me that if the sort order is to be maintained, the ListingTable can't put multiple file into the same partition 🤔
The ListingTable does not attempt to read all files concurrently instead it will read files in sequence within a partition. This is an important property as it allows plans to run against 1000s of files and not try to open them all concurrently.
However, it means if we assign more than one file to a partitition the output sort order will not be preserved as illustrated in this diagrams
When only 1 file is assigned to each partition, each partition is correctly sorted on (A, B, C)
┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
┃ ┌───────────────┐ ┌──────────────┐ │ ┌──────────────┐ │ ┌─────────────┐ ┃
│ │ 1.parquet │ │ │ │ 2.parquet │ │ │ 3.parquet │ │ │ 4.parquet │ │
┃ │ Sort: A, B, C │ │Sort: A, B, C │ │ │Sort: A, B, C │ │ │Sort: A, B, C│ ┃
│ └───────────────┘ │ │ └──────────────┘ │ └──────────────┘ │ └─────────────┘ │
┃ │ │ ┃
│ │ │ │ │ │
┃ │ │ ┃
│ │ │ │ │ │
┃ │ │ ┃
│ │ │ │ │ │
┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
DataFusion DataFusion DataFusion DataFusion
┃ Partition 1 Partition 2 Partition 3 Partition 4 ┃
━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
ParquetExec
When more than 1 file is assigned to each partition, each partition is NOT correctly sorted on (A, B, C). Once the second file is scanned, the same values for A, B and C can be repeated in the same sorted stream
┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
┃ ┌───────────────┐ ┌──────────────┐ │
│ │ 1.parquet │ │ │ │ 2.parquet │ ┃
┃ │ Sort: A, B, C │ │Sort: A, B, C │ │
│ └───────────────┘ │ │ └──────────────┘ ┃
┃ ┌───────────────┐ ┌──────────────┐ │
│ │ 3.parquet │ │ │ │ 4.parquet │ ┃
┃ │ Sort: A, B, C │ │Sort: A, B, C │ │
│ └───────────────┘ │ │ └──────────────┘ ┃
┃ │
│ │ │ ┃
┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
DataFusion DataFusion ┃
┃ Partition 1 Partition 2
━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛
ParquetExec
Does anyone have a preference between any of the possibilities:
- Adding a check in DataFusion so that sorts will be ignored if there are more than one file in each partition?
- Making more partitions if a sort order is specified
- Making more partitions up to a point (maybe 2x the target partitions and if not ignore the sort order)?
This affects IOx, see https://github.com/influxdata/influxdb_iox/issues/6125
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔 after thinking about this over the evening I think I like option 1 (ignore provided sort order if there is more than one file in each partition) the best
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer option (1) as well.
In future, we can implement a new SortPreservingParquetExec which can read multiple pre-sorted parquet files in a partition and leverage the efficient merge sort to keep the sort ordering. And during physical planning time, if the parent plan do needs the sort info, the physical planner can choose the SortPreservingParquetExec instead of normal ParquetExec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, I don't see the logic to ignore the sort info if there are more than one files in each partition in this PR.
let (partitioned_file_lists, statistics) =
self.list_files_for_scan(ctx, filters, limit).await?;
// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection.as_ref())?;
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct -- I will add the logic to ignore sort order to this PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
|
@mingmwang if you have some time to review and comment on the design in this PR I would be appreciative |
| format!("Expected Expr::Sort in output_ordering, but got {:?}", expr) | ||
| )) | ||
| } | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we return Error to make the plan failed or just log some error info and return Ok(None) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally prefer returning an error here as it would signify some bug in the specification of the sort expression that should be fixed by the caller -- but I can also see the rationale for being more forgiving.
If/When we start reading sort order from the parquet files themselves then I think warn! and continuing without error would definitely be a better behavior
We could also make some setting that controlled the behavior (error or ignore). Would that be better?
|
LGTM. |
|
Thanks @mingmwang -- I am going to merge this one in as it is blocking me downstream and I will address any other follow on comments in other PRs. Thank you |
|
Benchmark runs are scheduled for baseline = d72b4f0 and contender = d2814c9. d2814c9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4169
Rationale for this change
Our parquet files are sorted but the new BasicEnforcement rule is trying to resort them as it doesn't know they are sorted. See more details in #4169
I initially made this PR very specific to Parquet, but have since made it cover all ListingTables as suggested by @Dandandan on #4169 (comment) as the sort order information is generalizable over formats as CSV/JSON/Avro could be sorted as well.
What changes are included in this PR?
Are these changes tested?
Yes
Are there any user-facing changes?