-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[Improvement](sink) optimization for parallel result sink #36667
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
## Proposed changes optimization for parallel result sink apache#36053
|
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
|
|
||
| template <bool is_binary_format> | ||
| Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) { | ||
| Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
^Additional context
be/src/vec/sink/vmysql_result_writer.cpp:104: 107 lines including whitespace and comments (threshold 80)
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
^| } | ||
|
|
||
| Status VIcebergTableWriter::write(vectorized::Block& block) { | ||
| Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' has cognitive complexity of 85 (threshold 50) [readability-function-cognitive-complexity]
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:109: +1, including nesting penalty of 0, nesting level increased to 1
if (block.rows() == 0) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:120: +1, including nesting penalty of 0, nesting level increased to 1
if (_iceberg_partition_columns.empty()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:125: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:128: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:133: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:134: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:145: +4, including nesting penalty of 3, nesting level increased to 4
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:150: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:168: +1, including nesting penalty of 0, nesting level increased to 1
for (int i = 0; i < output_block.rows(); ++i) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:172: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:179: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:183: nesting level increased to 2
[&](const std::string& partition_name, int position,
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:195: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:202: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:204: +3, including nesting penalty of 2, nesting level increased to 3
if (_partitions_to_writers.size() + 1 >
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:211: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:213: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:224: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:228: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_pos_iter == writer_positions.end()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:232: +1, nesting level increased to 3
} else {
^| } | ||
|
|
||
| Status VIcebergTableWriter::write(vectorized::Block& block) { | ||
| Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:107: 139 lines including whitespace and comments (threshold 80)
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^| } | ||
|
|
||
| Status VHiveTableWriter::write(vectorized::Block& block) { | ||
| Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' has cognitive complexity of 83 (threshold 50) [readability-function-cognitive-complexity]
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/vhive_table_writer.cpp:86: +1, including nesting penalty of 0, nesting level increased to 1
if (block.rows() == 0) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:90: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:90: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:98: +1, including nesting penalty of 0, nesting level increased to 1
if (_partition_columns_input_index.empty()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:103: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:106: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:110: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:110: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:111: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:112: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:123: +4, including nesting penalty of 3, nesting level increased to 4
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:127: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:127: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:128: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:135: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:135: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:141: +1, including nesting penalty of 0, nesting level increased to 1
for (int i = 0; i < output_block.rows(); ++i) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:145: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:152: nesting level increased to 2
[&](const std::string& partition_name, int position,
^be/src/vec/sink/writer/vhive_table_writer.cpp:158: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:158: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:164: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:171: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:173: +3, including nesting penalty of 2, nesting level increased to 3
if (_partitions_to_writers.size() + 1 >
^be/src/vec/sink/writer/vhive_table_writer.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:179: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:180: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:182: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:191: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:191: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:193: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:197: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_pos_iter == writer_positions.end()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:201: +1, nesting level increased to 3
} else {
^| } | ||
|
|
||
| Status VHiveTableWriter::write(vectorized::Block& block) { | ||
| Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/vhive_table_writer.cpp:83: 132 lines including whitespace and comments (threshold 80)
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
Proposed changes
#36305 #36628 reverted coz some bug about local exchange, this pr do not change local exchange now