[C++][Acero] Overflow issues in swiss join when the number of build side rows is big enough #45334

@zanmato1984

Description

Describe the bug, including details regarding any error messages, version, and platform.

After fixing #44513, I further explored how far we can go in terms of overflow safety. I enlarged the scale of the case in #44513 as follows:

TEST(HashJoin, LARGE_MEMORY_TEST(BuildSidePayloadOver4GB)) {
  const int64_t num_match_rows = 32;
  const int64_t num_rows_per_match_batch = 32;
  const int64_t num_match_batches = num_match_rows / num_rows_per_match_batch;

  const int64_t num_unmatch_rows_large = 720898048;
  const int64_t num_rows_per_unmatch_batch_large = 352001;
  const int64_t num_unmatch_batches_large =
      num_unmatch_rows_large / num_rows_per_unmatch_batch_large;

  auto schema_small = schema({field("small_key0", int64()), field("small_key1", int64()),
                              field("small_key2", int64())});
  auto schema_large =
      schema({field("large_key0", int64()), field("large_key1", int64()),
              field("large_key2", int64()), field("large_payload", int64())});

  const int64_t match_key0 = static_cast<int64_t>(88506230299);
  const int64_t match_key1 = static_cast<int64_t>(16556030299);
  const int64_t match_key2 = 11240299;
  const int64_t match_payload = 42;

  // Match arrays of length num_rows_per_match_batch.
  ASSERT_OK_AND_ASSIGN(
      auto match_key0_arr,
      Constant(MakeScalar(match_key0))->Generate(num_rows_per_match_batch));
  ASSERT_OK_AND_ASSIGN(
      auto match_key1_arr,
      Constant(MakeScalar(match_key1))->Generate(num_rows_per_match_batch));
  ASSERT_OK_AND_ASSIGN(
      auto match_key2_arr,
      Constant(MakeScalar(match_key2))->Generate(num_rows_per_match_batch));
  ASSERT_OK_AND_ASSIGN(
      auto match_payload_arr,
      Constant(MakeScalar(match_payload))->Generate(num_rows_per_match_batch));

  // Small batch.
  ExecBatch batch_small({match_key0_arr, match_key1_arr, match_key2_arr},
                        num_rows_per_match_batch);

  // Large unmatch batch.
  const int64_t seed = 42;
  auto unmatch_key_arr_large = RandomArrayGenerator(seed).Int64(
      num_rows_per_unmatch_batch_large, /*min=*/match_key2 + 1,
      /*max=*/match_key2 + 1 + 8);
  ASSERT_OK_AND_ASSIGN(auto unmatch_payload_arr_large,
                       MakeArrayOfNull(int64(), num_rows_per_unmatch_batch_large));
  ExecBatch unmatch_batch_large({unmatch_key_arr_large, unmatch_key_arr_large,
                                 unmatch_key_arr_large, unmatch_payload_arr_large},
                                num_rows_per_unmatch_batch_large);
  // Large match batch.
  ExecBatch match_batch_large(
      {match_key0_arr, match_key1_arr, match_key2_arr, match_payload_arr},
      num_rows_per_match_batch);

  // Batches with schemas.
  auto batches_small = BatchesWithSchema{
      std::vector<ExecBatch>(num_match_batches, batch_small), schema_small};
  auto batches_large = BatchesWithSchema{
      std::vector<ExecBatch>(num_unmatch_batches_large, unmatch_batch_large),
      schema_large};
  for (int i = 0; i < num_match_batches; i++) {
    batches_large.batches.push_back(match_batch_large);
  }

  Declaration source_small{
      "exec_batch_source",
      ExecBatchSourceNodeOptions(batches_small.schema, batches_small.batches)};
  Declaration source_large{
      "exec_batch_source",
      ExecBatchSourceNodeOptions(batches_large.schema, batches_large.batches)};

  HashJoinNodeOptions join_opts(
      JoinType::INNER,
      /*left_keys=*/{"small_key0", "small_key1", "small_key2"},
      /*right_keys=*/{"large_key0", "large_key1", "large_key2"});
  Declaration join{
      "hashjoin", {std::move(source_small), std::move(source_large)}, join_opts};

  // Join should emit num_match_rows * num_match_rows rows.
  ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join)));
  Declaration result{"exec_batch_source",
                     ExecBatchSourceNodeOptions(std::move(batches_result.schema),
                                                std::move(batches_result.batches))};
  AssertRowCountEq(result, num_match_rows * num_match_rows);

  // The payload should all be match_payload.
  auto predicate = equal(field_ref("large_payload"), literal(match_payload));
  Declaration filter{"filter", {result}, FilterNodeOptions{std::move(predicate)}};
  AssertRowCountEq(std::move(filter), num_match_rows * num_match_rows);
}

Three more overflow issues were identified:

  1. const uint8_t* row_ptr = row_ptr_base + row_length * row_id;
  2. int64_t bit_id = row_id * null_mask_num_bytes * 8 + pos_after_encoding;
  3. __m256i bit_id = _mm256_mullo_epi32(row_id, null_bits_per_row);

All three use 32-bit multiplication, so if row_id is big enough (but still valid), e.g. >= 0x20000000, and row_length or null_bytes_per_row is not too small, e.g. >= 8, the multiplication overflows.

I appreciate that this test, like the original one in #44513, is powerful enough to reveal all three of these issues.

Component(s)

C++
