
Exchange Operator Enhancement #6620

@solotzg

Description

Exchange Operator Enhancement

Investigation

For now, the exchange sender & receiver can occupy huge network bandwidth when there is a lot of data to be shuffled (like tpch q9, q5, q18, etc.). A typical server Ethernet profile looks like:

Ethernet
  - Speed: 10000Mb/s
  - Duplex: Full
  - upper bound:
    - net/recv 1250MB
    - net/send 1250MB

Memory: DDR4, 2133 MT/s ≈ 17064 MB/s

The limited network transfer speed can easily become the bottleneck and make operators after the exchange receiver wait, while CPU usage stays quite low. To accelerate the exchange operator, an optional approach is data compression.
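As a back-of-envelope sketch (using the LZ4 compression ratio measured later in this issue), compression raises the logical throughput a single 10 Gb/s link can carry:

```python
# Back-of-envelope: how much uncompressed data a 10 Gb/s link can carry
# when the stream is LZ4-compressed. The ratio comes from this issue's
# measurements; the calculation itself is only illustrative.
net_cap_mb_s = 1250.0   # 10000 Mb/s full duplex, one direction
lz4_ratio = 2.267       # measured LZ4 compression ratio (see benchmark below)

effective_mb_s = net_cap_mb_s * lz4_ratio
print(f"effective logical throughput ≈ {effective_mb_s:.0f} MB/s")
```

The gain is bounded by how fast a core can compress, which is why the codec throughput figures below matter.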

Other Products

StarRocks

The compression type is set according to query options. If transmission_compression_type is not set, compress_rowbatches determines whether transmitted data is compressed. The default value of compress_rowbatches is TRUE, and the default compression method is LZ4.

ClickHouse

  • No exchange between instances

Presto

Page is the data structure that holds data and is transferred between Presto physical execution operators: the upstream operator produces output through getOutput().
Just like Block, Page also needs to be serialized & deserialized; serialization happens when data needs to be transferred between workers. When a Page is serialized, it first encodes the Blocks using the corresponding BlockEncoding, and then, if a compressor is available, it tries to compress the encoded block data. If compression works well (i.e. the encoding ratio is lower than 0.8), the compressed data is used; otherwise the uncompressed data is used. The encoded block data is put into a class named SerializedPage along with some statistics: the byte size of the page before & after compression.

Exchange Operator

Env

  • TPCH-100
  • TiFlash x 3
  • TIFLASH REPLICA x 3

|                                 | NONE    | LZ4     | ZSTD   |
| ------------------------------- | ------- | ------- | ------ |
| Total Exchange Size (GB)        | 267.186 | 117.836 | 63.269 |
| Total Exchange Size By NET (GB) | 178.124 | 78.557  | 42.179 |
  • Approximate Compress Codec Throughput Per CPU Core:
    • NONE
      • Input: 2169.3 MB/s
      • Output: 2169.3 MB/s
    • LZ4
      • Input: 210.9 MB/s
      • Output: 78.0 MB/s
      • lz4-compression-ratio ≈ 2.267
    • ZSTD
      • Input: 126.0 MB/s
      • Output: 18.9 MB/s
      • zstd-compression-ratio ≈ 4.223
  • Expected Performance Improvement on TPCH-100 by Compression Method:
    • 3 x TiFlash: 9.17%(LZ4)
    • More TiFlash stores, better performance improvement
    • Average CPU usage is higher than the original way
  • Expected NET/IO Reduction on TPCH-100 by Compression Method:
    • 3 x TiFlash:
      • LZ4: 99.567 GB (178.124-78.557)
      • ZSTD: 135.945 GB (178.124-42.179)
    • Approximately 55.90% (LZ4), 76.32% (ZSTD)
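The reduction figures can be rechecked from the measured sizes (a quick arithmetic sketch using only numbers from the table above):

```python
# Recompute the NET I/O reduction from the measured exchange sizes (GB).
none_net = 178.124
lz4_net = 78.557
zstd_net = 42.179

lz4_saved = none_net - lz4_net    # GB no longer sent over the network
zstd_saved = none_net - zstd_net

print(f"LZ4:  saved {lz4_saved:.3f} GB, reduction {lz4_saved / none_net:.2%}")
print(f"ZSTD: saved {zstd_saved:.3f} GB, reduction {zstd_saved / none_net:.2%}")
```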

Implementation

TiDB

New session vars

  • mpp_version: the mpp-version used to build the mpp plan; if unspecified, the latest version is used.
    • options: -1 (unspecified), 0, 1.
      • mpp_version is initialized to -1.
      • if set to -1, tidb will choose the latest version (1)
      • if set to 0, the data compression mode will be set to NONE
      • if set to 1, the planner will choose a data compression mode for the exchange operator
  • mpp_exchange_compression_mode: used to select the data compression method in the mpp exchange operator
    • options: NONE, FAST, HIGH_COMPRESSION, UNSPECIFIED
      • mpp_exchange_compression_mode is initialized to UNSPECIFIED.
      • UNSPECIFIED: tidb will choose the default mode (FAST)
      • NONE: no compression
      • FAST: fast compression/decompression speed, compression ratio is lower than HC mode.
      • HIGH_COMPRESSION: high compression (HC) ratio mode
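Taken together, the resolution rules above can be sketched as follows (a hypothetical helper mirroring the description, not actual TiDB code):

```python
# Hypothetical mirror of the session-variable resolution rules described
# above; the constant and names follow this section, not TiDB internals.
LATEST_MPP_VERSION = 1

def resolve(mpp_version=-1, compression_mode="UNSPECIFIED"):
    if mpp_version == -1:                  # unspecified: use the latest version
        mpp_version = LATEST_MPP_VERSION
    if mpp_version == 0:                   # version 0 never compresses
        return mpp_version, "NONE"
    if compression_mode == "UNSPECIFIED":  # default mode under version 1
        compression_mode = "FAST"
    return mpp_version, compression_mode

print(resolve())  # (1, 'FAST')
```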

New proto

kvproto: pingcap/kvproto#1027

  • add mpp_version in TaskMeta, IsAliveResponse, Error
  • add version in MPPDataPacket to indicate data packet format

tipb: pingcap/tipb#285

  • add CompressionMode compression in ExchangeSender

New fields

MPP info will be shown in the explain result:

  • MppVersion: $ in TableReader whose data comes from ExchangeSender, if it's an mpp plan.
  • Compression: $ in ExchangeSender if compression mode is not NONE
MySQL > set mpp_version=1; set mpp_exchange_compression_mode=fast; explain select count(*) as count_order from lineitem group by l_returnflag, l_linestatus order by l_returnflag, l_linestatus;

+----------------------------------------+--------------+--------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id                                     | estRows      | task         | access object  | operator info                                                                                                                                                                                                                                                                                                |
+----------------------------------------+--------------+--------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Projection_6                           | 3.00         | root         |                | Column#18                                                                                                                                                                                                                                                                                                    |
| └─Sort_8                               | 3.00         | root         |                | tpch100_new.lineitem.l_returnflag, tpch100_new.lineitem.l_linestatus                                                                                                                                                                                                                                         |
|   └─TableReader_36                     | 3.00         | root         |                | MppVersion: 1, data:ExchangeSender_35                                                                                                                                                                                                                                                                        |
|     └─ExchangeSender_35                | 3.00         | mpp[tiflash] |                | ExchangeType: PassThrough                                                                                                                                                                                                                                                                                    |
|       └─Projection_31                  | 3.00         | mpp[tiflash] |                | Column#18, tpch100_new.lineitem.l_returnflag, tpch100_new.lineitem.l_linestatus                                                                                                                                                                                                                              |
|         └─HashAgg_32                   | 3.00         | mpp[tiflash] |                | group by:tpch100_new.lineitem.l_linestatus, tpch100_new.lineitem.l_returnflag, funcs:sum(Column#23)->Column#18, funcs:firstrow(tpch100_new.lineitem.l_returnflag)->tpch100_new.lineitem.l_returnflag, funcs:firstrow(tpch100_new.lineitem.l_linestatus)->tpch100_new.lineitem.l_linestatus, stream_count: 20 |
|           └─ExchangeReceiver_34        | 3.00         | mpp[tiflash] |                | stream_count: 20                                                                                                                                                                                                                                                                                             |
|             └─ExchangeSender_33        | 3.00         | mpp[tiflash] |                | ExchangeType: HashPartition, Compression: FAST, Hash Cols: [name: tpch100_new.lineitem.l_returnflag, collate: utf8mb4_bin], [name: tpch100_new.lineitem.l_linestatus, collate: utf8mb4_bin], stream_count: 20                                                                                                |
|               └─HashAgg_14             | 3.00         | mpp[tiflash] |                | group by:tpch100_new.lineitem.l_linestatus, tpch100_new.lineitem.l_returnflag, funcs:count(1)->Column#23                                                                                                                                                                                                     |
|                 └─TableFullScan_30     | 600037902.00 | mpp[tiflash] | table:lineitem | keep order:false                                                                                                                                                                                                                                                                                             |
+----------------------------------------+--------------+--------------+----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Planner Change

  • The latest mpp-version is 1; it must be bumped whenever the data structure changes (e.g. a new data codec pattern).
  • mpp-version will be added to each mpp-related structure.
  • Only when mpp-version is 1 will the planner build an ExchangeSender with a compression mode.
    • If the session variable for the compression mode is unspecified, tidb chooses FAST mode.
  • tidb sends mpp tasks to tiflash; tiflash should check whether the mpp-version is valid.
  • If ExchangeType is NOT HashPartition, the compression mode will always be NONE. Compression for other exchange types can be supported if necessary.
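The exchange-type gating above can be sketched as (a hypothetical helper, not actual planner code):

```python
def exchange_compression(exchange_type: str, mode: str) -> str:
    # Hypothetical sketch of the rule above: compression currently applies
    # only to HashPartition exchanges; everything else falls back to NONE.
    return mode if exchange_type == "HashPartition" else "NONE"

print(exchange_compression("HashPartition", "FAST"))  # FAST
print(exchange_compression("PassThrough", "FAST"))    # NONE
```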

TiFlash

Base Modules & Utils

  • add -march=haswell flag for a few modules
  • add avx2_byte_count to optimize size_t countBytesInFilter(const UInt8 * filt, size_t sz).
  • optimize a few implementations in ColumnsCommon.cpp and DataTypeString.cpp
  • use std::chrono::steady_clock instead of system_clock in a few modules

Mpp Version Check

TiFlash will check mpp-version in FlashService::DispatchMPPTask, FlashService::CancelMPPTask, and FlashService::EstablishMPPConnection.

If the mpp-version check fails, TiFlash will return the grpc error grpc::StatusCode::CANCELLED with a message like:

ERROR 1105 (HY000): rpc error: code = Canceled desc = Failed to handling mpp dispatch request, reason=`Invalid mpp version -1, TiFlash expects version: min 0, max 1, should upgrade TiDB/planner`
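The check can be sketched as follows (a hypothetical Python mirror of the bounds and error text above; the real check lives in TiFlash's FlashService handlers):

```python
# Hypothetical mirror of TiFlash's mpp-version range check; bounds and the
# error text pattern come from this section, not from TiFlash source.
MIN_MPP_VERSION, MAX_MPP_VERSION = 0, 1

def check_mpp_version(version: int) -> int:
    if not (MIN_MPP_VERSION <= version <= MAX_MPP_VERSION):
        raise ValueError(
            f"Invalid mpp version {version}, TiFlash expects version: "
            f"min {MIN_MPP_VERSION}, max {MAX_MPP_VERSION}, "
            f"should upgrade TiDB/planner")
    return version
```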

Hash Partition Writer

MPPDataPacketV1 data layout:

    // compression method flag; NONE = 0x02, LZ4 = 0x82, ZSTD = 0x90
    // ...
    // header meta:
    //     columns count;
    //     total row count (multi parts);
    //     for each column:
    //         column name;
    //         column type;
    // for each part:
    //     row count;
    //     columns data;
  • HashPartitionWriterV1 supports compression mode:
    • NONE: no compression
      • if target tunnel model is TunnelSenderMode::LOCAL, must use method NONE.
    • FAST: fast compression/decompression speed, use method LZ4.
    • HIGH_COMPRESSION: high compression (HC) ratio mode, use method ZSTD.
  • NewMPPExchangeWriter will generate HashPartitionWriterV1 when the exchange type is tipb::ExchangeType::Hash and the mpp-version of the dag context is not 0.
  • ExchangeReceiverBase<RPCContext>::decodeChunks will check version of mpp::MPPDataPacket
    • MPPDataPacketV0: use previous way.
    • MPPDataPacketV1: use the new format; check the first byte (compression method flag) to determine whether the data is compressed.
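A receiver-side sketch of that flag dispatch (the flag values come from the layout comment above; the helper itself is hypothetical):

```python
# Dispatch on the leading compression-method flag of an MPPDataPacketV1
# body. Flag values are from the layout above; the helper is illustrative.
NONE_FLAG, LZ4_FLAG, ZSTD_FLAG = 0x02, 0x82, 0x90

def compression_of(packet: bytes) -> str:
    return {NONE_FLAG: "NONE", LZ4_FLAG: "LZ4", ZSTD_FLAG: "ZSTD"}.get(
        packet[0], "UNKNOWN")

print(compression_of(bytes([0x82]) + b"compressed-bytes"))  # LZ4
```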

Grafana

Add panel Exchange Bytes/Seconds in Grafana

  • hash_none_compression_remote
  • hash_none_compression_local
  • hash_lz4_compression
  • hash_zstd_compression
  • hash_original: original data size sent by hash exchange
    • hash_original ≈ hash_none_compression_remote + hash_none_compression_local + hash_lz4_compression * lz4-compression-ratio + hash_zstd_compression * zstd-compression-ratio
  • broadcast_passthrough_original
  • broadcast_passthrough_none_compression_local
  • broadcast_passthrough_none_compression_remote
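The hash_original approximation can be written out as (metric names taken from the panel list above; the sample values are made up for illustration):

```python
def estimate_hash_original(remote_none, local_none, lz4_sent, zstd_sent,
                           lz4_ratio=2.267, zstd_ratio=4.223):
    # Approximate the pre-compression bytes from the post-compression panel
    # values, scaling each compressed stream back up by its measured ratio.
    return remote_none + local_none + lz4_sent * lz4_ratio + zstd_sent * zstd_ratio

# Illustrative panel readings in GB (not real measurements):
print(estimate_hash_original(10.0, 5.0, 20.0, 4.0))
```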


New config fields

[profiles]
[profiles.default]
batch_send_min_limit_compression = -1 # default minimal chunk size of exchanging data among TiFlash when using data compression

If batch_send_min_limit_compression is less than 0, HashPartitionWriterV1 will use 8192 * partition_num.
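The fallback can be sketched as (a hypothetical helper; the constant 8192 and the config name come from the text above):

```python
def effective_batch_limit(batch_send_min_limit_compression: int,
                          partition_num: int) -> int:
    # A negative config value means "use 8192 rows per partition"
    # (illustrative helper, not actual TiFlash code).
    if batch_send_min_limit_compression < 0:
        return 8192 * partition_num
    return batch_send_min_limit_compression

print(effective_batch_limit(-1, 3))  # 24576
```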

Progress

Benchmark

  • NIC
	Supported ports: [ FIBRE ]
	Supported link modes:   1000baseT/Full
	                        10000baseT/Full
	Supported pause frame use: Symmetric Receive-only
	Supports auto-negotiation: No
	Supported FEC modes: Not reported
	Advertised link modes:  10000baseT/Full
	Advertised pause frame use: No
	Advertised auto-negotiation: No
	Advertised FEC modes: Not reported
	Speed: 10000Mb/s
	Duplex: Full
	Port: FIBRE
	PHYAD: 1
	Transceiver: internal
	Auto-negotiation: off
	Supports Wake-on: g
	Wake-on: d
	Current message level: 0x00000000 (0)

	Link detected: yes
  • CPU
Architecture:          x86_64
CPU op-mode(s):        32-bit, 64-bit
Byte Order:            Little Endian
CPU(s):                40
On-line CPU(s) list:   0-39
Thread(s) per core:    2
Core(s) per socket:    10
Socket(s):             2
NUMA node(s):          2
Vendor ID:             GenuineIntel
CPU family:            6
Model:                 79
Model name:            Intel(R) Xeon(R) CPU E5-2630 v4 @ 2.20GHz
Stepping:              1
CPU MHz:               2402.355
CPU max MHz:           3100.0000
CPU min MHz:           1200.0000
BogoMIPS:              4399.86
Virtualization:        VT-x
L1d cache:             32K
L1i cache:             32K
L2 cache:              256K
L3 cache:              25600K
NUMA node0 CPU(s):     0,2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38
NUMA node1 CPU(s):     1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39
Flags:                 fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc aperfmperf eagerfpu pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid dca sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch epb cat_l3 cdp_l3 invpcid_single intel_pt ssbd ibrs ibpb stibp tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm cqm rdt_a rdseed adx smap xsaveopt cqm_llc cqm_occup_llc cqm_mbm_total cqm_mbm_local dtherm ida arat pln pts spec_ctrl intel_stibp flush_l1d
  • Performance (time in seconds)

| Query | NONE | FAST | HIGH_COMPRESSION | Improvement: (Original)/(FAST) - 1.0 | Improvement: (Original)/(HIGH_COMPRESSION) - 1.0 |
| ----- | ----- | ----- | ----- | ------ | ------- |
| Q1  | 2.85  | 2.85  | 2.92  | 0.00%  | -2.40%  |
| Q2  | 1.31  | 1.17  | 1.38  | 11.97% | -5.07%  |
| Q3  | 3.46  | 3.19  | 3.32  | 8.46%  | 4.22%   |
| Q4  | 2.65  | 2.45  | 2.65  | 8.16%  | 0.00%   |
| Q5  | 8.09  | 6.48  | 7.48  | 24.85% | 8.16%   |
| Q6  | 0.7   | 0.7   | 0.7   | 0.00%  | 0.00%   |
| Q7  | 3.25  | 2.92  | 3.19  | 11.30% | 1.88%   |
| Q8  | 5.87  | 5.2   | 5.27  | 12.88% | 11.39%  |
| Q9  | 22.38 | 18.09 | 20.23 | 23.71% | 10.63%  |
| Q10 | 4.33  | 3.93  | 4.19  | 10.18% | 3.34%   |
| Q11 | 1.11  | 0.84  | 0.91  | 32.14% | 21.98%  |
| Q12 | 1.71  | 1.58  | 1.64  | 8.23%  | 4.27%   |
| Q13 | 3.52  | 3.25  | 3.39  | 8.31%  | 3.83%   |
| Q14 | 0.77  | 0.77  | 0.91  | 0.00%  | -15.38% |
| Q15 | 1.91  | 1.64  | 1.71  | 16.46% | 11.70%  |
| Q16 | 1.04  | 0.91  | 0.97  | 14.29% | 7.22%   |
| Q17 | 6.95  | 6.68  | 6.81  | 4.04%  | 2.06%   |
| Q18 | 8.62  | 7.28  | 7.68  | 18.41% | 12.24%  |
| Q19 | 1.85  | 1.85  | 1.91  | 0.00%  | -3.14%  |
| Q20 | 1.44  | 1.44  | 1.44  | 0.00%  | 0.00%   |
| Q21 | 9.7   | 9.36  | 9.83  | 3.63%  | -1.32%  |
| Q22 | 0.57  | 0.57  | 0.64  | 0.00%  | -10.94% |
| SUM | 94.08 | 83.15 | 89.17 | 13.14% | 5.51%   |
  • Net I/O Reduction

|  | NONE | FAST | HIGH_COMPRESSION | NET/IO Reduction: (NONE-FAST)/NONE | NET/IO Reduction: (NONE-HIGH_COMPRESSION)/NONE |
| --- | --- | --- | --- | --- | --- |
| Total Exchange Size By NET (GB) | 178.124 | 78.557 | 42.179 | 55.90% | 76.32% |
