ARROW-14182: [C++][Compute] Hash Join performance improvement #12326
@save-buffer PTAL
// a) 64-bit string offsets
// b) residual predicates
// c) dictionaries
//
How much work would it be to implement this for big endian?
Are there any plans for big strings and dictionaries? I know residual predicates are planned to land shortly.
num_threads_ = num_threads;
schema_mgr_ = schema_mgr;
schema_[0] = proj_map_left;
schema_[1] = proj_map_right;
Why did we get rid of schema manager?
  class HashJoinImpl {
   public:
-   using OutputBatchCallback = std::function<void(ExecBatch)>;
+   using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
What's this new int64 parameter for? Doesn't it just get ignored later?
      const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);

- static void CompareVarBinaryColumnToRow_avx2(
+ static uint32_t CompareVarBinaryColumnToRow_avx2(
Can you add a comment about what this returns?
  // https://github.com/Cyan4973/xxHash/blob/dev/doc/xxhash_spec.md
  //
- class Hashing {
+ class Hashing32 {
Overall good, I'll take it on faith that it works, but function names should be switched to PascalCase.
void SwissTableWithKeys::MapReadOnly(Input* input, const uint32_t* hashes,
                                     uint8_t* match_bitvector, uint32_t* key_ids) {
  std::ignore = Map(input, /*insert_missing=*/false, hashes, match_bitvector, key_ids);
Should we DCHECK_OK this? Why is it valid to ignore return status here?
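To make the two options concrete, here is a minimal sketch. Everything in it is a stand-in: `Status` is a simplified placeholder rather than Arrow's real class, `CheckOkSketch` only imitates the spirit of `DCHECK_OK`, and `MapSketch` is an invented substitute for `Map`. The idea that only the inserting path can fail is an assumption made for illustration, not something this diff confirms.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <utility>

// Simplified placeholder for a status type; NOT arrow::Status.
class Status {
 public:
  static Status OK() { return Status(); }
  static Status Error(std::string msg) {
    Status s;
    s.ok_ = false;
    s.msg_ = std::move(msg);
    return s;
  }
  bool ok() const { return ok_; }
  const std::string& message() const { return msg_; }

 private:
  bool ok_ = true;
  std::string msg_;
};

// Stand-in for a debug check in the spirit of DCHECK_OK: the call is always
// evaluated, and the status is asserted on unless NDEBUG is defined.
inline void CheckOkSketch(const Status& s) {
  assert(s.ok());
  (void)s;  // avoid an unused-parameter warning when asserts are disabled
}

// Invented substitute for Map(). ASSUMPTION: only the inserting path can
// fail (for example, when growing the table).
Status MapSketch(bool insert_missing, bool simulate_alloc_failure) {
  if (insert_missing && simulate_alloc_failure) {
    return Status::Error("out of memory");
  }
  return Status::OK();
}

// Option A: mirrors the diff. Any failure would be dropped silently.
void MapReadOnlyIgnoring() {
  std::ignore = MapSketch(/*insert_missing=*/false, /*simulate_alloc_failure=*/false);
}

// Option B: states the "cannot fail here" assumption and enforces it in
// debug builds.
void MapReadOnlyChecked() {
  CheckOkSketch(MapSketch(/*insert_missing=*/false, /*simulate_alloc_failure=*/false));
}
```

If the read-only path really cannot fail, option B costs nothing in release builds and documents the invariant; if it can fail, neither option is enough and the status should be propagated.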
    source_payload_ids.data());

// TODO: Uncomment for debugging
// prtn_state.payloads.DebugPrintToFile("payload_local.txt", false);
This can be removed, I think.
  }
}

void SwissTableForJoinBuild::FinishPrtnMerge(util::TempVectorStack* temp_stack) {
This could return a Status, and then you don't have to ignore has_any_nulls
  return output.array_data();
}

Status JoinResultMaterialize::Flush(ExecBatch* out) {
Why not Result<ExecBatch>?
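For comparison, a rough sketch of the two signatures side by side. All names here are hypothetical stand-ins (`BatchSketch`, `StatusSketch`, `ResultSketch`, `MaterializeSketch`), not Arrow's `ExecBatch`, `Status`, or `Result`, and the "nothing to flush" error is invented so that both paths have something to report.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-ins; none of these are Arrow's real types.
struct BatchSketch {
  std::vector<int> values;
};

struct StatusSketch {
  bool ok = true;
  std::string message;
};

// Minimal value-or-error holder in the spirit of a Result<T>.
template <typename T>
class ResultSketch {
 public:
  ResultSketch(T value) : state_(std::move(value)) {}
  ResultSketch(StatusSketch error) : state_(std::move(error)) {}
  bool ok() const { return std::holds_alternative<T>(state_); }
  T& ValueOrDie() {
    assert(ok());
    return std::get<T>(state_);
  }

 private:
  std::variant<T, StatusSketch> state_;
};

class MaterializeSketch {
 public:
  void Append(int v) { pending_.push_back(v); }

  // Style used in the diff: status return plus out-parameter.
  StatusSketch Flush(BatchSketch* out) {
    if (pending_.empty()) return StatusSketch{false, "nothing to flush"};
    out->values = std::move(pending_);
    pending_.clear();  // leave the moved-from vector in a known empty state
    return StatusSketch{};
  }

  // Suggested style: the batch and the error travel in one return value.
  ResultSketch<BatchSketch> FlushResult() {
    if (pending_.empty()) return StatusSketch{false, "nothing to flush"};
    BatchSketch out{std::move(pending_)};
    pending_.clear();
    return out;
  }

 private:
  std::vector<int> pending_;
};
```

With the second form the caller cannot forget to pass an output batch or read it after a failure, which is the usual argument for returning a result object here.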
    TaskScheduler::ScheduleImpl schedule_task_callback) override {
  num_threads_ = static_cast<int>(std::max(num_threads, static_cast<size_t>(1)));

  START_SPAN(span_, "HashJoinBasicImpl",
Should probably switch span to SwissJoin
Can we rebase this now that the bloom filter (and key_hash) changes have been merged in?
Status AppendNulls(MemoryPool* pool,
                   const std::vector<std::shared_ptr<DataType>>& types,
                   int num_rows_to_append, int* num_appended);
It looks like this overload is no longer used anywhere
@michalursa I think this can be closed in favor of #13493
Faster implementation of hash join.