Skip to content

Conversation

@alamb
Copy link
Contributor

@alamb alamb commented Nov 10, 2022

Which issue does this PR close?

Closes #4169

Rationale for this change

Our parquet files are sorted but the new BasicEnforcement rule is trying to resort them as it doesn't know they are sorted. See more details in #4169

I initially made this PR very specific to Parquet, but have since made it cover all ListingTables as suggested by @Dandandan on #4169 (comment) as the sort order information is generalizable over formats as CSV/JSON/Avro could be sorted as well.

What changes are included in this PR?

  1. Add a way to specify an externally known sort order to ParquetExec
  2. Tests that the basic enforcement rule follows them

Are these changes tested?

Yes

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Nov 10, 2022
@mingmwang
Copy link
Contributor

👍

@mingmwang
Copy link
Contributor

Yes, if the parquet files are already sorted, we should expose the info to the optimizer.

@alamb alamb changed the title Add ability to specify external sort information for ParquetExec Add ability to specify external sort information for ListingTables Nov 11, 2022
@alamb alamb force-pushed the alamb/parquet_sorts branch from b753cc7 to df4209c Compare November 11, 2022 17:11
/// parquet metadata.
///
/// See <https://github.com/apache/arrow-datafusion/issues/4177>
pub file_sort_order: Option<Vec<Expr>>,
Copy link
Contributor Author

@alamb alamb Nov 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I was working on this feature, it occurs to me that if the sort order is to be maintained, the ListingTable can't put multiple file into the same partition 🤔

The ListingTable does not attempt to read all files concurrently instead it will read files in sequence within a partition. This is an important property as it allows plans to run against 1000s of files and not try to open them all concurrently.

However, it means if we assign more than one file to a partitition the output sort order will not be preserved as illustrated in this diagrams

When only 1 file is assigned to each partition, each partition is correctly sorted on (A, B, C)

┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┓
  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ┐
┃   ┌───────────────┐     ┌──────────────┐ │   ┌──────────────┐ │   ┌─────────────┐   ┃
  │ │   1.parquet   │ │ │ │  2.parquet   │   │ │  3.parquet   │   │ │  4.parquet  │ │
┃   │ Sort: A, B, C │     │Sort: A, B, C │ │   │Sort: A, B, C │ │   │Sort: A, B, C│   ┃
  │ └───────────────┘ │ │ └──────────────┘   │ └──────────────┘   │ └─────────────┘ │
┃                                          │                    │                     ┃
  │                   │ │                    │                    │                 │
┃                                          │                    │                     ┃
  │                   │ │                    │                    │                 │
┃                                          │                    │                     ┃
  │                   │ │                    │                    │                 │
┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
     DataFusion           DataFusion           DataFusion           DataFusion
┃    Partition 1          Partition 2          Partition 3          Partition 4       ┃
 ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━

                                      ParquetExec

When more than 1 file is assigned to each partition, each partition is NOT correctly sorted on (A, B, C). Once the second file is scanned, the same values for A, B and C can be repeated in the same sorted stream

┏ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━
  ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
┃   ┌───────────────┐     ┌──────────────┐ │
  │ │   1.parquet   │ │ │ │  2.parquet   │   ┃
┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
  │ └───────────────┘ │ │ └──────────────┘   ┃
┃   ┌───────────────┐     ┌──────────────┐ │
  │ │   3.parquet   │ │ │ │  4.parquet   │   ┃
┃   │ Sort: A, B, C │     │Sort: A, B, C │ │
  │ └───────────────┘ │ │ └──────────────┘   ┃
┃                                          │
  │                   │ │                    ┃
┃  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
     DataFusion           DataFusion         ┃
┃    Partition 1          Partition 2
 ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ━ ┛

              ParquetExec

Does anyone have a preference between any of the possibilities:

  1. Adding a check in DataFusion so that sorts will be ignored if there are more than one file in each partition?
  2. Making more partitions if a sort order is specified
  3. Making more partitions up to a point (maybe 2x the target partitions and if not ignore the sort order)?

This affects IOx, see https://github.com/influxdata/influxdb_iox/issues/6125

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 after thinking about this over the evening I think I like option 1 (ignore provided sort order if there is more than one file in each partition) the best

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer option (1) as well.
In future, we can implement a new SortPreservingParquetExec which can read multiple pre-sorted parquet files in a partition and leverage the efficient merge sort to keep the sort ordering. And during physical planning time, if the parent plan do needs the sort info, the physical planner can choose the SortPreservingParquetExec instead of normal ParquetExec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I don't see the logic to ignore the sort info if there are more than one files in each partition in this PR.

let (partitioned_file_lists, statistics) =
            self.list_files_for_scan(ctx, filters, limit).await?;

        // if no files need to be read, return an `EmptyExec`
        if partitioned_file_lists.is_empty() {
            let schema = self.schema();
            let projected_schema = project_schema(&schema, projection.as_ref())?;
            return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct -- I will add the logic to ignore sort order to this PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@alamb alamb marked this pull request as ready for review November 11, 2022 19:22
@alamb alamb requested review from Dandandan and tustvold November 11, 2022 19:22
@alamb
Copy link
Contributor Author

alamb commented Nov 11, 2022

@mingmwang if you have some time to review and comment on the design in this PR I would be appreciative

format!("Expected Expr::Sort in output_ordering, but got {:?}", expr)
))
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return Error to make the plan failed or just log some error info and return Ok(None) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally prefer returning an error here as it would signify some bug in the specification of the sort expression that should be fixed by the caller -- but I can also see the rationale for being more forgiving.

If/When we start reading sort order from the parquet files themselves then I think warn! and continuing without error would definitely be a better behavior

We could also make some setting that controlled the behavior (error or ignore). Would that be better?

@alamb alamb marked this pull request as draft November 14, 2022 12:11
@alamb alamb marked this pull request as ready for review November 14, 2022 12:53
@mingmwang
Copy link
Contributor

LGTM.

@alamb
Copy link
Contributor Author

alamb commented Nov 14, 2022

Thanks @mingmwang -- I am going to merge this one in as it is blocking me downstream and I will address any other follow on comments in other PRs. Thank you

@alamb alamb merged commit d2814c9 into apache:master Nov 14, 2022
@alamb alamb deleted the alamb/parquet_sorts branch November 14, 2022 17:12
@ursabot
Copy link

ursabot commented Nov 14, 2022

Benchmark runs are scheduled for baseline = d72b4f0 and contender = d2814c9. d2814c9 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add ability to specify external sort information for ParquetExec

3 participants