
ParallelInputsProcessor should be able to parallelize additional_input_at_end #5263

@gengliqi

Description


Enhancement

Consider TPC-H query 13:

select
        c_custkey,
        count(o_orderkey) as c_count
from
        customer left outer join orders on
                c_custkey = o_custkey
group by
        c_custkey;

Execution plan:

+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                         | estRows    | task         | access object  | operator info                                                                                                                                      |
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| TableReader_61                             | 748800.00  | root         |                | data:ExchangeSender_60                                                                                                                             |
| └─ExchangeSender_60                        | 748800.00  | mpp[tiflash] |                | ExchangeType: PassThrough                                                                                                                          |
|   └─Projection_8                           | 748800.00  | mpp[tiflash] |                | sf10.customer.c_custkey, Column#18                                                                                                                 |
|     └─Projection_55                        | 748800.00  | mpp[tiflash] |                | Column#18, sf10.customer.c_custkey                                                                                                                 |
|       └─HashAgg_53                         | 748800.00  | mpp[tiflash] |                | group by:sf10.customer.c_custkey, funcs:count(sf10.orders.o_orderkey)->Column#18, funcs:firstrow(sf10.customer.c_custkey)->sf10.customer.c_custkey |
|         └─HashJoin_38                      | 7512019.23 | mpp[tiflash] |                | left outer join, equal:[eq(sf10.customer.c_custkey, sf10.orders.o_custkey)]                                                                        |
|           ├─ExchangeReceiver_24(Build)     | 750000.00  | mpp[tiflash] |                |                                                                                                                                                    |
|           │ └─ExchangeSender_23            | 750000.00  | mpp[tiflash] |                | ExchangeType: HashPartition, Hash Cols: [name: sf10.customer.c_custkey, collate: binary]                                                           |
|           │   └─TableFullScan_22           | 750000.00  | mpp[tiflash] | table:customer | keep order:false                                                                                                                                   |
|           └─ExchangeReceiver_27(Probe)     | 7500000.00 | mpp[tiflash] |                |                                                                                                                                                    |
|             └─ExchangeSender_26            | 7500000.00 | mpp[tiflash] |                | ExchangeType: HashPartition, Hash Cols: [name: sf10.orders.o_custkey, collate: binary]                                                             |
|               └─TableFullScan_25           | 7500000.00 | mpp[tiflash] | table:orders   | keep order:false                                                                                                                                   |
+--------------------------------------------+------------+--------------+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------+

The customer table is used as the hash (build) table and the orders table is used as the probe table.

After the probe phase is done, because of the left join, rows in the customer hash table that were never matched must also be output; this is handled by NonJoinedBlockInputStream. Several NonJoinedBlockInputStream instances are used to speed up this work.

The aggregation (count(o_orderkey)) is handled by ParallelAggregatingBlockInputStream, which uses a ParallelInputsProcessor to parallelize the aggregation.

Now we can describe the problem.

/// The last thread on the output indicates that there is no more data.
if (0 == --active_threads)
{
    /// And then it processes an additional source, if there is one.
    if (additional_input_at_end)
    {
        try
        {
            additional_input_at_end->readPrefix();
            while (Block block = additional_input_at_end->read())
                publishPayload(additional_input_at_end, block, thread_num);
        }
        catch (...)
        {
            exception = std::current_exception();
        }
        if (exception)
        {
            handler.onException(exception, thread_num);
        }
    }

    handler.onFinish(); /// TODO If in `onFinish` or `onFinishThread` there is an exception, then std::terminate is called.
}

additional_input_at_end is a single input stream, and it is processed by whichever thread is last to finish its own input. It is typically the NonJoinedBlockInputStream, because that stream must be read only after all probe tasks are done.

/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
{
    const Settings & settings = context.getSettingsRef();
    BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log);
    pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>(
        pipeline.streams,
        stream_with_non_joined_data,
        params,
        context.getFileProvider(),
        true,
        max_streams,
        settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads),
        log->identifier());
    pipeline.streams.resize(1);
    // should record for agg before restore concurrency. See #3804.
    recordProfileStreams(pipeline, query_block.aggregation_name);
    restorePipelineConcurrency(pipeline);
}

Several input streams (in the case above, the NonJoinedBlockInputStreams) are combined by combinedNonJoinedDataStream into a single input stream, which is then passed as additional_input_at_end.

So the data from the several NonJoinedBlockInputStreams can only be aggregated by a single thread, and the larger the amount of non-joined data, the slower this final phase becomes.
