Description
Exchange Operator Enhancement
Investigation
For now, the exchange sender & receiver can occupy huge network bandwidth when there is a lot of data to be shuffled (e.g. TPC-H q9, q5, q18, etc.). A typical server Ethernet profile looks like:
Ethernet
- Speed: 10000Mb/s
- Duplex: Full
- upper bound:
- net/recv 1250MB
- net/send 1250MB
Memory: DDR4, 2133 MT/s ≈ 17064 MB/s
The limited network transfer speed can easily become a bottleneck and make the operators after the exchange receiver wait, while CPU usage stays quite low. To accelerate the exchange operator, an optional way is to use data compression.
Other Products
StarRocks
Sets the compression type according to query options. If transmission_compression_type is not set, compress_rowbatches decides whether transmitted data is compressed. The default value of compress_rowbatches is TRUE, and the default compression method is LZ4.
- compress_rowbatches: TRUE
- A boolean value that controls whether to compress the row batches in RPCs between BEs. This configuration item is used for data transmission between query layers: true compresses the row batches, false does not.
- CONF_mDouble(rpc_compress_ratio_threshold, "1.1");
- Compression ratio threshold when shuffling row batches over the network (not in the storage engine). If the achieved ratio is less than this value, uncompressed data is used instead.
- exchange_sink_operator.cpp#L381-L388 / ExchangeSinkOperator::prepare
- LZ4 compress method
- exchange_sink_operator.cpp#L687-L693
- if compress_ratio > config::rpc_compress_ratio_threshold
- dst->set_compress_type(_compress_type);
- data_stream_sender.cpp#L420-L432 / DataStreamSender::prepare
ClickHouse
- No exchange between instances
Presto
Page is the data structure that holds data and is transferred between Presto physical execution operators: an upstream operator produces output through getOutput().
Just like Block, Page also needs to be serialized & deserialized; serialization happens when data needs to be transferred between workers. When a Page is serialized, it first encodes the Blocks using the corresponding BlockEncoding, and then, if a compressor is available, it tries to compress the encoded block data. If compression works well (encoding ratio lower than 0.8), the compressed data is used; otherwise the uncompressed data is used. The encoded block data is put into a class named SerializedPage along with some statistics: the byte size of the page before & after compression.
Exchange Operator
Env
- TPCH-100
- TiFlash x 3
- TIFLASH REPLICA x 3
| | NONE | LZ4 | ZSTD |
|---|---|---|---|
| Total Exchange Size(GB) | 267.186 | 117.836 | 63.269 |
| Total Exchange Size By NET (GB) | 178.124 | 78.557 | 42.179 |
- Approximate Compress Codec Throughput Per CPU Core:
- NONE
- Input: 2169.3 MB/s
- Output: 2169.3 MB/s
- LZ4
- Input: 210.9 MB/s
- Output: 78.0 MB/s
- lz4-compression-ratio ≈ 2.267
- ZSTD
- Input: 126.0 MB/s
- Output: 18.9 MB/s
- zstd-compression-ratio ≈ 4.223
- Expected performance improvement on TPCH-100 by compression method:
- 3 x TiFlash: 9.17% (LZ4)
- The more TiFlash stores, the greater the performance improvement
- Average CPU usage is higher than with the original approach
- Expected NET/IO reduction on TPCH-100 by compression method:
- 3 x TiFlash:
- LZ4: 99.567 GB (178.124-78.557)
- ZSTD: 135.945 GB (178.124-42.179)
- Approximately 55.90% (LZ4), 76.32% (ZSTD)
Implementation
TiDB
New session vars
mpp_version: the mpp-version used to build the mpp plan; if mpp-version is unspecified, use the latest version.
- options: -1 (unspecified), 0, 1
- mpp_version is initialized to -1
  - if set to -1, tidb will choose the latest version (1)
  - if set to 0, the data compression mode will be set to NONE
  - if the version is 1, the planner will choose the data compression mode for the exchange operator
