Description
Enhancement
Consider TPC-H query 13:

```sql
select
    c_custkey,
    count(o_orderkey) as c_count
from
    customer left outer join orders on
        c_custkey = o_custkey
group by
    c_custkey;
```

Execution plan:
```
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                         | estRows    | task         | access object  | operator info                                                                                                                                      |
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| TableReader_61                             | 748800.00  | root         |                | data:ExchangeSender_60                                                                                                                             |
| └─ExchangeSender_60                        | 748800.00  | mpp[tiflash] |                | ExchangeType: PassThrough                                                                                                                          |
|   └─Projection_8                           | 748800.00  | mpp[tiflash] |                | sf10.customer.c_custkey, Column#18                                                                                                                 |
|     └─Projection_55                        | 748800.00  | mpp[tiflash] |                | Column#18, sf10.customer.c_custkey                                                                                                                 |
|       └─HashAgg_53                         | 748800.00  | mpp[tiflash] |                | group by:sf10.customer.c_custkey, funcs:count(sf10.orders.o_orderkey)->Column#18, funcs:firstrow(sf10.customer.c_custkey)->sf10.customer.c_custkey |
|         └─HashJoin_38                      | 7512019.23 | mpp[tiflash] |                | left outer join, equal:[eq(sf10.customer.c_custkey, sf10.orders.o_custkey)]                                                                        |
|           ├─ExchangeReceiver_24(Build)     | 750000.00  | mpp[tiflash] |                |                                                                                                                                                    |
|           │ └─ExchangeSender_23            | 750000.00  | mpp[tiflash] |                | ExchangeType: HashPartition, Hash Cols: [name: sf10.customer.c_custkey, collate: binary]                                                           |
|           │   └─TableFullScan_22           | 750000.00  | mpp[tiflash] | table:customer | keep order:false                                                                                                                                   |
|           └─ExchangeReceiver_27(Probe)     | 7500000.00 | mpp[tiflash] |                |                                                                                                                                                    |
|             └─ExchangeSender_26            | 7500000.00 | mpp[tiflash] |                | ExchangeType: HashPartition, Hash Cols: [name: sf10.orders.o_custkey, collate: binary]                                                             |
|               └─TableFullScan_25           | 7500000.00 | mpp[tiflash] | table:orders   | keep order:false                                                                                                                                   |
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
```

The customer table is the build side (it is used to build the hash table) and the orders table is the probe side.
After the probe phase is done, because of the left outer join, the customer rows in the hash table that matched no orders row must also be output; this is handled by NonJoinedBlockInputStream. Several NonJoinedBlockInputStreams are created to speed up this work.
The aggregation (count(o_orderkey)) is handled by ParallelAggregatingBlockInputStream, which uses a ParallelInputsProcessor to parallelize the aggregation.
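The join-then-backfill behavior above can be reduced to the following sketch (toy code, not TiFlash's actual types; `leftOuterJoin` and its row encoding are illustrative only): customer keys build the hash table, orders rows probe it, and the never-matched build rows are emitted afterwards with a NULL right side — exactly the work NonJoinedBlockInputStream performs.

```cpp
#include <cassert>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy model of the left outer join above: `build_keys` plays the customer
// (hash table) side, `probe_rows` the orders (probe) side. A second field
// of -1 stands in for a NULL o_orderkey.
std::vector<std::pair<int, int>> leftOuterJoin(
    const std::vector<int> & build_keys,
    const std::vector<std::pair<int, int>> & probe_rows)
{
    std::unordered_map<int, bool> hash_table; // key -> "was ever matched"
    for (int k : build_keys)
        hash_table.emplace(k, false);

    std::vector<std::pair<int, int>> out;
    for (const auto & [o_custkey, o_orderkey] : probe_rows)
    {
        auto it = hash_table.find(o_custkey);
        if (it != hash_table.end())
        {
            it->second = true;
            out.emplace_back(o_custkey, o_orderkey);
        }
    }
    // Post-probe pass: the rows NonJoinedBlockInputStream is responsible
    // for -- build-side keys that no probe row ever matched.
    for (const auto & [key, matched] : hash_table)
        if (!matched)
            out.emplace_back(key, -1); // NULL right side
    return out;
}
```

Note that the post-probe pass can only start once every probe row has been seen, which is why the non-joined stream must run after all probe tasks finish.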
Now we can describe the problem.
tiflash/dbms/src/DataStreams/ParallelInputsProcessor.h
Lines 243 to 267 in cbe6ab5
```cpp
/// The last thread on the output indicates that there is no more data.
if (0 == --active_threads)
{
    /// And then it processes an additional source, if there is one.
    if (additional_input_at_end)
    {
        try
        {
            additional_input_at_end->readPrefix();
            while (Block block = additional_input_at_end->read())
                publishPayload(additional_input_at_end, block, thread_num);
        }
        catch (...)
        {
            exception = std::current_exception();
        }
        if (exception)
        {
            handler.onException(exception, thread_num);
        }
    }
    handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called.
}
```
The additional_input_at_end is a single input stream, and it is processed by whichever thread finishes its own work last. It is typically a NonJoinedBlockInputStream, because the non-joined rows can only be produced after all probe tasks are done.
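The scheduling pattern in the snippet above can be reduced to this sketch (toy code; `runWorkers` and its parameters are invented for illustration): every worker drains its own source, and whichever worker decrements the active-thread counter to zero then consumes the single additional source by itself.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>
#include <vector>

// Toy version of ParallelInputsProcessor's tail step: however many worker
// threads run, the additional input is always drained by exactly one of
// them -- the last one to finish its regular work.
int runWorkers(size_t num_threads,
               const std::function<void()> & per_thread_work,
               const std::function<int()> & additional_input)
{
    std::atomic<size_t> active_threads{num_threads};
    std::atomic<int> additional_result{0};
    std::vector<std::thread> workers;
    for (size_t i = 0; i < num_threads; ++i)
        workers.emplace_back([&] {
            per_thread_work();
            // Mirrors `if (0 == --active_threads)` in the snippet above:
            // only the last thread reaches the additional source.
            if (0 == --active_threads)
                additional_result = additional_input();
        });
    for (auto & t : workers)
        t.join();
    return additional_result;
}
```

This is correct (the additional source must not start early), but it also means the additional source gets at most one thread, no matter how much data it holds.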
tiflash/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Lines 385 to 403 in cbe6ab5
```cpp
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
    const Settings & settings = context.getSettingsRef();
    BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
    pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
        pipeline.streams,
        stream_with_non_joined_data,
        params,
        context.getFileProvider(),
        true,
        max_streams,
        settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
        log->identifier());
    pipeline.streams.resize(1);
    // should record for agg before restore concurrency. See #3804.
    recordProfileStreams(pipeline, query_block.aggregation_name);
    restorePipelineConcurrency(pipeline);
}
```
The several input streams (in the case above, the NonJoinedBlockInputStreams) are combined into a single input stream by combinedNonJoinedDataStream. As a result, the data from all of the NonJoinedBlockInputStreams can only be aggregated by one thread: the larger the amount of non-joined data, the slower this stage becomes.
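The cost of that single thread can be illustrated with a toy aggregation (illustrative code only, not TiFlash's API): both functions below compute the same result over N partitions, but the "combine first" variant, like combinedNonJoinedDataStream, leaves only one stream and therefore at most one consumer thread.

```cpp
#include <cassert>
#include <numeric>
#include <thread>
#include <vector>

// Aggregate each partition on its own thread, then merge the partials.
long long parallelSum(const std::vector<std::vector<int>> & parts)
{
    std::vector<long long> partials(parts.size(), 0);
    std::vector<std::thread> workers;
    for (size_t i = 0; i < parts.size(); ++i)
        workers.emplace_back([&, i] {
            partials[i] = std::accumulate(parts[i].begin(), parts[i].end(), 0LL);
        });
    for (auto & t : workers)
        t.join();
    return std::accumulate(partials.begin(), partials.end(), 0LL);
}

// Concatenate the partitions into one stream first (as
// combinedNonJoinedDataStream does), leaving a single-threaded scan.
long long combinedSum(const std::vector<std::vector<int>> & parts)
{
    long long total = 0;
    for (const auto & p : parts)
        total = std::accumulate(p.begin(), p.end(), total);
    return total;
}
```

Keeping the non-joined streams separate and feeding them to the aggregation as additional parallel sources would let this stage scale with the number of partitions.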