Support Parallel Order-Preserving Result Set Materialization #3700

Merged

Mytherin merged 33 commits into duckdb:master on May 25, 2022
Conversation
hawkfish reviewed on May 24, 2022
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934 UNION ALL SELECT 99999998 UNION ALL SELECT 99999999;
COPY (SELECT * FROM range(50000000) t(i)) TO '${BENCHMARK_DIR}/integers1.parquet';
COPY (SELECT * FROM range(50000000, 100000000) t(i)) TO '${BENCHMARK_DIR}/integers2.parquet';
CREATE VIEW integers AS SELECT * FROM '${BENCHMARK_DIR}/integers*.parquet';
Contributor
Doesn't this rely on the order in which the globber returns the file names? My experience with Windows suggests that this can be fragile...
Collaborator
Author
Yes that is an excellent point and one I already fixed in a test.
hawkfish approved these changes on May 24, 2022
This PR adds support for parallel order-preserving result set materialization (fixes #2260, #2653, duckdb/duckdb-web#142).
This enables full end-to-end parallelism: an entire query, including result set materialization, can now be executed fully in parallel. Previously, the final pipeline (the pipeline that materializes the result) was always executed in a single-threaded fashion. For most analytical queries this was not a problem, but for queries that produce large results or that have a slow final pipeline, it could become a significant bottleneck (see #2245).
Batch Indexes
Order-preserving parallelism works by having pipeline sources emit a "batch index": an index that identifies the batch currently being processed. In a regular table scan, for example, you can think of the batch index as the row group index.

When assembling the result of a query, we can then use the fact that each thread processes data in-order to reconstruct the same result as single-threaded execution: only the batch indexes might be out of sync between the different threads. This allows us to restore the original order without an additional sorting step, by scanning all the data in the result ordered by batch index.
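The idea above can be sketched in a few lines of Python (DuckDB's actual implementation is C++; the `scan_batch` function and the batch-to-row mapping here are hypothetical):

```python
# Illustrative sketch: worker threads finish batches in arbitrary order,
# but each batch is internally in order. Ordering the finished batches by
# their batch index restores the single-threaded result order without
# sorting the data itself.
from concurrent.futures import ThreadPoolExecutor

def scan_batch(batch_index):
    # Hypothetical source: batch i covers rows [i*3, i*3+3), like a row group.
    return batch_index, list(range(batch_index * 3, batch_index * 3 + 3))

with ThreadPoolExecutor(max_workers=4) as pool:
    # Batches are handed out (and may complete) out of order.
    finished = list(pool.map(scan_batch, [2, 0, 3, 1]))

# Only the small list of batches is ordered by batch index, not the rows.
finished.sort(key=lambda batch: batch[0])
result = [row for _, rows in finished for row in rows]
print(result)  # [0, 1, 2, ..., 11]
```

Note that the per-batch cost of this reordering is tiny: the sort touches one entry per batch, regardless of how many rows each batch contains.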
This allows us not only to materialize result sets in parallel, but also to execute operators that rely on maintaining order in parallel. For example, we can now execute a `LIMIT n` clause (without a corresponding `ORDER BY` clause) in parallel by having each thread materialize n tuples, and then selecting the n tuples with the lowest batch indexes.

In order for this to work, sources need to implement the batch_index method, which is provided as an additional method in the PhysicalOperator, as well as an additional method in the TableFunction API. This PR implements this for built-in table scans, Parquet scans and Pandas DataFrame scans. If a source does not support the batch_index method, the single-threaded approach is used instead.
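The parallel `LIMIT n` scheme described above can be sketched as follows (a simplified Python analogue, not DuckDB's C++ code; the `parallel_limit` helper and tuple layout are illustrative):

```python
# Each thread materializes at most n tuples, tagged with (batch_index,
# row_offset). Globally keeping the n tuples with the lowest batch indexes
# reproduces the single-threaded LIMIT without a full sort of the data.
import heapq

def parallel_limit(per_thread, n):
    """per_thread: one list per thread of (batch_index, row_offset, value)
    tuples, each list already capped at n entries by its thread."""
    merged = heapq.merge(*(sorted(chunk) for chunk in per_thread))
    return [value for _, _, value in list(merged)[:n]]

# Thread 0 scanned batches 0 and 2; thread 1 scanned batches 1 and 3.
thread0 = [(0, 0, 'a'), (0, 1, 'b'), (2, 0, 'e')]
thread1 = [(1, 0, 'c'), (1, 1, 'd'), (3, 0, 'f')]
print(parallel_limit([thread0, thread1], 4))  # ['a', 'b', 'c', 'd']
```

Since every thread stops after materializing n tuples, the total intermediate state is bounded by n times the number of threads, no matter how large the input is.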
Result Set Collectors
In order to facilitate parallel result set materialization, a new class is added: the `PhysicalResultCollector`. This class is placed at the root of a query tree and functions like a regular sink. However, instead of producing chunks again after its completion, it produces a `QueryResult`. When a materialized result is requested, the result collectors are used to construct the materialized query result. Note that streaming query results are not parallelized yet.
In addition, a custom result collector can be provided. This is not used yet, but should be used in the future to allow e.g. directly constructing Pandas DataFrames in parallel, without having to first go to a materialized query result.
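The collector pattern can be illustrated with a minimal Python analogue (the class names mirror the PR's `PhysicalResultCollector` and `QueryResult`, but the methods and chunk representation here are simplified assumptions, not DuckDB's API):

```python
# Sketch: the collector sits at the root of the plan and consumes chunks
# like a sink; on finalization it yields a materialized result instead of
# emitting chunks back into a parent pipeline.
class QueryResult:
    def __init__(self, chunks):
        self.rows = [row for chunk in chunks for row in chunk]

class PhysicalResultCollector:
    def __init__(self):
        self._chunks = []

    def sink(self, batch_index, chunk):
        # Called concurrently per chunk; chunks may arrive out of order.
        self._chunks.append((batch_index, chunk))

    def finalize(self):
        # Restore insertion order by batch index before materializing.
        self._chunks.sort(key=lambda entry: entry[0])
        return QueryResult([chunk for _, chunk in self._chunks])

collector = PhysicalResultCollector()
collector.sink(1, [3, 4])
collector.sink(0, [1, 2])
print(collector.finalize().rows)  # [1, 2, 3, 4]
```

A real implementation would also need per-thread local state and synchronization around the shared chunk list; that machinery is omitted here for clarity.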
Preserve Insertion Order
In addition to the changes above, this PR introduces a new setting
preserve_insertion_order. This setting defaults to true. When the setting is switched to false, the system will not care about preserving the insertion order and is free to re-order as much as the SQL standard allows. This means that e.g. a query with aLIMITbut without anORDER BYwill return non-deterministic results (i.e.SELECT * FROM tbl LIMIT 5will return any 5 rows fromtbl, and not necessarily the first 5 rows).BuildPipelines
As a general clean-up of operators, the `BuildPipelines` method has been converted from a single giant method in the `Executor` into a method that can be overloaded by individual operators. This allows individual operators to extend and modify how pipelines are constructed, without having to do this all in one central place.

Benchmark
Running the following query results in the following timings (previous versions of DuckDB would always run at the single-threaded timing since the final pipeline could not be parallelized):