[CH] Support WindowGroupLimit for row_number, rank and dense_rank #7087
Closed
Labels: enhancement (New feature or request)
Description
Part of #6067.
For TPC-DS q44, the physical plan is:
CHNativeColumnarToRow
+- TakeOrderedAndProjectExecTransformer (limit=100, orderBy=[rnk#74 ASC NULLS FIRST], output=[rnk#74,best_performing#80,worst_performing#81])
+- ^(16) ProjectExecTransformer [rnk#74, i_product_name#135 AS best_performing#80, i_product_name#180 AS worst_performing#81]
+- ^(16) CHSortMergeJoinExecTransformer [item_sk#75L], [i_item_sk#159L], Inner, false
:- ^(16) SortExecTransformer [item_sk#75L ASC NULLS FIRST], false, 0
: +- ^(16) InputIteratorTransformer[rnk#74, item_sk#75L, i_product_name#135]
: +- ColumnarExchange hashpartitioning(item_sk#75L, 5), ENSURE_REQUIREMENTS, [plan_id=1465], [shuffle_writer_type=hash], [OUTPUT] List(rnk:IntegerType, item_sk:LongType, i_product_name:StringType)
: +- ^(14) ProjectExecTransformer [rnk#74, item_sk#75L, i_product_name#135]
: +- ^(14) CHSortMergeJoinExecTransformer [item_sk#70L], [i_item_sk#114L], Inner, false
: :- ^(14) SortExecTransformer [item_sk#70L ASC NULLS FIRST], false, 0
: : +- ^(14) InputIteratorTransformer[item_sk#70L, rnk#74, item_sk#75L]
: : +- ColumnarExchange hashpartitioning(item_sk#70L, 5), ENSURE_REQUIREMENTS, [plan_id=1407], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rnk:IntegerType, item_sk:LongType)
: : +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74, item_sk#75L]
: : +- ^(12) CHSortMergeJoinExecTransformer [rnk#74], [rnk#79], Inner, false
: : :- ^(12) SortExecTransformer [rnk#74 ASC NULLS FIRST], false, 0
: : : +- ^(12) ProjectExecTransformer [item_sk#70L, rnk#74]
: : : +- ^(12) FilterExecTransformer ((rnk#74 < 11) AND isnotnull(item_sk#70L))
: : : +- ^(12) WindowExecTransformer [rank(rank_col#71) windowspecdefinition(rank_col#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#74], [rank_col#71 ASC NULLS FIRST]
: : : +- ^(12) InputIteratorTransformer[item_sk#70L, rank_col#71]
: : : +- RowToCHNativeColumnar
: : : +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Final
: : : +- CHNativeColumnarToRow
: : : +- ^(8) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
: : : +- ^(8) InputIteratorTransformer[item_sk#70L, rank_col#71]
: : : +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1210], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
: : : +- RowToCHNativeColumnar
: : : +- WindowGroupLimit [rank_col#71 ASC NULLS FIRST], rank(rank_col#71), 10, Partial
: : : +- CHNativeColumnarToRow
: : : +- ^(7) SortExecTransformer [rank_col#71 ASC NULLS FIRST], false, 0
: : : +- ^(7) FilterExecTransformer (isnotnull(rank_col#71) AND (cast(rank_col#71 as decimal(13,7)) > (0.9 * Subquery scalar-subquery#73, [id=#294])))
: : : : +- Subquery scalar-subquery#73, [id=#294]
: : : : +- CHNativeColumnarToRow
: : : : +- ^(3) ProjectExecTransformer [cast((avg(UnscaledValue(ss_net_profit#209))#185 / 100.0) as decimal(11,6)) AS rank_col#72]
: : : : +- ^(3) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[avg(UnscaledValue(ss_net_profit#209))], isStreamingAgg=false)
: : : : +- ^(3) InputIteratorTransformer[ss_store_sk#195L, sum#265, count#266L]
: : : : +- ColumnarExchange hashpartitioning(ss_store_sk#195L, 5), ENSURE_REQUIREMENTS, [plan_id=287], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_store_sk:LongType, sum:DoubleType, count:LongType)
: : : : +- ^(2) HashAggregateTransformer(keys=[ss_store_sk#195L], functions=[partial_avg(_pre_0#267L)], isStreamingAgg=false)
: : : : +- ^(2) ProjectExecTransformer [ss_store_sk#195L, ss_net_profit#209, UnscaledValue(ss_net_profit#209) AS _pre_0#267L]
: : : : +- ^(2) FilterExecTransformer ((isnotnull(ss_store_sk#195L) AND (ss_store_sk#195L = cast(2 as bigint))) AND isnull(ss_hdemo_sk#193L))
: : : : +- ^(2) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_hdemo_sk#193L,ss_store_sk#195L,ss_net_profit#209] Batched: true, DataFilters: [isnotnull(ss_store_sk#195L), isnull(ss_hdemo_sk#193L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk), IsNull(ss_hdemo_sk)], ReadSchema: struct<ss_hdemo_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
: : : +- ^(7) ProjectExecTransformer [ss_item_sk#91L AS item_sk#70L, cast((avg(UnscaledValue(ss_net_profit#113))#183 / 100.0) as decimal(11,6)) AS rank_col#71]
: : : +- ^(7) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[avg(UnscaledValue(ss_net_profit#113))], isStreamingAgg=false)
: : : +- ^(7) InputIteratorTransformer[ss_item_sk#91L, sum#257, count#258L]
: : : +- ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
: : : +- ^(6) HashAggregateTransformer(keys=[ss_item_sk#91L], functions=[partial_avg(_pre_2#277L)], isStreamingAgg=false)
: : : +- ^(6) ProjectExecTransformer [ss_item_sk#91L, ss_net_profit#113, UnscaledValue(ss_net_profit#113) AS _pre_2#277L]
: : : +- ^(6) FilterExecTransformer (isnotnull(ss_store_sk#99L) AND (ss_store_sk#99L = cast(2 as bigint)))
: : : +- ^(6) NativeFileScan parquet spark_catalog.tpcds_pq100.store_sales[ss_item_sk#91L,ss_store_sk#99L,ss_net_profit#113] Batched: true, DataFilters: [isnotnull(ss_store_sk#99L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(ss_store_sk)], ReadSchema: struct<ss_item_sk:bigint,ss_store_sk:bigint,ss_net_profit:decimal(7,2)>
: : +- ^(12) SortExecTransformer [rnk#79 ASC NULLS FIRST], false, 0
: : +- ^(12) ProjectExecTransformer [item_sk#75L, rnk#79]
: : +- ^(12) FilterExecTransformer ((rnk#79 < 11) AND isnotnull(item_sk#75L))
: : +- ^(12) WindowExecTransformer [rank(rank_col#76) windowspecdefinition(rank_col#76 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rnk#79], [rank_col#76 DESC NULLS LAST]
: : +- ^(12) InputIteratorTransformer[item_sk#75L, rank_col#76]
: : +- RowToCHNativeColumnar
: : +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Final
: : +- CHNativeColumnarToRow
: : +- ^(11) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
: : +- ^(11) InputIteratorTransformer[item_sk#75L, rank_col#76]
: : +- ColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=1374], [shuffle_writer_type=hash], [OUTPUT] List(item_sk:LongType, rank_col:DecimalType(11,6))
: : +- RowToCHNativeColumnar
: : +- WindowGroupLimit [rank_col#76 DESC NULLS LAST], rank(rank_col#76), 10, Partial
: : +- CHNativeColumnarToRow
: : +- ^(10) SortExecTransformer [rank_col#76 DESC NULLS LAST], false, 0
: : +- ^(10) FilterExecTransformer (isnotnull(rank_col#76) AND (cast(rank_col#76 as decimal(13,7)) > (0.9 * ReusedSubquery Subquery scalar-subquery#73, [id=#294])))
: : : +- ReusedSubquery Subquery scalar-subquery#73, [id=#294]
: : +- ^(10) ProjectExecTransformer [ss_item_sk#136L AS item_sk#75L, cast((avg(UnscaledValue(ss_net_profit#158))#184 / 100.0) as decimal(11,6)) AS rank_col#76]
: : +- ^(10) HashAggregateTransformer(keys=[ss_item_sk#136L], functions=[avg(UnscaledValue(ss_net_profit#158))], isStreamingAgg=false)
: : +- ^(10) InputIteratorTransformer[ss_item_sk#136L, sum#261, count#262L]
: : +- ReusedExchange [ss_item_sk#136L, sum#261, count#262L], ColumnarExchange hashpartitioning(ss_item_sk#91L, 5), ENSURE_REQUIREMENTS, [plan_id=1199], [shuffle_writer_type=hash], [OUTPUT] ArrayBuffer(ss_item_sk:LongType, sum:DoubleType, count:LongType)
: +- ^(14) SortExecTransformer [i_item_sk#114L ASC NULLS FIRST], false, 0
: +- ^(14) InputIteratorTransformer[i_item_sk#114L, i_product_name#135]
: +- ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)
: +- ^(13) FilterExecTransformer isnotnull(i_item_sk#114L)
: +- ^(13) NativeFileScan parquet spark_catalog.tpcds_pq100.item[i_item_sk#114L,i_product_name#135] Batched: true, DataFilters: [isnotnull(i_item_sk#114L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/data3/liangjiabiao/docker/local_gluten/spark-3.5.1-bin-hadoop3/s..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_product_name:string>
+- ^(16) SortExecTransformer [i_item_sk#159L ASC NULLS FIRST], false, 0
+- ^(16) InputIteratorTransformer[i_item_sk#159L, i_product_name#180]
+- ReusedExchange [i_item_sk#159L, i_product_name#180], ColumnarExchange hashpartitioning(i_item_sk#114L, 5), ENSURE_REQUIREMENTS, [plan_id=1258], [shuffle_writer_type=hash], [OUTPUT] List(i_item_sk:LongType, i_product_name:StringType)
q67 and q70 also fall back on WindowGroupLimit.
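In the plan above, the `WindowGroupLimit` nodes (both `Partial` before the shuffle and `Final` after it) are not offloaded, so Gluten inserts `CHNativeColumnarToRow` / `RowToCHNativeColumnar` transitions around them. What the operator does is prune, per window partition, every row whose rank would exceed the limit, since the downstream filter (`rnk < 11` in q44) would discard those rows anyway. The following is a minimal, hedged Python sketch of that pruning semantics (not Gluten or Spark code; the function name and row representation are invented for illustration):

```python
# Sketch of WindowGroupLimit semantics for rank-like functions.
# In both the Partial stage (per input partition, before the shuffle) and the
# Final stage (after the shuffle), rows whose rank within their window
# partition exceeds the limit can be dropped early, because the downstream
# filter (e.g. rnk < 11 in q44) would discard them anyway.
from itertools import groupby
from operator import itemgetter

def window_group_limit(rows, part_key, order_key, limit, mode="rank"):
    """Keep only rows whose rank-like value is <= limit.

    rows: iterable of dicts; part_key / order_key: field names; limit: int.
    mode: "rank", "dense_rank" or "row_number" (ties handled accordingly).
    """
    out = []
    groups = groupby(sorted(rows, key=itemgetter(part_key, order_key)),
                     key=itemgetter(part_key))
    for _, group in groups:
        rank, dense, prev = 0, 0, object()  # sentinel: first key never matches
        for i, row in enumerate(group, start=1):
            key = row[order_key]
            if key != prev:
                rank, dense, prev = i, dense + 1, key
            current = {"rank": rank, "dense_rank": dense, "row_number": i}[mode]
            if current > limit:
                break  # group is sorted, so no later row can qualify
            out.append(row)
    return out
```

Because all three functions are non-decreasing along the sort order, the inner loop can stop at the first row over the limit; that early exit is exactly the saving the operator buys before the full `WindowExecTransformer` runs.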