Improve the performance of partition table in extreme case #4988
ti-chi-bot merged 31 commits into pingcap:master
Signed-off-by: bestwoody <bestwoody@163.com>
…into multiplex_dmsegstream
    streams_queue_id = 0;

    auto & q_ptr = streams_queue_by_partition[streams_queue_id];
    auto & q = *q_ptr;
It just makes the code easier to read and write, and it shows that the pointer will not be a nullptr.
Oh, I figured it out: since it's a std::shared_ptr, binding a reference and dereferencing it avoids triggering the atomic reference counter of the std::shared_ptr.
Then why not merge L56 and L57 into
auto & q = *streams_queue_by_partition[streams_queue_id]?
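The point about the reference count can be checked in isolation. The sketch below is not TiFlash code: `IntQueue`, `IntQueuePtr`, `useCountViaReference`, and `useCountViaCopy` are hypothetical names, and `std::queue<int>` stands in for the real queue of streams. It shows that binding a reference to the pointee leaves `use_count()` unchanged, whereas copying the `std::shared_ptr` increments it.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <vector>

// Hypothetical stand-in for the per-partition stream queue.
using IntQueue = std::queue<int>;
using IntQueuePtr = std::shared_ptr<IntQueue>;

// Binds a reference to the pointee: the shared_ptr itself is not copied,
// so its reference count is not modified.
long useCountViaReference(std::vector<IntQueuePtr> & queues, std::size_t id)
{
    assert(id < queues.size());
    auto & q = *queues[id];
    q.push(1);
    return queues[id].use_count();
}

// Copies the shared_ptr: the reference count is incremented for as long as
// the local copy lives, and decremented again when it goes out of scope.
long useCountViaCopy(std::vector<IntQueuePtr> & queues, std::size_t id)
{
    assert(id < queues.size());
    auto q_ptr = queues[id];
    q_ptr->push(2);
    return queues[id].use_count();
}
```

Both the two-line form in the diff (`auto & q_ptr` followed by `auto & q = *q_ptr`) and the merged one-liner bind references only, so neither of them copies the `std::shared_ptr`.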
    }
    pipeline.streams.insert(pipeline.streams.end(), current_pipeline.streams.begin(), current_pipeline.streams.end());
    }
    for (int i = 0; i < static_cast<int>(max_streams); i++)
I think we don't need to use MultiplexInputStream for a non-partitioned table scan, or for a partitioned table scan with only one partition.
I agree with you; the only trouble is that the code gets messier when the two cases do not share a common code path.
…into multiplex_dmsegstream
    /// calculate weighted max_streams for each partition, note at least 1 stream is needed for each partition
    size_t current_max_streams = table_query_infos.size() == 1 ? max_streams : (max_streams * region_num + total_local_region_num - 1) / total_local_region_num;

    size_t current_max_streams = max_streams;
It seems we don't need current_max_streams anymore; using max_streams directly is enough.
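The weighted formula in the diff above is an integer ceiling division. A minimal sketch of the arithmetic, assuming `region_num` is the number of local regions of one partition and `total_local_region_num` is the sum over all partitions (the function name `weightedMaxStreams` is hypothetical and does not appear in the PR):

```cpp
#include <cassert>
#include <cstddef>

// Computes ceil(max_streams * region_num / total_local_region_num) with
// integer arithmetic only. When max_streams >= 1 and region_num >= 1 the
// result is at least 1, which matches the "at least 1 stream" note.
std::size_t weightedMaxStreams(std::size_t max_streams, std::size_t region_num, std::size_t total_local_region_num)
{
    assert(total_local_region_num > 0);
    return (max_streams * region_num + total_local_region_num - 1) / total_local_region_num;
}
```

For example, with `max_streams = 8` and 100 local regions in total, a partition holding 1 region gets 1 stream, one holding 50 regions gets 4, and one holding all 100 gets 8.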
    if (has_multiple_partitions)
    {
        String req_info = dag_context.isMPPTask() ? dag_context.getMPPTaskId().toString() : "";
        for (int i = 0; i < static_cast<int>(max_streams); ++i)
Should the stream number be std::min(max_streams, the number of streams in stream_pool)?
…into multiplex_dmsegstream
    std::make_shared<std::queue<std::shared_ptr<IBlockInputStream>>>());
    for (const auto & stream : cur_streams)
        streams_queue_by_partition.back()->push(stream);
nit: you could construct the queue before entering the critical area.
    auto queue = std::make_shared<std::queue<BlockInputStreamPtr>>(cur_streams.begin(), cur_streams.end());
    std::unique_lock lk(mu);

    if (queue_id + 1 < static_cast<int>(streams_queue_by_partition.size()))
        return queue_id + 1;
    else
        return 0;
Suggested change:
    return (queue_id + 1) % streams_queue_by_partition.size();
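The two suggestions above (building the queue before taking the lock, and wrapping the index with a modulo) can be sketched together. This is a simplified illustration, not the code merged in the PR: `Stream`, `StreamPtr`, `StreamQueue`, and `PartitionQueues` are hypothetical names. It deviates from the review comments in two deliberate ways: the queue is built through a `std::deque`, because the iterator-pair constructor of `std::queue` is only available from C++23 onward, and the index is a `std::size_t` so that the modulo does not mix signed and unsigned operands.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for IBlockInputStream.
struct Stream
{
    int id;
};
using StreamPtr = std::shared_ptr<Stream>;
using StreamQueue = std::queue<StreamPtr>;

class PartitionQueues
{
public:
    void addPartition(const std::vector<StreamPtr> & cur_streams)
    {
        // Build the queue outside the critical section; only the cheap
        // push_back of a pointer happens while the mutex is held.
        auto queue = std::make_shared<StreamQueue>(
            std::deque<StreamPtr>(cur_streams.begin(), cur_streams.end()));
        std::unique_lock<std::mutex> lk(mu);
        queues.push_back(std::move(queue));
    }

    // Returns the index after queue_id, wrapping around to 0 at the end.
    // Precondition: at least one partition has been added.
    std::size_t nextQueueId(std::size_t queue_id) const
    {
        std::unique_lock<std::mutex> lk(mu);
        assert(!queues.empty());
        return (queue_id + 1) % queues.size();
    }

    // Number of streams still queued for the given partition.
    std::size_t queueSize(std::size_t queue_id) const
    {
        std::unique_lock<std::mutex> lk(mu);
        return queues.at(queue_id)->size();
    }

private:
    mutable std::mutex mu;
    std::vector<std::shared_ptr<StreamQueue>> queues;
};
```

With three partitions added, `nextQueueId` maps 0 to 1, 1 to 2, and 2 back to 0, which is the same behavior as the original if/else.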
    std::shared_ptr<std::queue<
    std::shared_ptr<IBlockInputStream>>>>
    streams_queue_by_partition;
    std::vector<std::shared_ptr<IBlockInputStream>> added_streams;
nit: use BlockInputStreamPtr rather than std::shared_ptr<IBlockInputStream>.
    std::shared_ptr<std::queue<
    std::shared_ptr<IBlockInputStream>>>>
nit: according to ClickHouse's coding style, it is better to add an alias for std::queue<std::shared_ptr<IBlockInputStream>>, like:
    using BlockInputStreamPtrQueue = std::queue<std::shared_ptr<IBlockInputStream>>;
Co-authored-by: Fu Zhe <fuzhe1989@gmail.com>
/merge
@bestwoody: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests
This pull request has been accepted and is ready to merge. Commit hash: b90332f
What problem does this PR solve?
Issue Number: close #4474
Problem Summary:
Improve the performance of partition table in extreme case
What is changed and how it works?
Use a multiplexing technique to read the underlying DMSegmentInputstream instances in a balanced way and avoid consumer starvation.
design doc: https://pingcap.feishu.cn/wiki/wikcntp2B8GEXyFLSbIbAtMJzBG?useNewLarklet=1
benchmark result: https://pingcap.feishu.cn/wiki/wikcntp2B8GEXyFLSbIbAtMJzBG?useNewLarklet=1#0fWIDd
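One way to picture the multiplexing idea is a shared pool that hands out segment streams round-robin across partitions, so that a partition with many streams cannot monopolize the readers. The sketch below is an illustration under assumptions rather than the PR's implementation: `FakeSegmentStream` and `MultiplexPool` are hypothetical names, and the fake stream only carries a label instead of reading blocks.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a segment-level input stream.
struct FakeSegmentStream
{
    std::string name;
};
using FakeSegmentStreamPtr = std::shared_ptr<FakeSegmentStream>;

class MultiplexPool
{
public:
    // Registers all streams of one partition as a single queue.
    void addPartition(const std::vector<FakeSegmentStreamPtr> & streams)
    {
        std::queue<FakeSegmentStreamPtr> q;
        for (const auto & s : streams)
            q.push(s);
        std::lock_guard<std::mutex> lk(mu);
        queues.push_back(std::move(q));
    }

    // Returns the next stream, visiting partitions in round-robin order and
    // skipping partitions whose queue is already empty. Returns nullptr once
    // every queue has been drained (or when no partition was added).
    FakeSegmentStreamPtr pickOne()
    {
        std::lock_guard<std::mutex> lk(mu);
        for (std::size_t tried = 0; tried < queues.size(); ++tried)
        {
            assert(next_id < queues.size());
            auto & q = queues[next_id];
            next_id = (next_id + 1) % queues.size();
            if (!q.empty())
            {
                auto stream = q.front();
                q.pop();
                return stream;
            }
        }
        return nullptr;
    }

private:
    std::mutex mu;
    std::vector<std::queue<FakeSegmentStreamPtr>> queues;
    std::size_t next_id = 0;
};
```

With partition A holding a1, a2, a3 and partition B holding b1, successive calls to `pickOne` return a1, b1, a2, a3, and then nullptr, so B's single stream is served right after A's first one instead of waiting behind all of A.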
Check List
Tests
Side effects
Documentation
Release note