HashJoin respects max_joined_block_size_rows #56996
Conversation
This is an automated comment for commit 9b13705 with a description of existing statuses. It is updated for the latest CI run. ❌
The code only controls the size of the blocks from the left table. In the use case I designed, there are more columns in the right table. These columns are all generated at once during the join, using a lot of memory. When the right table has many columns (>40), performance also suffers badly here; this may be why it is slower than my version. The expansion of the right-table columns can also cause OOM. What is harder to handle is that a single row in the left table may be associated with tens of thousands of records, because the data may be skewed. It is necessary to record the processing state while matching against the hash map, or to do lazy materialization.
With the current approach we will insert a number of rows equal to the number of matches. But it should not be a problem unless it is smaller than the block size. Imagine we have block size
Force-pushed 2f8fe35 to 7d07196
Sorry, I misunderstood. The memory issue can basically be solved. There is an extreme case: a row that matches a lot of records (> max_joined_block_rows) and needs to output a lot of columns. That situation may still not be solved. In other words, max_joined_block_rows is not the maximum output size in the strict sense and can be exceeded in theory. My code is able to strictly guarantee the maximum output size.
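The "soft limit" behavior discussed above can be illustrated with a small model. This is a hypothetical sketch (the function name and structure are not from the PR): each left row contributes as many output rows as it has matches, and the block is only cut once the running offset has already exceeded the cap, so one heavily skewed row can still produce an oversized block.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified model (hypothetical names): for an ALL JOIN, left row i matches
// matches[i] right rows, so the joined block grows by that amount. Because
// the cap is checked only after the current row's matches are appended, a
// single row with a huge match count can still push a block far past
// max_joined_block_rows: the setting is a soft limit, not a strict one.
std::vector<size_t> joinedBlockSizes(const std::vector<size_t> & matches,
                                     size_t max_joined_block_rows)
{
    std::vector<size_t> block_sizes;
    size_t current_offset = 0;
    for (size_t i = 0; i < matches.size(); ++i)
    {
        current_offset += matches[i];               // rows produced so far in this block
        if (current_offset > max_joined_block_rows)
        {
            block_sizes.push_back(current_offset);  // block is cut only after row i
            current_offset = 0;                     // remaining left rows start a new block
        }
    }
    if (current_offset)
        block_sizes.push_back(current_offset);
    return block_sizes;
}
```

With a cap of 10, a left row matching 100000 records still yields one block of 100002 rows, which is the extreme case described above.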
bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
added_columns.max_joined_block_rows = table_join->maxJoinedBlockRows();
Not sure if AddedColumns::columns has pre-allocated memory
@vdimir neng's case doesn't include such a situation, but his implementation already solved this issue. As for performance, we guess it is related to whether memory is pre-allocated or not; see neng's comment on added_columns.max_joined_block_rows = table_join->maxJoinedBlockRows();. But it is just a guess; perf can probably find something different.
Force-pushed 190a72d to 5779753
@vdimir Any update on this PR? Thanks.
Force-pushed 5779753 to 8d9fa3c
Requested review from random people from the team, maybe someone will take a look. |
How's the performance compared to #54662?
Unfortunately a bit worse than in that PR, but it has improvements compared to
if (unlikely(current_offset > max_joined_block_rows))
{
    added_columns.offsets_to_replicate->resize_assume_reserved(i);
    added_columns.filter.resize_assume_reserved(i);
Shouldn't we also check if constexpr (need_filter) ?
We need to ensure that it's reserved
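The truncation pattern under discussion can be sketched in a simplified form. This is a hypothetical standalone model, not the PR's code: std::vector stands in for PODArray, resize stands in for resize_assume_reserved, and the arrays are sized up front so that truncating back to i entries on early exit is meaningful.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical simplification of the joinRightColumns loop: filter and
// offsets_to_replicate are sized to `rows` up front and filled row by row.
// When the running offset exceeds the cap, both arrays are truncated back to
// the i entries that were actually produced, and the index of the first
// unprocessed left row is returned (rows == everything was processed).
// A cap of 0 means "no limit", mirroring max_joined_block_rows semantics.
size_t fillUntilCap(size_t rows, const std::vector<size_t> & matches,
                    size_t max_joined_block_rows,
                    std::vector<uint8_t> & filter,
                    std::vector<size_t> & offsets_to_replicate)
{
    filter.assign(rows, 0);
    offsets_to_replicate.assign(rows, 0);
    size_t current_offset = 0;
    for (size_t i = 0; i < rows; ++i)
    {
        if (max_joined_block_rows && current_offset > max_joined_block_rows)
        {
            // Drop the slots for rows [i, rows), which go to not_processed.
            filter.resize(i);
            offsets_to_replicate.resize(i);
            return i;
        }
        current_offset += matches[i];
        filter[i] = matches[i] > 0;
        offsets_to_replicate[i] = current_offset;
    }
    return rows;
}
```

The review question above is exactly about this shape: truncation is only safe when both arrays were actually allocated to the full size beforehand.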
@vdimir I found the reason for the slowness. After this PR is merged, I will submit a new PR to optimize performance.
Sounds good! (@devcrafter) |
@vdimir Looking into it, but please fix the build |
Force-pushed 9e10ee7 to 51eec7a
CI failures are hardly related, but still pretty interesting:
MemorySanitizer: use-of-uninitialized-value
ThreadSanitizer: data race
Co-authored-by: liuneng <1398775315@qq.com>
Force-pushed 51eec7a to 9b13705
Rebased one more time
if (!data)
{
-    LOG_TRACE(log, "({}) Join data has been already released", fmt::ptr(this));
+    LOG_TRACE(log, "{}Join data has been already released", instance_log_id);
It'd be more convenient to make instance_log_id part of logger name
Logger name normally corresponds to a component name, not to a particular component instance, imo. So I'm not sure we should make the logger name dynamic.
auto inner_hash_join = std::make_shared<InternalHashJoin>();
-    inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_);
+    inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", i));
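The design discussed in this thread keeps the logger name static and distinguishes instances via a per-instance id prepended to each message. A minimal sketch of that idea, with hypothetical names (withInstanceId is not from the PR):

```cpp
#include <cassert>
#include <string>

// Hypothetical sketch: each HashJoin instance stores an instance_log_id
// (e.g. "concurrent3" for a ConcurrentHashJoin slot) and prepends it to log
// messages, while the logger name stays the component name. An empty id
// leaves the message untouched.
std::string withInstanceId(const std::string & instance_log_id,
                           const std::string & message)
{
    if (instance_log_id.empty())
        return message;
    return "(" + instance_log_id + ") " + message;
}
```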
left_sample_block = sample_block.cloneEmpty();
output_sample_block = left_sample_block.cloneEmpty();
-    ExtraBlockPtr not_processed;
+    ExtraBlockPtr not_processed = nullptr;
It's not necessary to initialize with nullptr
size_t i = 0;
for (; i < rows; ++i)
{
    if constexpr (join_features.need_replication)
@vdimir you explained what need_replication means at the time, but I've already forgotten and there is no comment. Let's add one next to the need_replication definition.
Added comments and changed resize_assume_reserved -> resize #58289
{
    if (unlikely(current_offset > max_joined_block_rows))
    {
        added_columns.offsets_to_replicate->resize_assume_reserved(i);
How do we ensure that offsets_to_replicate is not nullptr and the size is reserved?
Yes, we initialize it if join_features.need_replication = true
ClickHouse/src/Interpreters/HashJoin.cpp, lines 1394 to 1395 at d764d8d
And use it only when flag is enabled
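To make the need_replication discussion concrete, here is a hedged sketch of what the offsets array encodes. This mirrors the cumulative-offset convention of IColumn::replicate, but the function itself is a standalone illustration, not ClickHouse code:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// offsets_to_replicate[i] holds the cumulative number of output rows after
// left row i (IColumn::replicate semantics). Row i is therefore repeated
// offsets[i] - offsets[i - 1] times: a row with no matches (two equal
// consecutive offsets) disappears, and a row with k matches appears k times.
std::vector<int> replicate(const std::vector<int> & column,
                           const std::vector<size_t> & offsets)
{
    std::vector<int> result;
    size_t prev = 0;
    for (size_t i = 0; i < column.size(); ++i)
    {
        for (size_t j = prev; j < offsets[i]; ++j)
            result.push_back(column[i]);
        prev = offsets[i];
    }
    return result;
}
```

For example, offsets {2, 2, 5} repeat the first left value twice, drop the second, and repeat the third three times, which is why the array must stay in sync with the rows actually processed when a block is cut early.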
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Respect max_joined_block_size_rows and do not produce large blocks for ALL JOIN. Ref #54662