Limit memory usage of exchange sender #10387
Conversation
Pull Request Overview
This PR limits memory usage of ExchangeSender by adding a max_buffered_bytes parameter to control buffering across all writer types. The root cause of high memory usage was unlimited buffering based only on row counts, which could consume excessive memory with large rows and high thread counts.
Key changes:
- Adds `max_buffered_bytes` parameter to all exchange writer constructors and functions
- Implements `needFlush()` method that checks both byte and row limits for flushing decisions
- Introduces `getMaxBufferedBytesInResponseWriter()` utility function to calculate appropriate buffer limits
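A minimal sketch of what a combined byte-and-row flush check like the one summarized above could look like. The field and method names (`rows_in_blocks`, `bytes_in_blocks`, `batch_send_min_limit`, `needFlush`) follow the PR summary but are illustrative; the actual members of DAGResponseWriter may differ.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical sketch of the unified buffering check: the writer flushes
// when either the buffered row count or the buffered byte count reaches
// its limit, so large rows can no longer accumulate unbounded memory.
struct ResponseWriterBuffer
{
    std::uint64_t rows_in_blocks = 0;
    std::uint64_t bytes_in_blocks = 0;
    std::uint64_t batch_send_min_limit = 0; // row-based limit (pre-existing)
    std::uint64_t max_buffered_bytes = 0;   // byte-based limit added by this PR

    bool needFlush() const
    {
        return rows_in_blocks >= batch_send_min_limit
            || bytes_in_blocks >= max_buffered_bytes;
    }
};
```

With only the row limit, a small number of very wide rows could stay buffered; the byte check catches that case early.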
Reviewed Changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| PhysicalExchangeSender.cpp | Adds concurrency calculation and max_buffered_bytes parameter to writer creation |
| newMPPExchangeWriter.h/.cpp | Adds max_buffered_bytes parameter to function signatures and passes it through |
| HashPartitionWriter.h/.cpp | Refactors to use base class buffering fields and byte-based flushing |
| FineGrainedShuffleWriter.h/.cpp | Similar refactoring to use unified buffering approach |
| BroadcastOrPassThroughWriter.h/.cpp | Updates to use base class buffering and byte limits |
| StreamingDAGResponseWriter.h/.cpp | Refactors buffering logic to use base class implementation |
| DAGResponseWriter.h | Adds base buffering fields and needFlush() method |
| DAGUtils.h/.cpp | Adds utility function to calculate max buffered bytes |
| DAGDriver.cpp | Updates streaming writer creation with new parameter |
| Test files | Updates all test cases to include new max_buffered_bytes parameter |
```cpp
if (scattered[col_id][chunk_id]->valuesHaveFixedSize())
{
    // Reserve space for each chunk to avoid frequent memory allocation.
    scattered[col_id][chunk_id]->reserve(1024);
}
```
The hardcoded value 1024 should be defined as a named constant to improve maintainability and make the purpose clearer.
/test pull-unit-test
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: gengliqi, solotzg. The full list of commands accepted by this bot can be found here. The pull request process is described here.
/test pull-unit-test

Force-pushed from 3e85f09 to 6d89003
/hold

/test pull-integration-next-gen
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
Force-pushed from 220960d to 5ca77f3

/hold cancel
/test pull-unit-test
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
In response to a cherrypick label: new pull request created to branch
What problem does this PR solve?
Issue Number: close #10337
Problem Summary:
The root cause is that in the current implementation, the number of buffered blocks in ExchangeSender can be up to max_threads * max_threads * fine_grain_stream_count, so if max_threads is large and each row is big, the buffered blocks can consume a very large amount of memory.
This PR limits the memory usage of ExchangeSender to

max(MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE * max_threads, max_buffered_bytes_in_executor)

What is changed and how it works?
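The limit formula above could be computed roughly as follows. This is a sketch: the function name mirrors the `getMaxBufferedBytesInResponseWriter()` utility mentioned in the review summary, but its real signature, and the actual value of MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE, live in the TiFlash source.

```cpp
#include <algorithm>
#include <cstdint>

// Sketch of the buffer limit described in the PR:
//   max(MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE * max_threads, max_buffered_bytes_in_executor)
// The per-thread floor keeps each sender thread a useful batch size even when
// the executor-level byte budget is small; otherwise the executor budget caps
// the total buffered bytes.
std::uint64_t getMaxBufferedBytesInResponseWriter(
    std::uint64_t min_limit_mem_size_per_thread, // MAX_BATCH_SEND_MIN_LIMIT_MEM_SIZE
    std::uint64_t max_threads,
    std::uint64_t max_buffered_bytes_in_executor)
{
    return std::max(min_limit_mem_size_per_thread * max_threads,
                    max_buffered_bytes_in_executor);
}
```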
Check List
Tests
Tested in a 2-TiFlash cluster, with max_threads = 32 and the sort memory limit set to 5G.

Where `test` has 40000000 rows and each row has ~1200 bytes.

Before this pr
peak memory of each TiFlash is about 30G
After this pr
peak memory of each TiFlash is about 10G
Side effects
Documentation
Release note