[CH] Rewrite union of multiple aggregates #8675

@lgbo-ustc

Description

The following query scans the table nation twice, once for each branch of the union.

select n_regionkey , n_name, '0' as t, avg(n_nationkey) + count(n_name) as s from  nation where n_nationkey % 3 = 0 group by n_name,  n_regionkey 
union all
select n_regionkey , n_name, '1' as t, avg(n_nationkey) + count(n_name) as s from nation where n_nationkey % 2 = 1 group by n_name, n_regionkey 

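The following is the physical plan. It appears to come from a slightly different variant of the query (grouping on n_regionkey + 1 and filtering on n_nationkey % 2), but the shape is the same: each branch of the Union has its own FileScan.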

AdaptiveSparkPlan isFinalPlan=false
+- Union
   :- HashAggregate(keys=[n_name#5, _groupingexpression#24L], functions=[avg(cast(n_nationkey#4L as double)), count(n_name#5)], output=[(n_regionkey + 1)#18L, n_name#5, t#0, s#1])
   :  +- Exchange hashpartitioning(n_name#5, _groupingexpression#24L, 5), ENSURE_REQUIREMENTS, [plan_id=38]
   :     +- HashAggregate(keys=[n_name#5, _groupingexpression#24L], functions=[partial_avg(cast(n_nationkey#4L as double)), partial_count(n_name#5)], output=[n_name#5, _groupingexpression#24L, sum#29, count#30L, count#31L])
   :        +- Project [n_nationkey#4L, n_name#5, (n_regionkey#6L + cast(1 as bigint)) AS _groupingexpression#24L]
   :           +- Filter (isnotnull(n_nationkey#4L) AND ((n_nationkey#4L % cast(2 as bigint)) = cast(0 as bigint)))
   :              +- FileScan parquet tpch_pq.nation[n_nationkey#4L,n_name#5,n_regionkey#6L]
   +- HashAggregate(keys=[n_name#9, _groupingexpression#25L], functions=[avg(cast(n_nationkey#8L as double)), count(n_name#9)], output=[(n_regionkey + 1)#19L, n_name#9, t#2, s#3])
      +- Exchange hashpartitioning(n_name#9, _groupingexpression#25L, 5), ENSURE_REQUIREMENTS, [plan_id=40]
         +- HashAggregate(keys=[n_name#9, _groupingexpression#25L], functions=[partial_avg(cast(n_nationkey#8L as double)), partial_count(n_name#9)], output=[n_name#9, _groupingexpression#25L, sum#35, count#36L, count#37L])
            +- Project [n_nationkey#8L, n_name#9, (n_regionkey#10L + cast(1 as bigint)) AS _groupingexpression#25L]
               +- Filter (isnotnull(n_nationkey#8L) AND ((n_nationkey#8L % cast(2 as bigint)) = cast(1 as bigint)))
                  +- FileScan parquet tpch_pq.nation[n_nationkey#8L,n_name#9,n_regionkey#10L]

When the table is large, the repeated scans take up most of the execution time.

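We could rewrite the query as follows so that nation is scanned only once. Each input row is expanded into one tagged struct per branch whose filter it satisfies, and the aggregation then groups on the original keys plus the branch id (gid).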

select n_regionkey, n_name, t, avg(n_nationkey) + count(n_name) as s from (
  select st.n_regionkey as n_regionkey, st.n_name as n_name, st.t as t, st.n_nationkey as n_nationkey, st.gid as gid from (
    select st from (
        select explode(a) as st from (
          -- one array slot per union branch; a slot is null when the row fails that branch's filter
          select array(
            if(n_nationkey % 3 = 0, named_struct('n_regionkey', n_regionkey, 'n_nationkey', n_nationkey, 'n_name', n_name, 't', '0', 'gid', 0), null),
            if(n_nationkey % 2 = 1, named_struct('n_regionkey', n_regionkey, 'n_nationkey', n_nationkey, 'n_name', n_name, 't', '1', 'gid', 1), null)) as a from (
            select * from nation where n_nationkey % 3 = 0 or n_nationkey % 2 = 1
          )
        )
    ) where st is not null
  )
) group by n_regionkey, n_name, t, gid
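
To make the idea behind the rewrite concrete, here is a minimal pure-Python sketch, not Spark code and not how any engine implements it. It compares the two shapes on a handful of made-up rows: one pass over the table per union branch versus a single pass that emits a tagged copy of each row for every branch it satisfies. The sample data and the names `NATION`, `BRANCHES`, `union_of_aggregates` and `single_scan` are assumptions introduced only for illustration.

```python
from collections import defaultdict

# Hypothetical sample rows: (n_nationkey, n_name, n_regionkey). Made-up data.
NATION = [
    (0, "ALGERIA", 0),
    (1, "ARGENTINA", 1),
    (2, "BRAZIL", 1),
    (3, "CANADA", 1),
    (4, "EGYPT", 4),
    (5, "ETHIOPIA", 0),
    (6, "FRANCE", 3),
    (9, "INDONESIA", 2),
]

# One (tag, predicate) pair per branch of the original UNION ALL.
BRANCHES = [
    ("0", lambda key: key % 3 == 0),
    ("1", lambda key: key % 2 == 1),
]


def aggregate(rows):
    """Compute avg(n_nationkey) + count(n_name) per group.

    `rows` yields (group_key, n_nationkey) pairs; names are assumed non-null,
    so count(n_name) equals the number of rows in the group.
    """
    sums = defaultdict(int)
    counts = defaultdict(int)
    for group, key in rows:
        sums[group] += key
        counts[group] += 1
    return {g: sums[g] / counts[g] + counts[g] for g in sums}


def union_of_aggregates(table):
    """Original shape: the table is iterated once per branch."""
    result = {}
    for tag, pred in BRANCHES:
        scanned = [((region, name, tag), key)
                   for key, name, region in table if pred(key)]
        result.update(aggregate(scanned))
    return result


def single_scan(table):
    """Rewritten shape: one pass, each row expanded into tagged copies."""
    def expand():
        for key, name, region in table:
            for tag, pred in BRANCHES:
                if pred(key):
                    # The tag plays the role of t/gid in the SQL rewrite.
                    yield (region, name, tag), key
    return aggregate(expand())


if __name__ == "__main__":
    assert union_of_aggregates(NATION) == single_scan(NATION)
    for group, s in sorted(single_scan(NATION).items()):
        print(group, s)
```

A row such as n_nationkey = 3 satisfies both filters, so it contributes to a group in each branch. This is why the branch tag has to be part of the grouping key in the single-scan form.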

The second query is more difficult to write than the first one.

Metadata

Labels: enhancement (New feature or request)