mpp_exchange_compression_mode: used to select the data compression method in the mpp exchange operator.
- options: NONE, FAST, HIGH_COMPRESSION, UNSPECIFIED
- mpp_exchange_compression_mode is initialized to UNSPECIFIED
  - UNSPECIFIED: tidb will choose the default mode (FAST)
  - NONE: no compression
  - FAST: fast compression/decompression speed; the compression ratio is lower than in HC mode
  - HIGH_COMPRESSION: high compression (HC) ratio mode
New proto
kvproto: pingcap/kvproto#1027
- add `mpp_version` in `TaskMeta`, `IsAliveResponse`, `Error`
- add `version` in `MPPDataPacket` to indicate the data packet format
tipb: pingcap/tipb#285
- add `CompressionMode compression` in `ExchangeSender`
New fields
Mpp info will be shown in the explain result:
- `MppVersion: $` in `TableReader` whose data is from `ExchangeSender`, if it's an mpp plan
- `Compression: $` in `ExchangeSender`, if the compression mode is not NONE
```
MySQL > set mpp_version=1; set mpp_exchange_compression_mode=fast; explain select count(*) as count_order from lineitem group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;
+----------------------------------------+--------------+--------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | estRows | task | access object | operator info |
+----------------------------------------+--------------+--------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_6 | 3.00 | root | | Column#18 |
| └─Sort_8 | 3.00 | root | | tpch100_new.lineitem.l_returnflag, tpch100_new.lineitem.l_linestatus |
| └─TableReader_36 | 3.00 | root | | MppVersion: 1, data:ExchangeSender_35 |
| └─ExchangeSender_35 | 3.00 | mpp[tiflash] | | ExchangeType: PassThrough |
| └─Projection_31 | 3.00 | mpp[tiflash] | | Column#18, tpch100_new.lineitem.l_returnflag, tpch100_new.lineitem.l_linestatus |
| └─HashAgg_32 | 3.00 | mpp[tiflash] | | group by:tpch100_new.lineitem.l_linestatus, tpch100_new.lineitem.l_returnflag, funcs:sum(Column#23)->Column#18, funcs:firstrow(tpch100_new.lineitem.l_returnflag)->tpch100_new.lineitem.l_returnflag, funcs:firstrow(tpch100_new.lineitem.l_linestatus)->tpch100_new.lineitem.l_linestatus, stream_count: 20 |
| └─ExchangeReceiver_34 | 3.00 | mpp[tiflash] | | stream_count: 20 |
| └─ExchangeSender_33 | 3.00 | mpp[tiflash] | | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: tpch100_new.lineitem.l_returnflag, collate: utf8mb4_bin], [name: tpch100_new.lineitem.l_linestatus, collate: utf8mb4_bin], stream_count: 20 |
| └─HashAgg_14 | 3.00 | mpp[tiflash] | | group by:tpch100_new.lineitem.l_linestatus, tpch100_new.lineitem.l_returnflag, funcs:count(1)->Column#23 |
| └─TableFullScan_30 | 600037902.00 | mpp[tiflash] | table:lineitem | keep order:false |
+----------------------------------------+--------------+--------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
Planner Change
- the latest mpp-version is 1; it must be updated if we want to change a data structure (like a new data codec pattern)
- mpp-version will be added to each mpp-related structure
- only if the mpp-version is 1 will the planner build an exchange sender with a compression mode
- if the session var about the compression mode is unspecified, tidb chooses FAST mode
- tidb sends mpp tasks to tiflash; tiflash should check whether the mpp-version is valid
- if `ExchangeType` is NOT `HashPartition`, the compression mode will always be NONE. We need to support compression for other exchange types if necessary.
TiFlash
Base Modules & Utils
- add the `-march=haswell` flag for a few modules
- add `avx2_byte_count` to optimize `size_t countBytesInFilter(const UInt8 * filt, size_t sz)`
- optimize a few implementations in ColumnsCommon.cpp, DataTypeString.cpp
- use `std::chrono::steady_clock` instead of `system_clock` in a few modules
Mpp Version Check
TiFlash will check the mpp-version in `FlashService::DispatchMPPTask`, `FlashService::CancelMPPTask`, and `FlashService::EstablishMPPConnection`.
If the mpp-version check fails, tiflash will return the grpc error `grpc::StatusCode::CANCELLED` with a message like:
ERROR 1105 (HY000): rpc error: code = Canceled desc = Failed to handling mpp dispatch request, reason=`Invalid mpp version -1, TiFlash expects version: min 0, max 1, should upgrade TiDB/planner`
Hash Partition Writer
- add new module `HashPartitionWriterV1` with a new codec pattern
  - create packets with version MPPDataPacketV1 (1): https://github.com/pingcap/kvproto/blob/7cd28226c2a21e489b077a87e1f5c9ba2c950944/proto/mpp.proto#L76
- data packet chunk format:
```cpp
// compression method flag; NONE = 0x02, LZ4 = 0x82, ZSTD = 0x90
// ...
// header meta:
// columns count;
// total row count (multi parts);
// for each column:
// column name;
// column type;
// for each part:
// row count;
// columns data;
```
- `HashPartitionWriterV1` supports compression modes:
  - NONE: no compression
    - if the target tunnel mode is `TunnelSenderMode::LOCAL`, method NONE must be used
  - FAST: fast compression/decompression speed, use method `LZ4`
  - HIGH_COMPRESSION: high compression (HC) ratio mode, use method `ZSTD`
- `NewMPPExchangeWriter` will generate `HashPartitionWriterV1` when the exchange type is `tipb::ExchangeType::Hash` and the mpp-version of the dag context is not 0
- `ExchangeReceiverBase<RPCContext>::decodeChunks` will check the version of `mpp::MPPDataPacket`
  - MPPDataPacketV0: use the previous way
  - MPPDataPacketV1: use the new format; check the first byte (compression method flag) to determine whether the data is compressed
Grafana
Add panel Exchange Bytes/Seconds in Grafana
- hash_none_compression_remote
- hash_none_compression_local
- hash_lz4_compression
- hash_zstd_compression
- hash_original: original data size sent by hash exchange
- hash_original ≈ hash_none_compression_remote + hash_none_compression_local + hash_lz4_compression * lz4-compression-ratio + hash_zstd_compression * zstd-compression-ratio
- broadcast_passthrough_original
- broadcast_passthrough_none_compression_local
- broadcast_passthrough_none_compression_remote
New config fields
```toml
[profiles]
[profiles.default]
batch_send_min_limit_compression = -1 # default minimal chunk size of exchanged data among TiFlash when using data compression
```
If `batch_send_min_limit_compression` < 0, `HashPartitionWriterV1` will use `8192 * partition_num`.
Progress
- planner: add MppVersion for mpp task; support data compression in Exchange Operator; tidb#40132
- mpp: add mpp version and data packet version kvproto#1027
- executor: exchange operator supports data compression tipb#285
- Optimize exchange operator & Support mpp version #6596
- Performance Regression Compared With release-6.5-20230107-v6.5.0 #6713
- update system-variables.md: add mpp_version and mpp_exchange_compres.. docs#12387
Benchmark
- TPCH-100
- TiFlash x 3
- TIFLASH REPLICA x 3
- Base branch:
- TiDB: f1a744a3e6da9bb29781efc5bd504be5cc0ba55c
- TiFlash: 584c992
- Memory: 188.66 GB
- Ethernet
```
Supported ports: [ FIBRE ]
Supported link modes: 1000baseT/Full
10000baseT/Full
Supported pause frame use: Symmetric Receive-only
Supports auto-negotiation: No
Supported FEC modes: Not reported
Advertised link modes: 10000baseT/Full
Advertised pause frame use: No
Advertised auto-negotiation: No
Advertised FEC modes: Not reported
Speed: 10000Mb/s
Duplex: Full
Port: FIBRE
PHYAD: 1
Transceiver: internal
Auto-negotiation: off
Supports Wake-on: g
Wake-on: d
Current message level: 0x00000000 (0)
Link detected: yes
```
- CPU
```
Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 40
On-line CPU(s) list: 0-39
Thread(s) per core: 2
Core(s) per socket: 10
Socket(s): 2
NUMA node(s): 2
Vendor ID: GenuineIntel
CPU family: 6
Model: 79
Model name: Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz
Stepping: 1
CPU MHz: 2402.355
CPU max MHz: 3100.0000
CPU min MHz: 1200.0000
BogoMIPS: 4399.86
Virtualization: VT-x
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 25600K
NUMA node0 CPU(s): 0,2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38
NUMA node1 CPU(s): 1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch epb cat_l3 cdp_l3 invpcid_single intel_pt ssbd ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm cqm rdt_a rdseed adx smap xsaveopt cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local dtherm ida arat pln pts spec_ctrl intel_stibp flush_l1d
```
- Performance
| Time(s) | NONE | FAST | HIGH_COMPRESSION | Improvement: (Original) / (FAST) - 1.0 | Improvement: (Original) / (HIGH_COMPRESSION) - 1.0 |
|---|---|---|---|---|---|
| Q1 | 2.85 | 2.85 | 2.92 | 0.00% | -2.40% |
| Q2 | 1.31 | 1.17 | 1.38 | 11.97% | -5.07% |
| Q3 | 3.46 | 3.19 | 3.32 | 8.46% | 4.22% |
| Q4 | 2.65 | 2.45 | 2.65 | 8.16% | 0.00% |
| Q5 | 8.09 | 6.48 | 7.48 | 24.85% | 8.16% |
| Q6 | 0.7 | 0.7 | 0.7 | 0.00% | 0.00% |
| Q7 | 3.25 | 2.92 | 3.19 | 11.30% | 1.88% |
| Q8 | 5.87 | 5.2 | 5.27 | 12.88% | 11.39% |
| Q9 | 22.38 | 18.09 | 20.23 | 23.71% | 10.63% |
| Q10 | 4.33 | 3.93 | 4.19 | 10.18% | 3.34% |
| Q11 | 1.11 | 0.84 | 0.91 | 32.14% | 21.98% |
| Q12 | 1.71 | 1.58 | 1.64 | 8.23% | 4.27% |
| Q13 | 3.52 | 3.25 | 3.39 | 8.31% | 3.83% |
| Q14 | 0.77 | 0.77 | 0.91 | 0.00% | -15.38% |
| Q15 | 1.91 | 1.64 | 1.71 | 16.46% | 11.70% |
| Q16 | 1.04 | 0.91 | 0.97 | 14.29% | 7.22% |
| Q17 | 6.95 | 6.68 | 6.81 | 4.04% | 2.06% |
| Q18 | 8.62 | 7.28 | 7.68 | 18.41% | 12.24% |
| Q19 | 1.85 | 1.85 | 1.91 | 0.00% | -3.14% |
| Q20 | 1.44 | 1.44 | 1.44 | 0.00% | 0.00% |
| Q21 | 9.7 | 9.36 | 9.83 | 3.63% | -1.32% |
| Q22 | 0.57 | 0.57 | 0.64 | 0.00% | -10.94% |
| SUM | 94.08 | 83.15 | 89.17 | 13.14% | 5.51% |
- Net I/O Reduction
| | NONE | FAST | HIGH_COMPRESSION | NET/IO Reduction: (NONE-FAST)/NONE | NET/IO Reduction: (NONE-HIGH_COMPRESSION)/NONE |
|---|---|---|---|---|---|
| Total Exchange Size By NET (GB) | 178.124 | 78.557 | 42.179 | 55.90% | 76.32% |