-
Notifications
You must be signed in to change notification settings - Fork 4k
Closed
Description
Describe the bug, including details regarding any error messages, version, and platform.
After fixing #44513, I explored further how far we can go in terms of overflow safety. I enlarged the scale of the test case from #44513 as follows:
Details
// Reproduction case: inner hash join of one 32-row batch against a very large
// input made of 720898048 non-matching rows plus one 32-row matching batch.
// The large input's row count is above 0x20000000, so row ids in that range are
// exercised by the join.
TEST(HashJoin, LARGE_MEMORY_TEST(BuildSidePayloadOver4GB)) {
  // Matching rows: 32 rows, delivered as a single batch.
  constexpr int64_t kMatchRows = 32;
  constexpr int64_t kRowsPerMatchBatch = 32;
  constexpr int64_t kMatchBatches = kMatchRows / kRowsPerMatchBatch;

  // Non-matching rows: 720898048 rows split into batches of 352001 rows
  // (exactly 2048 batches).
  constexpr int64_t kUnmatchRows = 720898048;
  constexpr int64_t kRowsPerUnmatchBatch = 352001;
  constexpr int64_t kUnmatchBatches = kUnmatchRows / kRowsPerUnmatchBatch;

  // Key triple and payload shared by every matching row.
  constexpr int64_t kMatchKey0 = 88506230299;
  constexpr int64_t kMatchKey1 = 16556030299;
  constexpr int64_t kMatchKey2 = 11240299;
  constexpr int64_t kMatchPayload = 42;

  // Seed for the random non-matching keys.
  constexpr int64_t kSeed = 42;

  // The small side has three key columns; the large side adds a payload column.
  auto small_schema = schema({field("small_key0", int64()), field("small_key1", int64()),
                              field("small_key2", int64())});
  auto large_schema =
      schema({field("large_key0", int64()), field("large_key1", int64()),
              field("large_key2", int64()), field("large_payload", int64())});

  // Constant arrays, each kRowsPerMatchBatch long, for the matching rows.
  ASSERT_OK_AND_ASSIGN(auto match_key0_arr,
                       Constant(MakeScalar(kMatchKey0))->Generate(kRowsPerMatchBatch));
  ASSERT_OK_AND_ASSIGN(auto match_key1_arr,
                       Constant(MakeScalar(kMatchKey1))->Generate(kRowsPerMatchBatch));
  ASSERT_OK_AND_ASSIGN(auto match_key2_arr,
                       Constant(MakeScalar(kMatchKey2))->Generate(kRowsPerMatchBatch));
  ASSERT_OK_AND_ASSIGN(auto match_payload_arr,
                       Constant(MakeScalar(kMatchPayload))->Generate(kRowsPerMatchBatch));

  // Batch for the small side: keys only.
  ExecBatch small_batch({match_key0_arr, match_key1_arr, match_key2_arr},
                        kRowsPerMatchBatch);

  // Matching batch for the large side: same keys plus the payload.
  ExecBatch large_match_batch(
      {match_key0_arr, match_key1_arr, match_key2_arr, match_payload_arr},
      kRowsPerMatchBatch);

  // Non-matching batch for the large side. One random key array, drawn from
  // [kMatchKey2 + 1, kMatchKey2 + 9], is reused for all three key columns; the
  // payload column is all null.
  auto unmatch_keys = RandomArrayGenerator(kSeed).Int64(
      kRowsPerUnmatchBatch, /*min=*/kMatchKey2 + 1, /*max=*/kMatchKey2 + 1 + 8);
  ASSERT_OK_AND_ASSIGN(auto unmatch_payload_arr_large,
                       MakeArrayOfNull(int64(), kRowsPerUnmatchBatch));
  ExecBatch large_unmatch_batch(
      {unmatch_keys, unmatch_keys, unmatch_keys, unmatch_payload_arr_large},
      kRowsPerUnmatchBatch);

  // Assemble both inputs; on the large side the matching batches come after all
  // of the non-matching ones.
  auto small_input =
      BatchesWithSchema{std::vector<ExecBatch>(kMatchBatches, small_batch), small_schema};
  auto large_input = BatchesWithSchema{
      std::vector<ExecBatch>(kUnmatchBatches, large_unmatch_batch), large_schema};
  large_input.batches.insert(large_input.batches.end(), kMatchBatches,
                             large_match_batch);

  // Plan: two exec-batch sources feeding an inner hash join on the key triples.
  Declaration small_source{
      "exec_batch_source",
      ExecBatchSourceNodeOptions(small_input.schema, small_input.batches)};
  Declaration large_source{
      "exec_batch_source",
      ExecBatchSourceNodeOptions(large_input.schema, large_input.batches)};
  HashJoinNodeOptions join_opts(
      JoinType::INNER,
      /*left_keys=*/{"small_key0", "small_key1", "small_key2"},
      /*right_keys=*/{"large_key0", "large_key1", "large_key2"});
  Declaration join{
      "hashjoin", {std::move(small_source), std::move(large_source)}, join_opts};

  // Run the join and wrap its output in a source so it can be checked twice.
  ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join)));
  Declaration joined{"exec_batch_source",
                     ExecBatchSourceNodeOptions(std::move(batches_result.schema),
                                                std::move(batches_result.batches))};

  // Each of the kMatchRows small rows pairs with each of the kMatchRows matching
  // large rows.
  constexpr int64_t kExpectedRows = kMatchRows * kMatchRows;
  AssertRowCountEq(joined, kExpectedRows);

  // Every emitted row must carry kMatchPayload, so filtering on it keeps them all.
  auto payload_matches = equal(field_ref("large_payload"), literal(kMatchPayload));
  Declaration filtered{
      "filter", {joined}, FilterNodeOptions{std::move(payload_matches)}};
  AssertRowCountEq(std::move(filtered), kExpectedRows);
}
And three more overflow issues are identified:
1. `const uint8_t* row_ptr = row_ptr_base + row_length * row_id;`
2. `int64_t bit_id = row_id * null_mask_num_bytes * 8 + pos_after_encoding;`
3. arrow/cpp/src/arrow/acero/swiss_join_avx2.cc, line 247 in 12f6265: `__m256i bit_id = _mm256_mullo_epi32(row_id, null_bits_per_row);`
All three use 32-bit multiplication. If row_id is big enough (but still valid), e.g., >= 0x20000000, and row_length or null_bytes_per_row isn't too small, e.g., >= 8, the multiplication overflows.
I appreciate that this test, as well as the original one in #44513, is powerful enough to reveal all three of these issues.
Component(s)
C++