Skip to content

Conversation

@michalursa
Copy link
Contributor

@michalursa michalursa commented Feb 3, 2022

Faster implementation of hash join.

@github-actions
Copy link

github-actions bot commented Feb 3, 2022

@github-actions
Copy link

github-actions bot commented Feb 3, 2022

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@michalursa michalursa force-pushed the ARROW-14182-hash-join2 branch from a311a92 to b2e098d Compare February 3, 2022 08:33
@michalursa michalursa force-pushed the ARROW-14182-hash-join2 branch 6 times, most recently from 58faffe to c8c49e0 Compare March 1, 2022 18:20
@michalursa michalursa force-pushed the ARROW-14182-hash-join2 branch 5 times, most recently from 3a27b9c to 1b63482 Compare March 2, 2022 08:46
@nealrichardson nealrichardson marked this pull request as ready for review March 3, 2022 18:59
@nealrichardson
Copy link
Member

@save-buffer PTAL

@michalursa michalursa force-pushed the ARROW-14182-hash-join2 branch from 1b63482 to be2a55d Compare March 7, 2022 07:40
@michalursa michalursa force-pushed the ARROW-14182-hash-join2 branch 2 times, most recently from 21849e2 to 419a14b Compare March 16, 2022 06:49
// a) 64-bit string offsets
// b) residual predicates
// c) dictionaries
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How much work would it be to implement this for big endian?
Are there any plans for big strings and dictionaries? I know residual predicates are planned to come up shortly

num_threads_ = num_threads;
schema_mgr_ = schema_mgr;
schema_[0] = proj_map_left;
schema_[1] = proj_map_right;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we get rid of schema manager?

class HashJoinImpl {
public:
using OutputBatchCallback = std::function<void(ExecBatch)>;
using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's this new int64 parameter for? Doesn't it just get ignored later?

const KeyEncoder::KeyRowArray& rows, uint8_t* match_bytevector);

static void CompareVarBinaryColumnToRow_avx2(
static uint32_t CompareVarBinaryColumnToRow_avx2(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment about what this returns?

// https://github.com/Cyan4973/xxHash/blob/dev/doc/xxhash_spec.md
//
class Hashing {
class Hashing32 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall good, I'll take it on faith that it works, but function names should be switched to PascalCase.


void SwissTableWithKeys::MapReadOnly(Input* input, const uint32_t* hashes,
uint8_t* match_bitvector, uint32_t* key_ids) {
std::ignore = Map(input, /*insert_missing=*/false, hashes, match_bitvector, key_ids);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we DCHECK_OK this? Why is it valid to ignore return status here?

source_payload_ids.data());

// TODO: Uncomment for debugging
// prtn_state.payloads.DebugPrintToFile("payload_local.txt", false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be removed I think

}
}

void SwissTableForJoinBuild::FinishPrtnMerge(util::TempVectorStack* temp_stack) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could return a Status, and then you don't have to ignore has_any_nulls

return output.array_data();
}

Status JoinResultMaterialize::Flush(ExecBatch* out) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not Result<ExecBatch>?

TaskScheduler::ScheduleImpl schedule_task_callback) override {
num_threads_ = static_cast<int>(std::max(num_threads, static_cast<size_t>(1)));

START_SPAN(span_, "HashJoinBasicImpl",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably switch span to SwissJoin

@westonpace
Copy link
Member

Can we rebase this now that the bloom filter (and key_hash) changes have been merged in.

@michalursa michalursa force-pushed the ARROW-14182-hash-join2 branch from 419a14b to c576028 Compare April 4, 2022 21:28
Comment on lines 92 to 94
Status AppendNulls(MemoryPool* pool,
const std::vector<std::shared_ptr<DataType>>& types,
int num_rows_to_append, int* num_appended);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this overload is no longer used anywhere

@westonpace
Copy link
Member

@michalursa I think this can be closed in favor of #13493

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants