Feature Branch: merge master to planner_refactory branch#5353
Feature Branch: merge master to planner_refactory branch#5353ti-chi-bot merged 105 commits intopingcap:planner_refactoryfrom
planner_refactory branch#5353Conversation
* Add to_seconds support for tiflash Signed-off-by: yibin <huyibin@pingcap.com> * Fix format issue Signed-off-by: yibin <huyibin@pingcap.com> * Add mutex lock to protect async reciever async reader Signed-off-by: yibin <huyibin@pingcap.com> * Fix a rebase issue Signed-off-by: yibin <huyibin@pingcap.com> * Change raw pointer to unique ptr Signed-off-by: yibin <huyibin@pingcap.com> * Fix format issue Signed-off-by: yibin <huyibin@pingcap.com> Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
…gionKVStoreTest*'` (pingcap#4903) close pingcap#4904
|
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. DetailsReviewer can indicate their review by submitting an approval review. |
planner_refactory branch
|
/rebuild |
| /// todo support fine grained shuffle | ||
| static auto disable_fine_frained_shuffle = [](const DAGQueryBlock & query_block) { | ||
| return !enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count()) | ||
| && (!query_block.exchange_sender || !enableFineGrainedShuffle(query_block.exchange_sender->fine_grained_shuffle_stream_count())); | ||
| }; | ||
| return query_block.source | ||
| && (query_block.source->tp() == tipb::ExecType::TypeProjection | ||
| || query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver); | ||
| || query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver) | ||
| && disable_fine_frained_shuffle(query_block); |
| for (size_t i = 0; i < max_streams; ++i) | ||
| { | ||
| BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id); | ||
| BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id, /*stream_id=*/0); |
| /// todo support fine grained shuffle | ||
| int stream_id = 0; | ||
| pipeline.transform([&](auto & stream) { | ||
| // construct writer | ||
| std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr>>( | ||
| std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, false>>( | ||
| dag_context.tunnel_set, | ||
| partition_col_ids, | ||
| partition_col_collators, | ||
| exchange_type, | ||
| context.getSettingsRef().dag_records_per_chunk, | ||
| context.getSettingsRef().batch_send_min_limit, | ||
| stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response | ||
| dag_context); | ||
| dag_context, | ||
| 0, | ||
| 0); |
| } | ||
| CATCH | ||
|
|
||
| /// todo support FineGrainedShuffle |
| if (pipeline.streams.size() > 1 || pipeline.streams_with_non_joined_data.size() > 1) | ||
| { | ||
| const Settings & settings = context.getSettingsRef(); | ||
| BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); | ||
| pipeline.firstStream() = std::make_shared<ParallelAggregatingBlockInputStream>( | ||
| BlockInputStreamPtr stream = std::make_shared<ParallelAggregatingBlockInputStream>( | ||
| pipeline.streams, | ||
| stream_with_non_joined_data, | ||
| pipeline.streams_with_non_joined_data, | ||
| params, | ||
| context.getFileProvider(), | ||
| true, | ||
| max_streams, | ||
| settings.aggregation_memory_efficient_merge_threads ? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads) : static_cast<size_t>(settings.max_threads), | ||
| log->identifier()); | ||
|
|
||
| pipeline.streams.resize(1); | ||
| pipeline.streams_with_non_joined_data.clear(); | ||
| pipeline.firstStream() = std::move(stream); | ||
|
|
||
| // should record for agg before restore concurrency. See #3804. | ||
| recordProfileStreams(pipeline, context); | ||
| restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log); | ||
| } | ||
| else | ||
| { | ||
| BlockInputStreamPtr stream_with_non_joined_data = combinedNonJoinedDataStream(pipeline, max_streams, log); | ||
| BlockInputStreams inputs; | ||
| if (!pipeline.streams.empty()) | ||
| inputs.push_back(pipeline.firstStream()); | ||
| else | ||
| pipeline.streams.resize(1); | ||
| if (stream_with_non_joined_data) | ||
| inputs.push_back(stream_with_non_joined_data); | ||
|
|
||
| if (!pipeline.streams_with_non_joined_data.empty()) | ||
| inputs.push_back(pipeline.streams_with_non_joined_data.at(0)); | ||
|
|
||
| pipeline.streams.resize(1); | ||
| pipeline.streams_with_non_joined_data.clear(); | ||
|
|
|
/rebuild |
|
/merge |
|
@SeaRise: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
|
This pull request has been accepted and is ready to merge. DetailsCommit hash: b8a6178 |
|
/rebuild |
|
/run-integration-test |
Coverage for changed filesCoverage summaryfull coverage report (for internal network access only) |
|
/run-integration-test |
6 similar comments
|
/run-integration-test |
|
/run-integration-test |
|
/run-integration-test |
|
/run-integration-test |
|
/run-integration-test |
|
/run-integration-test |
What problem does this PR solve?
Issue Number: ref #4739
close #5350
Problem Summary:
What is changed and how it works?
We can use #5350 to check if the modification is correct
Check List
Tests
Side effects
Documentation
Release note