-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[Improvement](sink) optimization for parallel result sink #36305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
run buildall |
|
clang-tidy review says "All clean, LGTM! 👍" |
|
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
|
|
||
| template <bool is_binary_format> | ||
| Status VMysqlResultWriter<is_binary_format>::write(Block& input_block) { | ||
| Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
^Additional context
be/src/vec/sink/vmysql_result_writer.cpp:104: 107 lines including whitespace and comments (threshold 80)
Status VMysqlResultWriter<is_binary_format>::write(RuntimeState* state, Block& input_block) {
^| } | ||
|
|
||
| Status VIcebergTableWriter::write(vectorized::Block& block) { | ||
| Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' has cognitive complexity of 85 (threshold 50) [readability-function-cognitive-complexity]
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:109: +1, including nesting penalty of 0, nesting level increased to 1
if (block.rows() == 0) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:113: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:120: +1, including nesting penalty of 0, nesting level increased to 1
if (_iceberg_partition_columns.empty()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:125: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:128: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:132: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:133: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:134: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:145: +4, including nesting penalty of 3, nesting level increased to 4
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:149: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:150: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:157: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:168: +1, including nesting penalty of 0, nesting level increased to 1
for (int i = 0; i < output_block.rows(); ++i) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:172: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:179: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:183: nesting level increased to 2
[&](const std::string& partition_name, int position,
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:189: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:195: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:202: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:204: +3, including nesting penalty of 2, nesting level increased to 3
if (_partitions_to_writers.size() + 1 >
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:210: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:211: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:213: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::iceberg_sink_max_file_size) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:222: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:224: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:228: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_pos_iter == writer_positions.end()) {
^be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:232: +1, nesting level increased to 3
} else {
^| } | ||
|
|
||
| Status VIcebergTableWriter::write(vectorized::Block& block) { | ||
| Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp:107: 139 lines including whitespace and comments (threshold 80)
Status VIcebergTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^| } | ||
|
|
||
| Status VHiveTableWriter::write(vectorized::Block& block) { | ||
| Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' has cognitive complexity of 83 (threshold 50) [readability-function-cognitive-complexity]
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/vhive_table_writer.cpp:86: +1, including nesting penalty of 0, nesting level increased to 1
if (block.rows() == 0) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:90: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:90: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:98: +1, including nesting penalty of 0, nesting level increased to 1
if (_partition_columns_input_index.empty()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:103: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:106: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:110: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:110: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:111: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:112: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:123: +4, including nesting penalty of 3, nesting level increased to 4
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:127: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:127: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:128: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:135: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:135: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->write(output_block));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:141: +1, including nesting penalty of 0, nesting level increased to 1
for (int i = 0; i < output_block.rows(); ++i) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:145: +2, including nesting penalty of 1, nesting level increased to 2
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:152: nesting level increased to 2
[&](const std::string& partition_name, int position,
^be/src/vec/sink/writer/vhive_table_writer.cpp:158: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:158: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(writer->open(_state, _profile));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:164: +3, including nesting penalty of 2, nesting level increased to 3
} catch (doris::Exception& e) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:171: +2, including nesting penalty of 1, nesting level increased to 2
if (writer_iter == _partitions_to_writers.end()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:173: +3, including nesting penalty of 2, nesting level increased to 3
if (_partitions_to_writers.size() + 1 >
^be/src/vec/sink/writer/vhive_table_writer.cpp:179: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:179: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:180: +1, nesting level increased to 2
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:182: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:191: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:626: expanded from macro 'RETURN_IF_ERROR'
do { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:191: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
^be/src/common/status.h:628: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^be/src/vec/sink/writer/vhive_table_writer.cpp:193: +1, nesting level increased to 3
} else {
^be/src/vec/sink/writer/vhive_table_writer.cpp:197: +3, including nesting penalty of 2, nesting level increased to 3
if (writer_pos_iter == writer_positions.end()) {
^be/src/vec/sink/writer/vhive_table_writer.cpp:201: +1, nesting level increased to 3
} else {
^| } | ||
|
|
||
| Status VHiveTableWriter::write(vectorized::Block& block) { | ||
| Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'write' exceeds recommended size/complexity thresholds [readability-function-size]
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^Additional context
be/src/vec/sink/writer/vhive_table_writer.cpp:83: 132 lines including whitespace and comments (threshold 80)
Status VHiveTableWriter::write(RuntimeState* state, vectorized::Block& block) {
^|
run buildall |
1 similar comment
|
run buildall |
| dependency->set_ready(); | ||
| if (_is_cancelled) { | ||
| for (auto it : _result_sink_dependencys) { | ||
| it.second->set_ready(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| it.second->set_ready(); | |
| it.second->set_always_ready(); |
|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
HappenLee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
run buildall |
|
run buildall |
|
run performance |
HappenLee
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
PR approved by at least one committer and no changes requested. |
…ache#36305)" This reverts commit fdb5891.
## Proposed changes optimization for parallel result sink apache#36053
## Proposed changes optimization for parallel result sink #36053
Proposed changes
optimization for parallel result sink #36053