
Support Parallel Order-Preserving Result Set Materialization#3700

Merged

Mytherin merged 33 commits into duckdb:master from Mytherin:parallelinsertorderpreserving on May 25, 2022
Conversation

@Mytherin (Collaborator) commented May 24, 2022

This PR adds support for parallel order-preserving result set materialization (fixes #2260, #2653, duckdb/duckdb-web#142).

This enables full end-to-end parallelism: an entire query, including result set materialization, can now be executed fully in parallel. Previously, the final pipeline (the pipeline materializing the result) was always executed in a single-threaded fashion. For most analytical queries this was not a problem, but for queries that produce large results or that have a slow final pipeline, this could become a significant bottleneck (see #2245).

Batch Indexes

Order-preserving parallelism works by having pipeline sources emit a "batch index": an index that identifies the batch currently being processed. For example, in a regular table scan, the batch index can be thought of as the row group index.

When assembling the result of a query, we can then use the fact that each thread processes data "in-order" to reconstruct the same result as single-threaded execution - it is only the batch indexes that might be out of sync between the different threads. This allows us to restore the original order without an additional sorting step - we can scan all the data in the result ordered by batch index.
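The reconstruction step can be sketched in a few lines of plain Python (hypothetical names, not DuckDB's actual internals): each worker tags every chunk it produces with the batch index it came from, and the final result is obtained by ordering the chunks, not the data, by batch index.

```python
# Sketch of order-preserving materialization via batch indexes.
# Hypothetical structure, not DuckDB's C++ internals: each worker scans
# a disjoint set of batches and tags every output chunk with its batch index.

def scan_batches(batches, worker_batches):
    """Simulate one worker producing (batch_index, chunk) pairs."""
    return [(idx, batches[idx]) for idx in worker_batches]

def materialize(per_worker_output):
    """Merge worker outputs by batch index to restore the source order.
    No sort of the data itself is needed; only the (small) list of
    chunks is ordered by batch index."""
    tagged = [pair for output in per_worker_output for pair in output]
    tagged.sort(key=lambda pair: pair[0])  # order by batch index only
    result = []
    for _, chunk in tagged:
        result.extend(chunk)
    return result

batches = [[0, 1], [2, 3], [4, 5], [6, 7]]  # four "row groups"
# Two workers pick up batches in an interleaved, out-of-order fashion.
worker_a = scan_batches(batches, [2, 0])
worker_b = scan_batches(batches, [3, 1])
assert materialize([worker_a, worker_b]) == [0, 1, 2, 3, 4, 5, 6, 7]
```

However the batches are distributed across threads, merging by batch index reproduces the single-threaded scan order.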

This allows us not only to materialize result sets in parallel, but also to execute operators that rely on maintaining order in parallel. For example, we can now execute a LIMIT n clause (without a corresponding ORDER BY clause) in parallel by having each thread materialize at most n tuples, and then selecting the n tuples with the lowest batch indexes.
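The parallel LIMIT described above can be sketched as follows (a simplified simulation with hypothetical names): each worker only ever buffers its first n tuples, and the final step keeps the n tuples with the lowest batch indexes, matching what a single-threaded scan would return.

```python
import heapq

# Sketch of a parallel LIMIT n without ORDER BY (hypothetical names,
# not DuckDB's implementation): every worker materializes at most n
# tuples together with their batch indexes; the final step keeps the
# n tuples with the lowest batch indexes.

def worker_limit(tagged_rows, n):
    """A worker never needs to buffer more than n (batch_index, row) pairs."""
    return tagged_rows[:n]

def parallel_limit(per_worker, n):
    # Merge the per-worker buffers by batch index and keep the first n rows.
    merged = heapq.merge(*[sorted(buf) for buf in per_worker])
    return [row for _, row in list(merged)[:n]]

# Rows tagged with the batch index they were scanned from.
worker_a = worker_limit([(1, "c"), (1, "d"), (3, "g")], 3)
worker_b = worker_limit([(0, "a"), (0, "b"), (2, "e"), (2, "f")], 3)
assert parallel_limit([worker_a, worker_b], 3) == ["a", "b", "c"]
```

Because each worker buffers at most n tuples, the memory cost stays bounded by n per thread regardless of table size.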

In order for this to work, sources need to implement the batch_index method, which is added both to PhysicalOperator and to the TableFunction API. This PR implements it for built-in table scans, Parquet scans and Pandas DataFrame scans. If a source does not support the batch_index method, execution falls back to the single-threaded path.
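The contract a source has to satisfy can be illustrated with a small sketch (illustrative Python names, not the actual C++ API): alongside the data of each scan unit, the source reports which batch that data belongs to, here simply the row-group number.

```python
# Hypothetical sketch of what a source provides for order-preserving
# parallelism: each scan call returns a chunk together with its batch
# index (here, the row-group number). Names are illustrative only.

class RowGroupScan:
    def __init__(self, row_groups):
        self.row_groups = row_groups

    def max_threads(self):
        # At most one thread per row group is useful.
        return len(self.row_groups)

    def scan(self, state):
        """Return (batch_index, chunk) for the next row group, or None
        when the source is exhausted."""
        if state["next"] >= len(self.row_groups):
            return None
        batch_index = state["next"]
        state["next"] += 1
        return batch_index, self.row_groups[batch_index]

scan = RowGroupScan([[1, 2], [3, 4], [5, 6]])
state = {"next": 0}
assert scan.scan(state) == (0, [1, 2])
assert scan.scan(state) == (1, [3, 4])
```

A source that cannot assign such indexes (for example, one reading from an unordered stream) simply does not implement the method, triggering the single-threaded fallback.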

Result Set Collectors

In order to facilitate parallel result set materialization, a new class is added: the PhysicalResultCollector. This class is placed at the root of a query tree and functions as a regular sink. However, instead of producing chunks after completion, it produces a QueryResult. When a materialized result is requested, the result collectors are used to construct the materialized query result.
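The collector's role can be sketched as follows (a hypothetical Python rendering of the C++ PhysicalResultCollector, not the actual class): it behaves as a sink that workers push tagged chunks into, and once execution finishes it hands back a materialized result rather than streaming chunks to a parent operator.

```python
# Sketch of the result-collector idea (hypothetical Python rendering of
# the C++ PhysicalResultCollector): a sink at the root of the plan that
# consumes (batch_index, chunk) pairs from workers and, on finalize,
# produces a materialized result instead of emitting chunks upward.

class ResultCollectorSink:
    def __init__(self):
        self.collected = []

    def sink(self, batch_index, chunk):
        # Called by workers; each call hands over one tagged chunk.
        self.collected.append((batch_index, chunk))

    def get_result(self):
        # Finalize: order chunks by batch index into the final result.
        rows = []
        for _, chunk in sorted(self.collected, key=lambda p: p[0]):
            rows.extend(chunk)
        return rows

collector = ResultCollectorSink()
collector.sink(1, ["y"])
collector.sink(0, ["x"])
assert collector.get_result() == ["x", "y"]
```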

Note that streaming query results are not yet parallelized.

In addition, a custom result collector can be provided. This is not used yet, but should in the future allow e.g. constructing Pandas DataFrames directly in parallel, without first going through a materialized query result.

Preserve Insertion Order

In addition to the changes above, this PR introduces a new setting: preserve_insertion_order. The setting defaults to true. When it is set to false, the system no longer preserves insertion order and is free to re-order results as much as the SQL standard allows. This means that e.g. a query with a LIMIT but without an ORDER BY will return non-deterministic results (i.e. SELECT * FROM tbl LIMIT 5 will return any 5 rows from tbl, not necessarily the first 5 rows).
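The semantics can be illustrated with a small simulation (plain Python standing in for the setting, not the DuckDB implementation): with order preservation on, LIMIT n yields the first n rows in scan order; with it off, any n rows of the table are a valid answer, so only set membership is guaranteed.

```python
# Simulation of the preserve_insertion_order semantics (plain Python,
# not the DuckDB setting itself). The shuffle stands in for the
# nondeterministic order in which threads finish their batches.
import random

def limit(table, n, preserve_insertion_order=True, seed=None):
    rows = list(table)
    if not preserve_insertion_order:
        random.Random(seed).shuffle(rows)  # stand-in for thread timing
    return rows[:n]

tbl = list(range(100))
assert limit(tbl, 5) == [0, 1, 2, 3, 4]          # deterministic: first 5 rows
unordered = limit(tbl, 5, preserve_insertion_order=False, seed=42)
assert len(unordered) == 5 and set(unordered) <= set(tbl)  # any 5 rows
```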

BuildPipelines

As a general clean-up of operators, the BuildPipelines method has been converted from a single giant method in the Executor into a method that can be overridden by individual operators. This allows individual operators to more easily extend and modify how pipelines are constructed, without all of that logic living in one central place.
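The shape of this refactor can be sketched as follows (a hypothetical Python rendering, not the actual C++ code): a default BuildPipelines that extends the current pipeline, with operators such as a hash join overriding it to split off child pipelines.

```python
# Sketch of moving pipeline construction into the operators themselves
# (hypothetical Python rendering of the refactor): instead of one giant
# Executor method switching on operator types, each operator can
# override build_pipelines to describe how it splits the plan.

class PhysicalOperator:
    def __init__(self, children=()):
        self.children = list(children)

    def build_pipelines(self, current, pipelines):
        # Default: a streaming operator stays in the current pipeline,
        # and its children extend that same pipeline.
        current.append(type(self).__name__)
        for child in self.children:
            child.build_pipelines(current, pipelines)

class TableScan(PhysicalOperator):
    pass

class HashJoin(PhysicalOperator):
    def build_pipelines(self, current, pipelines):
        # Override: the probe side stays in the current pipeline, while
        # the build side becomes its own child pipeline.
        current.append("HashJoin")
        probe, build = self.children
        probe.build_pipelines(current, pipelines)
        child = []
        build.build_pipelines(child, pipelines)
        pipelines.append(child)

plan = HashJoin([TableScan(), TableScan()])
root, pipelines = [], []
plan.build_pipelines(root, pipelines)
pipelines.append(root)
assert pipelines == [["TableScan"], ["HashJoin", "TableScan"]]
```

Each operator now owns its own pipeline-construction logic, so adding a new pipeline-breaking operator no longer requires touching a central Executor method.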

Benchmark

Running the following query results in the following timings (previous versions of DuckDB would always run at the single-threaded timing since the final pipeline could not be parallelized):

```sql
CREATE TABLE integers AS SELECT * FROM generate_series(0, 100000000, 1) tbl(i);
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934;
SELECT * FROM integers WHERE i IN (SELECT * FROM other_table);
```

| System     | Time (s) |
|------------|----------|
| DuckDB 1T  | 0.53     |
| DuckDB 8T  | 0.07     |
| PostgreSQL | 5.60     |

Mytherin added 30 commits May 12, 2022 11:25
…uce batch index and add support for parallel limit
```sql
CREATE TABLE other_table AS SELECT 337 i UNION ALL SELECT 948247 UNION ALL SELECT 17797934 UNION ALL SELECT 99999998 UNION ALL SELECT 99999999;
COPY (SELECT * FROM range(50000000) t(i)) TO '${BENCHMARK_DIR}/integers1.parquet';
COPY (SELECT * FROM range(50000000, 100000000) t(i)) TO '${BENCHMARK_DIR}/integers2.parquet';
CREATE VIEW integers AS SELECT * FROM '${BENCHMARK_DIR}/integers*.parquet';
```
Contributor:
Doesn't this rely on the order in which the globber returns the file names? My experience with Windows suggests that this can be fragile...

Collaborator (Author):
Yes, that is an excellent point, and one I already fixed in a test.

Successfully merging this pull request may close these issues: Parallel result set materialization