Distributing Workloads More Evenly Between Parquet Pipes #65963
Description
Describe the situation
I'm testing Parquet read performance with 26 files on a 64-core machine. Each file contains between 1 and 9 row groups. The files were generated by Trino from the TPC-DS (scale factor 100) store_sales table.
How to reproduce
- ClickHouse version 24.7.1.702
SELECT
num_row_groups,
num_rows
FROM file('./store_sales/*', ParquetMetaData)
FORMAT TSV
1 553238
1 604702
1 411712
1 25732
1 411712
1 411712
1 411712
6 21376859
1 257320
1 141526
1 411712
9 29035346
1 411712
1 585403
1 411712
9 28845572
9 28771593
1 467656
9 28826273
1 527506
9 29073943
1 411712
9 28961365
9 28729778
9 28903470
9 29016046
If I run the test with the default settings, it takes approximately 7 seconds to complete.
SELECT
ss_sold_date_sk,
ss_customer_sk,
ss_hdemo_sk,
ss_addr_sk,
ss_store_sk,
ss_ticket_number,
ss_ext_sales_price,
ss_ext_list_price,
ss_ext_tax
FROM file('./store_sales/*', Parquet)
WHERE (ss_sold_date_sk >= 2451180) AND (ss_sold_date_sk <= 2452246) AND (ss_store_sk >= 2) AND (ss_store_sk <= 401) AND (ss_hdemo_sk >= 480) AND (ss_hdemo_sk <= 6599)
FORMAT `Null`
Query id: 46f28761-8370-4095-80ba-3d38154376f1
Ok.
0 rows in set. Elapsed: 7.704 sec. Processed 163.76 million rows, 6.02 GB (21.26 million rows/s., 781.96 MB/s.)
However, increasing max_parsing_threads to 64 or 512 speeds the query up significantly.
SELECT
ss_sold_date_sk,
ss_customer_sk,
ss_hdemo_sk,
ss_addr_sk,
ss_store_sk,
ss_ticket_number,
ss_ext_sales_price,
ss_ext_list_price,
ss_ext_tax
FROM file('./store_sales/*', Parquet)
WHERE (ss_sold_date_sk >= 2451180) AND (ss_sold_date_sk <= 2452246) AND (ss_store_sk >= 2) AND (ss_store_sk <= 401) AND (ss_hdemo_sk >= 480) AND (ss_hdemo_sk <= 6599)
FORMAT `Null`
SETTINGS max_parsing_threads = 64
Query id: 254f7a90-5640-4ff0-8d59-e361043e96f3
Ok.
0 rows in set. Elapsed: 4.145 sec. Processed 163.37 million rows, 6.01 GB (39.42 million rows/s., 1.45 GB/s.)
SELECT
ss_sold_date_sk,
ss_customer_sk,
ss_hdemo_sk,
ss_addr_sk,
ss_store_sk,
ss_ticket_number,
ss_ext_sales_price,
ss_ext_list_price,
ss_ext_tax
FROM file('./store_sales/*', Parquet)
WHERE (ss_sold_date_sk >= 2451180) AND (ss_sold_date_sk <= 2452246) AND (ss_store_sk >= 2) AND (ss_store_sk <= 401) AND (ss_hdemo_sk >= 480) AND (ss_hdemo_sk <= 6599)
FORMAT `Null`
SETTINGS max_parsing_threads = 512
Query id: d1e4ef00-d343-483f-9272-8acfb2a0c73f
Ok.
0 rows in set. Elapsed: 1.709 sec. Processed 163.76 million rows, 6.02 GB (95.83 million rows/s., 3.53 GB/s.)
This setting controls how many threads each Parquet pipe can use. However, since each file contains a different number of row groups, the amount of work each Parquet pipe has to do varies widely.
ClickHouse/src/Storages/StorageFile.cpp
Line 1395 in 2d16ab0
const auto max_parsing_threads = std::max<size_t>(settings.max_parsing_threads / file_num, 1UL);
The number of pipes is limited by the number of files:
ClickHouse/src/Storages/StorageFile.cpp
Line 1636 in 2d16ab0
if (max_num_streams > files_to_read)
Proposal
Always create num_threads pipes and a read pool similar to MergeTreeReadPool, from which each pipe retrieves new tasks. This avoids creating an extra thread pool per pipe and ensures a more balanced workload distribution across the pipes.