Optimize data size of Broadcast / Passthrough exchange operator#6880
Optimize data size of Broadcast / Passthrough exchange operator#6880ti-chi-bot merged 47 commits intopingcap:masterfrom
Broadcast / Passthrough exchange operator#6880Conversation
Signed-off-by: Zhigao Tong <tongzhigao@pingcap.com>
|
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. DetailsReviewer can indicate their review by submitting an approval review. |
|
/run-integration-test |
| assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | ||
| } | ||
|
|
||
| if (exchange_type == tipb::ExchangeType::Broadcast) |
There was a problem hiding this comment.
how about using template?
template <class ExchangeWriterPtr, bool is_broadcast>
class BroadcastOrPassThroughWriter : public DAGResponseWriter
There was a problem hiding this comment.
It's also an optional way.
| if (!expected_types.empty()) | ||
| { | ||
| for (auto && block : blocks) | ||
| assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | ||
| } |
There was a problem hiding this comment.
how about
| if (!expected_types.empty()) | |
| { | |
| for (auto && block : blocks) | |
| assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | |
| } | |
| #ifndef NDEBUG | |
| if (!expected_types.empty()) | |
| { | |
| for (auto && block : blocks) | |
| assertBlockSchema(expected_types, block, "BroadcastOrPassThroughWriter"); | |
| } | |
| #endif |
There was a problem hiding this comment.
assertBlockSchema is always neccesary when using compression because compression codec process will not check the expected types.
| auto remote_tunnel_tracked_packet = local_tunnel_cnt == tunnel_cnt ? nullptr : ori_tracked_packet; | ||
|
|
||
| if (compression_method != CompressionMethod::NONE) | ||
| remote_tunnel_tracked_packet = MPPTunnelSetHelper::ToCompressedPacket(ori_tracked_packet, version, compression_method); |
There was a problem hiding this comment.
| remote_tunnel_tracked_packet = MPPTunnelSetHelper::ToCompressedPacket(ori_tracked_packet, version, compression_method); | |
| remote_tunnel_tracked_packet = MPPTunnelSetHelper::ToCompressedPacket(remote_tunnel_tracked_packet, version, compression_method); |
There was a problem hiding this comment.
remote_tunnel_tracked_packet may be null if local_tunnel_cnt == tunnel_cnt. But I will refine this part and make it clear.
| if (isLocalTunnel(i)) | ||
| { | ||
| GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_local).Increment(sz); | ||
| local_cnt++; |
There was a problem hiding this comment.
| local_cnt++; | |
| ++local_cnt; |
There was a problem hiding this comment.
It's also an optional way, but there is no different for modern compiler.
| else | ||
| { | ||
| GET_METRIC(tiflash_exchange_data_bytes, type_hash_none_compression_remote).Increment(sz); | ||
| remote_cnt++; |
There was a problem hiding this comment.
| remote_cnt++; | |
| ++remote_cnt; |
| static void broadcastOrPassThroughWriteImpl( | ||
| const size_t tunnel_cnt, | ||
| const size_t local_tunnel_cnt, // can be 0 for PassThrough writer | ||
| size_t ori_packet_bytes, // original data packet size |
There was a problem hiding this comment.
It seems unnecessary to pass ori_packet_bytes.
Why not auto local_tracked_packet_bytes = local_tracked_packet? local_tracked_packet->getPacket().ByteSizeLong() : 0;?
There was a problem hiding this comment.
ori_packet_bytes is used for present uncompressed size of data packet. Especially when local_tunnel_cnt is 0 and local_tracked_packet is NULL. It may occur when using Passthrough writer.
| void updatePartitionWriterMetrics(CompressionMethod method, size_t original_size, size_t sz, bool is_local) | ||
| template <bool is_broadcast, typename FuncIsLocalTunnel, typename FuncWriteToTunnel> | ||
| static void broadcastOrPassThroughWriteImpl( | ||
| const size_t tunnel_cnt, |
There was a problem hiding this comment.
Pass remote_tunnel_cnt looks more symmetrical.
static void broadcastOrPassThroughWriteImpl(
const size_t local_tunnel_cnt,
const size_t remote_tunnel_cnt,
TrackedMppDataPacketPtr && local_tracked_packet,
TrackedMppDataPacketPtr && remote_tracked_packet,
There was a problem hiding this comment.
It's also an optional way.
|
/merge |
|
@solotzg: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
|
This pull request has been accepted and is ready to merge. DetailsCommit hash: 3b778c3 |
|
/merge |
|
@solotzg: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
|
This pull request has been accepted and is ready to merge. DetailsCommit hash: ccb115c |
|
/unhold |
What problem does this PR solve?
Issue Number: ref #7084
after #6596
What is changed and how it works?
StorageDisaggregated, use latest mpp version for task meta.Benchmark
ENV
sender.CompressionModefor all ExchangeTypeTest Broadcast Exchange With Compression
Make broadcast exchange become bottleneck forcily
tidb_broadcast_join_threshold_count: 10240 -> 1024000tidb_broadcast_join_threshold_size: 104857600 -> 10485760000Check List
Tests
Side effects
Documentation
Release note