@mingmwang mingmwang commented Apr 12, 2023

Which issue does this PR close?

Closes #5892.

Rationale for this change

Improve the Aggregate performance for decimal type

What changes are included in this PR?

Are these changes tested?

I tested this on my local Mac.
For TPC-H q17, there is about a 25% improvement.

Before this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 17 iteration 0 took 2623.2 ms and returned 1 rows
Query 17 iteration 1 took 2580.4 ms and returned 1 rows
Query 17 iteration 2 took 2575.7 ms and returned 1 rows
Query 17 avg time: 2593.11 ms

After this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(17), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 17 iteration 0 took 1920.4 ms and returned 1 rows
Query 17 iteration 1 took 1877.0 ms and returned 1 rows
Query 17 iteration 2 took 1878.1 ms and returned 1 rows
Query 17 avg time: 1891.81 ms

For TPC-H q18, there seems to be little improvement.

Before this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(18), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 18 iteration 0 took 1359.0 ms and returned 57 rows
Query 18 iteration 1 took 1285.2 ms and returned 57 rows
Query 18 iteration 2 took 1278.0 ms and returned 57 rows
Query 18 avg time: 1307.40 ms

After this PR:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: Some(18), debug: false, iterations: 3, partitions: 1, batch_size: 8192, path: "./parquet_data", file_format: "parquet", mem_table: false, output_path: None, disable_statistics: false, enable_scheduler: false }
Query 18 iteration 0 took 1335.8 ms and returned 57 rows
Query 18 iteration 1 took 1257.7 ms and returned 57 rows
Query 18 iteration 2 took 1237.9 ms and returned 57 rows
Query 18 avg time: 1277.12 ms

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Changes to the physical-expr crates sqllogictest SQL Logic Tests (.slt) labels Apr 12, 2023
@mingmwang mingmwang requested review from alamb and yjshen April 12, 2023 09:39
@alamb alamb changed the title Row accumulator support Decimal type Row AVG accumulator support Decimal type Apr 12, 2023
@alamb alamb left a comment
Looks good to me -- @yjshen could you please review the changes to the row format?

DataType::Decimal128(p, s) => {
match accessor.get_u64_opt(self.state_index()) {
None => Ok(ScalarValue::Decimal128(None, p, s)),
Some(0) => Ok(ScalarValue::Decimal128(None, p, s)),
Contributor:
Why do we translate 0 --> null here (it is also done for Float64 below)?

I see you are just following the existing pattern, but it seems like this could be incorrect?

Maybe we could add a test that calls AVG on (-1 and 1) to see if we get 0 or NULL

Contributor Author:

I will do some tests on this today.

Contributor:

Hi @alamb, the value at state_index is the count, not the sum. When the count is 0, the average should be NULL.
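The distinction discussed above can be sketched in a few lines. This is not the actual DataFusion code, just an illustrative model: the row accumulator keeps the count and the sum in separate state slots, so a zero count (no non-null inputs) means AVG is NULL, while a zero sum (e.g. AVG over -1 and 1) still yields a non-null 0.

```rust
/// Hypothetical helper illustrating the AVG finalization rule discussed
/// above: `count` and `sum` come from separate accumulator state slots.
fn finalize_avg(count: u64, sum: i128) -> Option<i128> {
    if count == 0 {
        None // no rows accumulated -> the average is NULL
    } else {
        Some(sum / count as i128) // integer division on the scaled decimal value
    }
}

fn main() {
    // AVG(-1, 1): count = 2, sum = 0 -> 0, not NULL
    assert_eq!(finalize_avg(2, 0), Some(0));
    // AVG over an empty or all-NULL input: count = 0 -> NULL
    assert_eq!(finalize_avg(0, 0), None);
    println!("ok");
}
```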

@mingmwang
For query 18, I think the plan is problematic: the issue is the join order and build-side selection; the bottleneck is not the aggregations.

=== Physical plan with metrics ===
SortExec: expr=[o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST], metrics=[output_rows=57, elapsed_compute=11.461µs, spill_count=0, spilled_bytes=0]
  AggregateExec: mode=Single, gby=[c_name@1 as c_name, c_custkey@0 as c_custkey, o_orderkey@2 as o_orderkey, o_orderdate@4 as o_orderdate, o_totalprice@3 as o_totalprice], aggr=[SUM(lineitem.l_quantity)], metrics=[output_rows=57, elapsed_compute=49.165µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=399, elapsed_compute=2.681µs, spill_count=0, spilled_bytes=0, mem_used=0]
      HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })], metrics=[output_rows=456, input_rows=456, input_batches=2, build_input_batches=733, build_input_rows=6001215, output_batches=2, build_mem_used=806510152, build_time=676.460128ms, join_time=2.88546ms]
        ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, o_orderkey@2 as o_orderkey, o_totalprice@3 as o_totalprice, o_orderdate@4 as o_orderdate, l_quantity@6 as l_quantity], metrics=[output_rows=6001215, elapsed_compute=116.294µs, spill_count=0, spilled_bytes=0, mem_used=0]
          CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6001215, elapsed_compute=50.883µs, spill_count=0, spilled_bytes=0, mem_used=0]
            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "o_orderkey", index: 2 }, Column { name: "l_orderkey", index: 0 })], metrics=[output_rows=6001215, input_rows=6001215, input_batches=733, build_input_batches=184, build_input_rows=1500000, output_batches=733, build_mem_used=177373616, build_time=154.423989ms, join_time=268.907576ms]
              ProjectionExec: expr=[c_custkey@0 as c_custkey, c_name@1 as c_name, o_orderkey@2 as o_orderkey, o_totalprice@4 as o_totalprice, o_orderdate@5 as o_orderdate], metrics=[output_rows=1500000, elapsed_compute=34.671µs, spill_count=0, spilled_bytes=0, mem_used=0]
                CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1500000, elapsed_compute=14.97µs, spill_count=0, spilled_bytes=0, mem_used=0]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(Column { name: "c_custkey", index: 0 }, Column { name: "o_custkey", index: 1 })], metrics=[output_rows=1500000, input_rows=1500000, input_batches=184, build_input_batches=19, build_input_rows=150000, output_batches=184, build_mem_used=14652720, build_time=7.173373ms, join_time=64.614167ms]
                    ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/customer/part-0.parquet]]}, projection=[c_custkey, c_name], metrics=[output_rows=150000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=566600, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=5.384791ms, time_elapsed_processing=5.024293ms, time_elapsed_scanning_until_data=2.83875ms, time_elapsed_opening=787.167µs, page_index_eval_time=2ns]
                    ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/orders/part-0.parquet]]}, projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate], metrics=[output_rows=1500000, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=13916402, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=112.261704ms, time_elapsed_processing=45.363041ms, time_elapsed_scanning_until_data=6.063625ms, time_elapsed_opening=549.5µs, page_index_eval_time=2ns]
              ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_quantity], metrics=[output_rows=6001215, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=12170874, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=324.056333ms, time_elapsed_processing=50.1033ms, time_elapsed_scanning_until_data=2.464666ms, time_elapsed_opening=2.0565ms, page_index_eval_time=2ns]
        ProjectionExec: expr=[l_orderkey@0 as l_orderkey], metrics=[output_rows=57, elapsed_compute=458ns, spill_count=0, spilled_bytes=0, mem_used=0]
          CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=57, elapsed_compute=19.342µs, spill_count=0, spilled_bytes=0, mem_used=0]
            FilterExec: SUM(lineitem.l_quantity)@1 > Some(30000),25,2, metrics=[output_rows=57, elapsed_compute=1.093341ms, spill_count=0, spilled_bytes=0, mem_used=0]
              AggregateExec: mode=Single, gby=[l_orderkey@0 as l_orderkey], aggr=[SUM(lineitem.l_quantity)], metrics=[output_rows=1500000, elapsed_compute=434.341183ms, spill_count=0, spilled_bytes=0, mem_used=0]
                ParquetExec: limit=None, partitions={1 group: [[Users/mingmwang/gitrepo/apache/arrow-datafusion/benchmarks/parquet_data/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_quantity], metrics=[output_rows=6001215, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, row_groups_pruned=0, bytes_scanned=12170874, page_index_rows_filtered=0, pushdown_eval_time=2ns, time_elapsed_scanning_total=443.876134ms, time_elapsed_processing=49.172503ms, time_elapsed_scanning_until_data=2.226959ms, time_elapsed_opening=844.959µs, page_index_eval_time=2ns]

@yjshen yjshen left a comment

Thanks @mingmwang @yahoNanJing! Looks great to me.

Could we also add two roundtrip (DecimalArray -> Vec<u8> -> DecimalArray) tests in datafusion/row/src/lib.rs for the newly introduced Decimal type? One for a null-free case and one for a nullable case.

@mingmwang
> Thanks @mingmwang @yahoNanJing! Looks great to me.
>
> Could we also add two roundtrip (DecimalArray -> Vec<u8> -> DecimalArray) tests in datafusion/row/src/lib.rs for the newly introduced Decimal type? One for a null-free case and one for a nullable case.

Sure, I will add the two tests in a follow-up PR.
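For reference, here is a rough, self-contained shape such roundtrip tests could take. This is deliberately NOT the datafusion/row format or its API: it models a simplified row layout (one validity byte plus a 16-byte little-endian i128 per value) purely to illustrate the null-free and nullable cases the reviewer asked for.

```rust
// Simplified stand-in for the row format: each value is encoded as
// 1 validity byte followed by 16 bytes of little-endian i128.

fn write_rows(values: &[Option<i128>]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(values.len() * 17);
    for v in values {
        match v {
            Some(x) => {
                buf.push(1); // valid
                buf.extend_from_slice(&x.to_le_bytes());
            }
            None => {
                buf.push(0); // null; payload bytes are don't-care
                buf.extend_from_slice(&[0u8; 16]);
            }
        }
    }
    buf
}

fn read_rows(buf: &[u8]) -> Vec<Option<i128>> {
    buf.chunks_exact(17)
        .map(|c| {
            if c[0] == 1 {
                Some(i128::from_le_bytes(c[1..17].try_into().unwrap()))
            } else {
                None
            }
        })
        .collect()
}

fn main() {
    // Null-free case: every value survives the roundtrip unchanged.
    let dense = vec![Some(12345_i128), Some(-67890), Some(0)];
    assert_eq!(read_rows(&write_rows(&dense)), dense);

    // Nullable case: nulls are preserved at their positions.
    let sparse = vec![Some(1_i128), None, Some(-1), None];
    assert_eq!(read_rows(&write_rows(&sparse)), sparse);

    println!("roundtrip ok");
}
```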

@mingmwang mingmwang merged commit fcd8b89 into apache:main Apr 13, 2023

alamb commented Apr 13, 2023

🎉
